2525import org .apache .druid .error .DruidException ;
2626import org .apache .druid .java .util .common .parsers .ParseException ;
2727import org .apache .druid .query .OrderBy ;
28- import org .apache .druid .query .aggregation .Aggregator ;
29- import org .apache .druid .query .aggregation .AggregatorAndSize ;
30- import org .apache .druid .query .aggregation .AggregatorFactory ;
3128import org .apache .druid .segment .ColumnSelectorFactory ;
3229import org .apache .druid .segment .DimensionIndexer ;
3330import org .apache .druid .segment .EncodedKeyComponent ;
3835import org .apache .druid .segment .column .ColumnFormat ;
3936import org .apache .druid .segment .column .ColumnHolder ;
4037import org .apache .druid .segment .column .ColumnType ;
41- import org .apache .druid .segment .column .ValueType ;
4238
4339import javax .annotation .Nullable ;
4440import java .util .ArrayList ;
4541import java .util .Collections ;
4642import java .util .LinkedHashMap ;
4743import java .util .List ;
4844import java .util .Map ;
49- import java .util .concurrent .ConcurrentHashMap ;
5045import java .util .concurrent .atomic .AtomicInteger ;
5146import java .util .concurrent .atomic .AtomicLong ;
5247
6156 * what gives the persisted segment the "share nothing across groups" property — at persist time each group's
6257 * dictionaries are written under its per-group file-bundle prefix.
6358 * <p>
64- * Aggregator state is also per-group: rollup (when enabled) deduplicates within a group's facts holder, never
65- * across groups, since two rows with different clustering tuples land in different groups by construction.
59+ * Clustered base tables are never rollup tables and have no metric columns. Should rollup nonetheless be enabled,
60+ * the facts holder only deduplicates rows within a group (never across groups, since two rows with different clustering
61+ * tuples land in different groups by construction); because there are no aggregators, a rollup hit is a no-op dedup.
6662 */
6763public final class OnHeapClusterGroup implements IncrementalIndexRowSelector
6864{
@@ -72,24 +68,19 @@ public final class OnHeapClusterGroup implements IncrementalIndexRowSelector
7268
7369 private final List <IncrementalIndex .DimensionDesc > dimensions ;
7470 private final Map <String , IncrementalIndex .DimensionDesc > dimensionsMap ;
75- private final Map <String , IncrementalIndex .MetricDesc > aggregatorsMap ;
7671 private final Map <String , ColumnFormat > columnFormats ;
77- private final AggregatorFactory [] aggregatorFactories ;
7872 private final FactsHolder factsHolder ;
79- private final ConcurrentHashMap <Integer , Aggregator []> aggregators = new ConcurrentHashMap <>();
8073 private final AtomicInteger rowCounter = new AtomicInteger (0 );
8174 private final AtomicInteger numEntries = new AtomicInteger (0 );
8275 private final int groupTimePosition ;
8376
8477 private final ColumnSelectorFactory virtualSelectorFactory ;
85- private final Map <String , ColumnSelectorFactory > aggSelectors ;
8678
8779 OnHeapClusterGroup (
8880 OnHeapClusteredBaseTable parent ,
8981 Object [] clusteringValues ,
9082 List <Integer > clusteringValueIds ,
9183 List <DimensionSchema > nonClusteringDimensions ,
92- AggregatorFactory [] aggregatorFactories ,
9384 VirtualColumns virtualColumns ,
9485 IncrementalIndex .InputRowHolder inputRowHolder ,
9586 boolean rollup ,
@@ -99,7 +90,6 @@ public final class OnHeapClusterGroup implements IncrementalIndexRowSelector
9990 this .parent = parent ;
10091 this .clusteringValues = clusteringValues ;
10192 this .clusteringValueIds = Collections .unmodifiableList (new ArrayList <>(clusteringValueIds ));
102- this .aggregatorFactories = aggregatorFactories ;
10393
10494 this .dimensions = new ArrayList <>(nonClusteringDimensions .size ());
10595 this .dimensionsMap = new LinkedHashMap <>();
@@ -127,9 +117,6 @@ public final class OnHeapClusterGroup implements IncrementalIndexRowSelector
127117 this .virtualSelectorFactory = new OnheapIncrementalIndex .CachingColumnSelectorFactory (
128118 IncrementalIndex .makeColumnSelectorFactory (virtualColumns , inputRowHolder , null )
129119 );
130- this .aggregatorsMap = new LinkedHashMap <>();
131- this .aggSelectors = new LinkedHashMap <>();
132- initializeAggregators (inputRowHolder );
133120 }
134121
135122 /**
@@ -179,7 +166,7 @@ public List<String> getDimensionNames(boolean includeTime)
179166 @ Override
180167 public List <String > getMetricNames ()
181168 {
182- return ImmutableList . copyOf ( aggregatorsMap . keySet () );
169+ return List . of ( );
183170 }
184171
185172 @ Override
@@ -193,7 +180,7 @@ public IncrementalIndex.DimensionDesc getDimension(String columnName)
193180 @ Nullable
194181 public IncrementalIndex .MetricDesc getMetric (String columnName )
195182 {
196- return aggregatorsMap . get ( columnName ) ;
183+ return null ;
197184 }
198185
199186 @ Override
@@ -255,32 +242,32 @@ public int getLastRowIndex()
255242 @ Override
256243 public float getMetricFloatValue (int rowOffset , int aggOffset )
257244 {
258- return aggregators . get ( rowOffset )[ aggOffset ]. getFloat ( );
245+ throw DruidException . defensive ( "clustered base table groups have no metrics" );
259246 }
260247
261248 @ Override
262249 public long getMetricLongValue (int rowOffset , int aggOffset )
263250 {
264- return aggregators . get ( rowOffset )[ aggOffset ]. getLong ( );
251+ throw DruidException . defensive ( "clustered base table groups have no metrics" );
265252 }
266253
267254 @ Override
268255 public double getMetricDoubleValue (int rowOffset , int aggOffset )
269256 {
270- return aggregators . get ( rowOffset )[ aggOffset ]. getDouble ( );
257+ throw DruidException . defensive ( "clustered base table groups have no metrics" );
271258 }
272259
273260 @ Override
274261 @ Nullable
275262 public Object getMetricObjectValue (int rowOffset , int aggOffset )
276263 {
277- return aggregators . get ( rowOffset )[ aggOffset ]. get ( );
264+ throw DruidException . defensive ( "clustered base table groups have no metrics" );
278265 }
279266
280267 @ Override
281268 public boolean isNull (int rowOffset , int aggOffset )
282269 {
283- return aggregators . get ( rowOffset )[ aggOffset ]. isNull ( );
270+ throw DruidException . defensive ( "clustered base table groups have no metrics" );
284271 }
285272
286273 @ Override
@@ -294,18 +281,9 @@ public ColumnCapabilities getColumnCapabilities(String column)
294281 if (dim != null ) {
295282 return dim .getCapabilities ();
296283 }
297- final IncrementalIndex .MetricDesc metric = aggregatorsMap .get (column );
298- if (metric != null ) {
299- return metric .getCapabilities ();
300- }
301284 return null ;
302285 }
303286
304- public AggregatorFactory [] getAggregatorFactories ()
305- {
306- return aggregatorFactories ;
307- }
308-
309287 /**
310288 * Add a row's non-clustering content to this group's facts holder. Clustering values aren't re-processed here —
311289 * they've already been resolved + encoded by the parent {@link OnHeapClusteredBaseTable} as part of selecting
@@ -344,39 +322,18 @@ boolean addToFacts(
344322 );
345323
346324 final int priorIndex = factsHolder .getPriorIndex (subKey );
347- final Aggregator [] aggs ;
348325 if (IncrementalIndexRow .EMPTY_ROW_INDEX != priorIndex ) {
349- aggs = aggregators .get (priorIndex );
350- final long aggSizeDelta = OnheapIncrementalIndex .doAggregate (
351- aggregatorFactories ,
352- aggs ,
353- parent .getInputRowHolder (),
354- parseExceptionMessages ,
355- false
356- );
357- totalSizeInBytes .addAndGet (aggSizeDelta );
326+ // with no metric columns there is nothing to aggregate
358327 return false ;
359328 } else {
360- aggs = new Aggregator [aggregatorFactories .length ];
361- long aggSizeForRow = factorizeAggs (aggs );
362- aggSizeForRow += OnheapIncrementalIndex .doAggregate (
363- aggregatorFactories ,
364- aggs ,
365- parent .getInputRowHolder (),
366- parseExceptionMessages ,
367- false
368- );
369329 final int rowIndex = rowCounter .getAndIncrement ();
370- aggregators .put (rowIndex , aggs );
371330 final int prev = factsHolder .putIfAbsent (subKey , rowIndex );
372331 if (IncrementalIndexRow .EMPTY_ROW_INDEX == prev ) {
373332 numEntries .incrementAndGet ();
374333 } else {
375334 throw DruidException .defensive ("Encountered existing fact entry for new key in cluster group" );
376335 }
377- final long rowSize = subKey .estimateBytesInMemory ()
378- + aggSizeForRow
379- + OnheapIncrementalIndex .ROUGH_OVERHEAD_PER_MAP_ENTRY ;
336+ final long rowSize = subKey .estimateBytesInMemory () + OnheapIncrementalIndex .ROUGH_OVERHEAD_PER_MAP_ENTRY ;
380337 totalSizeInBytes .addAndGet (rowSize );
381338 return true ;
382339 }
@@ -400,39 +357,4 @@ private void initializeDimensions(List<DimensionSchema> nonClusteringDimensions)
400357 columnFormats .put (schema .getName (), desc .getIndexer ().getFormat ());
401358 }
402359 }
403-
404- private void initializeAggregators (IncrementalIndex .InputRowHolder inputRowHolder )
405- {
406- for (AggregatorFactory agg : aggregatorFactories ) {
407- final IncrementalIndex .MetricDesc metricDesc = new IncrementalIndex .MetricDesc (aggregatorsMap .size (), agg );
408- aggregatorsMap .put (metricDesc .getName (), metricDesc );
409- columnFormats .put (
410- metricDesc .getName (),
411- new CapabilitiesBasedFormat (metricDesc .getCapabilities ())
412- );
413- final ColumnSelectorFactory factory ;
414- if (agg .getIntermediateType ().is (ValueType .COMPLEX )) {
415- factory = new OnheapIncrementalIndex .CachingColumnSelectorFactory (
416- IncrementalIndex .makeColumnSelectorFactory (VirtualColumns .EMPTY , inputRowHolder , agg )
417- );
418- } else {
419- factory = virtualSelectorFactory ;
420- }
421- aggSelectors .put (agg .getName (), factory );
422- }
423- }
424-
425- private long factorizeAggs (Aggregator [] aggs )
426- {
427- long totalInitialSizeBytes = 0L ;
428- final long aggReferenceSize = Long .BYTES ;
429- for (int i = 0 ; i < aggregatorFactories .length ; i ++) {
430- final AggregatorFactory agg = aggregatorFactories [i ];
431- final AggregatorAndSize aggregatorAndSize = agg .factorizeWithSize (aggSelectors .get (agg .getName ()));
432- aggs [i ] = aggregatorAndSize .getAggregator ();
433- totalInitialSizeBytes += aggregatorAndSize .getInitialSizeBytes ();
434- totalInitialSizeBytes += aggReferenceSize ;
435- }
436- return totalInitialSizeBytes ;
437- }
438360}
0 commit comments