Skip to content

Commit f12e2cf

Browse files
committed
fixes
1 parent 7a5397b commit f12e2cf

5 files changed

Lines changed: 174 additions & 30 deletions

File tree

processing/src/main/java/org/apache/druid/data/input/impl/ClusteredValueGroupsBaseTableProjectionSpec.java

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.apache.druid.query.aggregation.AggregatorFactory;
3333
import org.apache.druid.segment.VirtualColumns;
3434
import org.apache.druid.segment.column.ColumnHolder;
35+
import org.apache.druid.segment.column.ColumnType;
36+
import org.apache.druid.segment.column.ValueType;
3537
import org.apache.druid.utils.CollectionUtils;
3638

3739
import javax.annotation.Nullable;
@@ -49,18 +51,19 @@
4951
* <p>
5052
* The operator declares a single ordered {@link #columns} list — the full set of columns in segment order, plus a
5153
* {@link #clusteringColumns} list of NAMES designating the leading prefix of {@link #columns} that rows are
52-
* clustered by. The time position is an explicit positional entry in {@link #columns} (named {@code __time}, or the
53-
* query-granularity column {@link Granularities#GRANULARITY_VIRTUAL_COLUMN_NAME}); clustering by the time column is not
54-
* yet supported, so the time marker must be a non-clustering column. A clustered base table is never rollup and has
55-
* no metric columns.
54+
* clustered by. The time position is an explicit positional entry in {@link #columns} named {@code __time}; clustering
55+
* by the time column is not yet supported, so {@code __time} must be a non-clustering column. A clustered base table
56+
* is never rollup and has no metric columns.
5657
* <p>
5758
* {@link #getDimensionsSpec()} returns the unified spec built from {@link #columns} in declared order with
5859
* {@code forceSegmentSortByTime=false}; {@link #getOrdering()} is computed as every column of {@link #columns}
5960
* ascending, in list order.
6061
* <p>
61-
* Query granularity, when wanted, is just another entry in {@link #getVirtualColumns()} named
62-
* {@link Granularities#GRANULARITY_VIRTUAL_COLUMN_NAME}; absent that virtual column the query granularity is
63-
* {@code NONE}. Segment granularity and intervals live on the top-level
62+
* Query granularity, when wanted, is a virtual column in {@link #getVirtualColumns()} named
63+
* {@link Granularities#GRANULARITY_VIRTUAL_COLUMN_NAME}. It is a granularity <em>carrier</em>: it supplies the
64+
* granularity that floors the stored {@code __time} column, and is NOT itself a stored column, so it never appears in
65+
* {@link #columns} (declare {@code __time} there as the time column). Absent that virtual column the query granularity
66+
* is {@code NONE}.
6467
* {@link org.apache.druid.indexer.granularity.SegmentGranularitySpec}, not here.
6568
*/
6669
@JsonTypeName(ClusteredValueGroupsBaseTableProjectionSpec.TYPE_NAME)
@@ -180,9 +183,19 @@ private static void validate(List<DimensionSchema> columns, List<String> cluster
180183
throw clusteringPrefixException(columns, clusteringColumns);
181184
}
182185
for (int i = 0; i < clusteringColumns.size(); i++) {
183-
if (!columns.get(i).getName().equals(clusteringColumns.get(i))) {
186+
final DimensionSchema clusteringColumn = columns.get(i);
187+
if (!clusteringColumn.getName().equals(clusteringColumns.get(i))) {
184188
throw clusteringPrefixException(columns, clusteringColumns);
185189
}
190+
// Clustering values are dictionary-encoded into per-type dictionaries on the write side, which supports only
191+
// these scalar types; reject anything else up front rather than failing later at ingest.
192+
if (!isSupportedClusteringType(clusteringColumn.getColumnType())) {
193+
throw InvalidInput.exception(
194+
"clustering column [%s] has unsupported type [%s]; clustering columns must be STRING, LONG, DOUBLE, or FLOAT",
195+
clusteringColumn.getName(),
196+
clusteringColumn.getColumnType()
197+
);
198+
}
186199
}
187200

188201
final Set<String> seen = Sets.newHashSetWithExpectedSize(columns.size());
@@ -193,40 +206,45 @@ private static void validate(List<DimensionSchema> columns, List<String> cluster
193206
}
194207

195208
int timeIndex = -1;
196-
boolean bothPresent = false;
197209
for (int i = 0; i < columns.size(); i++) {
198210
final String name = columns.get(i).getName();
199-
if (ColumnHolder.TIME_COLUMN_NAME.equals(name) || Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME.equals(name)) {
200-
if (timeIndex >= 0) {
201-
bothPresent = true;
202-
}
211+
// The query-granularity virtual column is a granularity carrier in virtualColumns (it floors the stored __time
212+
// column); it is not itself a stored column, so it must not be declared in 'columns'.
213+
if (Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME.equals(name)) {
214+
throw InvalidInput.exception(
215+
"[%s] is the query-granularity virtual column, not a stored column; declare it in 'virtualColumns' and use"
216+
+ " [%s] as the time column in 'columns'",
217+
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME,
218+
ColumnHolder.TIME_COLUMN_NAME
219+
);
220+
}
221+
if (ColumnHolder.TIME_COLUMN_NAME.equals(name)) {
203222
timeIndex = i;
204223
}
205224
}
206225
if (timeIndex < 0) {
207226
throw InvalidInput.exception(
208-
"clustered base table must include %s (or the query-granularity column [%s]) in 'columns' to define the"
209-
+ " time position",
210-
ColumnHolder.TIME_COLUMN_NAME,
211-
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
212-
);
213-
}
214-
if (bothPresent) {
215-
throw InvalidInput.exception(
216-
"clustered base table must include exactly one of %s / %s in 'columns' to define the time position",
217-
ColumnHolder.TIME_COLUMN_NAME,
218-
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
227+
"clustered base table must include [%s] in 'columns' to define the time position",
228+
ColumnHolder.TIME_COLUMN_NAME
219229
);
220230
}
221231
if (timeIndex < clusteringColumns.size()) {
222232
throw InvalidInput.exception(
223-
"clustering by %s / %s is not yet supported; the time column must be a non-clustering column",
224-
ColumnHolder.TIME_COLUMN_NAME,
225-
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
233+
"clustering by [%s] is not yet supported; the time column must be a non-clustering column",
234+
ColumnHolder.TIME_COLUMN_NAME
226235
);
227236
}
228237
}
229238

239+
private static boolean isSupportedClusteringType(ColumnType type)
240+
{
241+
return type != null
242+
&& (type.is(ValueType.STRING)
243+
|| type.is(ValueType.LONG)
244+
|| type.is(ValueType.DOUBLE)
245+
|| type.is(ValueType.FLOAT));
246+
}
247+
230248
private static DruidException clusteringPrefixException(
231249
List<DimensionSchema> columns,
232250
List<String> clusteringColumns

processing/src/main/java/org/apache/druid/segment/incremental/OnHeapClusterGroup.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public final class OnHeapClusterGroup implements IncrementalIndexRowSelector
7474
private final AtomicInteger numEntries = new AtomicInteger(0);
7575
private final int groupTimePosition;
7676

77+
private final VirtualColumns virtualColumns;
7778
private final ColumnSelectorFactory virtualSelectorFactory;
7879

7980
OnHeapClusterGroup(
@@ -114,6 +115,7 @@ public final class OnHeapClusterGroup implements IncrementalIndexRowSelector
114115
this.factsHolder = new OnheapIncrementalIndex.PlainNonTimeOrderedFactsHolder(rowComparator);
115116
}
116117

118+
this.virtualColumns = virtualColumns;
117119
this.virtualSelectorFactory = new OnheapIncrementalIndex.CachingColumnSelectorFactory(
118120
IncrementalIndex.makeColumnSelectorFactory(virtualColumns, inputRowHolder, null)
119121
);
@@ -301,10 +303,16 @@ boolean addToFacts(
301303
long dimsKeySize = 0L;
302304
for (int i = 0; i < dimensions.size(); i++) {
303305
final IncrementalIndex.DimensionDesc desc = dimensions.get(i);
306+
final String name = desc.getName();
307+
// A column declared as a virtual-column output is computed through the (VC-aware) selector factory; a plain
308+
// column is read straight from the raw row.
309+
final Object dimValue = virtualColumns.exists(name)
310+
? virtualSelectorFactory.makeColumnValueSelector(name).getObject()
311+
: row.getRaw(name);
304312
try {
305313
@SuppressWarnings({"unchecked", "rawtypes"})
306314
final EncodedKeyComponent<?> k = ((DimensionIndexer) desc.getIndexer())
307-
.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(desc.getName()), true);
315+
.processRowValsToUnsortedEncodedKeyComponent(dimValue, true);
308316
groupDims[i] = k.getComponent();
309317
dimsKeySize += k.getEffectiveSizeBytes();
310318
}

processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,8 +677,7 @@ public void close()
677677

678678
/**
679679
* Caches references to selector objects for each column instead of creating a new object each time in order to save
680-
* heap space. In general the selectorFactory need not to thread-safe. If required, set concurrentEventAdd to true to
681-
* use concurrent hash map instead of vanilla hash map for thread-safe operations.
680+
* heap space.
682681
*/
683682
static class CachingColumnSelectorFactory implements ColumnSelectorFactory
684683
{

processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryClusteredTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,20 @@
2727
import org.apache.druid.java.util.common.DateTimes;
2828
import org.apache.druid.java.util.common.granularity.Granularities;
2929
import org.apache.druid.query.dimension.DefaultDimensionSpec;
30+
import org.apache.druid.query.expression.TestExprMacroTable;
3031
import org.apache.druid.query.filter.EqualityFilter;
3132
import org.apache.druid.query.filter.Filter;
3233
import org.apache.druid.query.filter.TypedInFilter;
3334
import org.apache.druid.segment.Cursor;
3435
import org.apache.druid.segment.CursorBuildSpec;
3536
import org.apache.druid.segment.CursorHolder;
3637
import org.apache.druid.segment.DimensionSelector;
38+
import org.apache.druid.segment.VirtualColumns;
3739
import org.apache.druid.segment.column.ColumnHolder;
3840
import org.apache.druid.segment.column.ColumnType;
3941
import org.apache.druid.segment.column.RowSignature;
4042
import org.apache.druid.segment.filter.AndFilter;
43+
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
4144
import org.apache.druid.testing.InitializedNullHandlingTest;
4245
import org.junit.jupiter.api.Assertions;
4346
import org.junit.jupiter.api.Test;
@@ -114,6 +117,65 @@ private static List<List<String>> scanTenantRegion(CursorHolder holder)
114117
return out;
115118
}
116119

120+
@Test
121+
void testNonClusteringVirtualColumnDimensionIsMaterialized()
122+
{
123+
// A non-clustering column declared as a virtual-column output (region_upper = upper(region)) is computed at
124+
// ingest through the VC-aware selector and stored like any other column — VCs aren't limited to clustering
125+
// columns. region_upper is never in the input row, so a null here would mean the VC was not applied.
126+
final ClusteredValueGroupsBaseTableProjectionSpec spec = ClusteredValueGroupsBaseTableProjectionSpec.builder()
127+
.virtualColumns(VirtualColumns.create(
128+
new ExpressionVirtualColumn("region_upper", "upper(region)", ColumnType.STRING, TestExprMacroTable.INSTANCE)
129+
))
130+
.columns(
131+
new StringDimensionSchema("tenant"),
132+
new StringDimensionSchema("region"),
133+
new StringDimensionSchema("region_upper"),
134+
new LongDimensionSchema("__time")
135+
)
136+
.clusteringColumns("tenant")
137+
.build();
138+
final IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
139+
.withMinTimestamp(T0)
140+
.withTimestampSpec(TIMESTAMP_SPEC)
141+
.withQueryGranularity(Granularities.NONE)
142+
.withDimensionsSpec(spec.getDimensionsSpec())
143+
.withRollup(false)
144+
.withClusterSpec(spec)
145+
.build();
146+
try (OnheapIncrementalIndex index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
147+
.setIndexSchema(schema)
148+
.setMaxRowCount(10_000)
149+
.build()) {
150+
index.add(row(T0, "acme", "us-east-1"));
151+
index.add(row(T0 + 1, "acme", "us-west-2"));
152+
153+
final IncrementalIndexCursorFactory factory = new IncrementalIndexCursorFactory(index);
154+
try (CursorHolder holder = factory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
155+
final Cursor cursor = holder.asCursor();
156+
final DimensionSelector regionSel =
157+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region"));
158+
final DimensionSelector upperSel =
159+
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of("region_upper"));
160+
final List<List<String>> out = new ArrayList<>();
161+
while (!cursor.isDone()) {
162+
out.add(Arrays.asList(
163+
regionSel.lookupName(regionSel.getRow().get(0)),
164+
upperSel.lookupName(upperSel.getRow().get(0))
165+
));
166+
cursor.advance();
167+
}
168+
Assertions.assertEquals(
169+
List.of(
170+
List.of("us-east-1", "US-EAST-1"),
171+
List.of("us-west-2", "US-WEST-2")
172+
),
173+
out
174+
);
175+
}
176+
}
177+
}
178+
117179
@Test
118180
void testRowSignatureExposesClusteringAndNonClusteringColumns()
119181
{

processing/src/test/java/org/apache/druid/segment/incremental/OnHeapClusteredBaseTableTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
import org.apache.druid.data.input.impl.LongDimensionSchema;
2828
import org.apache.druid.data.input.impl.StringDimensionSchema;
2929
import org.apache.druid.data.input.impl.TimestampSpec;
30+
import org.apache.druid.error.DruidException;
3031
import org.apache.druid.java.util.common.DateTimes;
3132
import org.apache.druid.java.util.common.Intervals;
3233
import org.apache.druid.java.util.common.granularity.Granularities;
3334
import org.apache.druid.query.expression.TestExprMacroTable;
35+
import org.apache.druid.segment.AutoTypeColumnSchema;
3436
import org.apache.druid.segment.IndexableAdapter;
3537
import org.apache.druid.segment.Metadata;
3638
import org.apache.druid.segment.VirtualColumns;
@@ -145,6 +147,61 @@ void testNewClusterGroupsGrowHeapEstimate()
145147
}
146148
}
147149

150+
@Test
151+
void testQueryGranularityColumnRejectedInColumns()
152+
{
153+
// The query-granularity virtual column is a granularity carrier (it floors the stored __time column), not a stored
154+
// column, so it must not be declared in 'columns' — only __time defines the time position.
155+
final DruidException e = Assertions.assertThrows(
156+
DruidException.class,
157+
() -> ClusteredValueGroupsBaseTableProjectionSpec.builder()
158+
.columns(
159+
new StringDimensionSchema("tenant"),
160+
new LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
161+
)
162+
.clusteringColumns("tenant")
163+
.build()
164+
);
165+
Assertions.assertTrue(
166+
e.getMessage().contains(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME) && e.getMessage().contains("virtualColumns"),
167+
e.getMessage()
168+
);
169+
}
170+
171+
@Test
172+
void testUnsupportedClusteringColumnTypeRejected()
173+
{
174+
// Clustering values are dictionary-encoded by scalar type on the write side, so a non-scalar (here array) type is
175+
// rejected at spec-validation time rather than failing later at ingest.
176+
final DruidException e = Assertions.assertThrows(
177+
DruidException.class,
178+
() -> ClusteredValueGroupsBaseTableProjectionSpec.builder()
179+
.columns(
180+
new AutoTypeColumnSchema("tags", ColumnType.STRING_ARRAY, null),
181+
new LongDimensionSchema("__time")
182+
)
183+
.clusteringColumns("tags")
184+
.build()
185+
);
186+
Assertions.assertTrue(
187+
e.getMessage().contains("tags") && e.getMessage().contains("STRING, LONG, DOUBLE"),
188+
e.getMessage()
189+
);
190+
}
191+
192+
@Test
193+
void testMissingTimeColumnRejected()
194+
{
195+
final DruidException e = Assertions.assertThrows(
196+
DruidException.class,
197+
() -> ClusteredValueGroupsBaseTableProjectionSpec.builder()
198+
.columns(new StringDimensionSchema("tenant"), new StringDimensionSchema("region"))
199+
.clusteringColumns("tenant")
200+
.build()
201+
);
202+
Assertions.assertTrue(e.getMessage().contains("__time"), e.getMessage());
203+
}
204+
148205
@Test
149206
void testTwoColumnClusteringDistinguishesTuples()
150207
{

0 commit comments

Comments
 (0)