fix: Enable partition-aware grouped execution for index joins (#27701)#27701
fix: Enable partition-aware grouped execution for index joins (#27701)#27701zacw7 wants to merge 2 commits intoprestodb:masterfrom
Conversation
Summary: ## Problem In standard grouped execution, Presto schedules one lifespan per bucket. Each lifespan processes **all** Hive partitions for that bucket. For a query like: ``` SELECT * FROM t1 JOIN t2 ON t1.col = t2.col AND t1.ds = t2.ds ``` where both tables are bucketed on `col` and partitioned by `ds`, each bucket's hash table contains rows from every `ds` value. This wastes memory and requires evaluating partition columns in the join condition. ## Solution This diff introduces **partition-aware grouped execution**, which schedules at `(bucket, partition)` granularity instead of just bucket granularity. Each lifespan processes one bucket for one set of partition values, reducing per-lifespan hash table size. The feature is controlled by session property `partition_aware_grouped_execution` (default `false`) and works for joins, aggregations, and window functions on bucketed+partitioned tables. ## Design ### Planner (GroupedExecutionTagger + PlanFragmenterUtils) `GroupedExecutionTagger` tracks usable partition columns through the plan tree using a union-find data structure. For each equi-join clause where both sides have partition columns (`t1.ds = t2.ds`), the union-find links the corresponding `TableScanColumn` entries, establishing equivalence classes. This handles cross-table column name differences (e.g., `t1.ds = t2.ts`) by grouping them into equivalence classes. `PlanFragmenterUtils.computePartitionValues()` resolves each equivalence class to a **canonical column name** (alphabetically first), builds per-scan column mappings (`actual -> canonical`), and extracts distinct partition value combinations from `DiscretePredicates`. The result is stored in `StageExecutionDescriptor` and flows to the scheduler and connector. ### SPI Extensions - `StageExecutionDescriptor`: carries `groupedExecutionPartitionValues` and `partitionColumnMappings` - `ConnectorNodePartitioningProvider.listPartitionHandles(...)`: new overload accepting `List<Map<String, String>> partitionValues` - `SplitSchedulingContext`: carries `partitionColumnMapping` (actual -> canonical) per scan node ### Hive Connector - `CompoundPartitionHandle`: wraps a bucket handle + partition values map - `HiveNodePartitioningProvider`: creates compound handles with **interleaved ordering** -- `[b0/p0, b1/p0, ..., bN/p0, b0/p1, ...]` -- so `physicalBucket = lifespanId % bucketCount` - `HiveSplitSource.bucketedPartitionAware()`: per-`(bucket, partition)` queues using composite string keys. Splits are routed to the correct queue at `addToQueue` time using column name mapping; `getNextBatch` uses the same key from `CompoundPartitionHandle`. No filtering needed -- each queue contains exactly the splits for that `(bucket, partition)` pair ### Schedulers `FixedLifespanScheduler`, `DynamicLifespanScheduler`, and `FixedSourcePartitionedScheduler` derive the physical bucket from compound lifespan IDs via `lifespanId % bucketCount`. `EmptySplit` (used for non-grouped scan nodes in grouped stages) uses the same formula in `NodePartitioningManager`. ## Key Design Decisions 1. **Connector-controlled handles**: The engine passes partition values to the connector, which creates opaque compound handles and matching split queues. The engine never constructs or inspects `CompoundPartitionHandle` -- it treats handles as opaque, following the existing bucket pattern. 2. **Interleaved handle ordering**: `[b0/p0, b1/p0, ..., bN/p0, b0/p1, ...]` instead of `[b0/p0, b0/p1, ..., b0/pN, b1/p0, ...]`. This gives `physicalBucket = lifespanId % bucketCount` (simple modulo), provides better parallelism (first batch of concurrent lifespans touches all buckets), and simplifies `EmptySplit` bucket derivation. 3. **Canonical column names**: When two tables use different names for the same partition concept (`t1.ds = t2.ts`), the engine picks a canonical name per equivalence class (alphabetically first: `"ds"`). Per-scan mappings translate actual names to canonical names, ensuring queue keys match across tables. 4. **Union semantics for partition values**: Partition values from all scans are combined using union (not intersection). This is correct for `LEFT`/`RIGHT`/`FULL OUTER` joins where one side may have partitions the other lacks -- `EmptySplit` ensures drivers are created for unmatched partitions, producing NULL-extended rows. Differential Revision: D100408549
There was a problem hiding this comment.
Sorry @zacw7, your pull request is larger than the review limit of 150000 diff characters
…db#27701) Summary: Pull Request resolved: prestodb#27701 Partition-aware grouped execution schedules lifespans at (bucket, partition) granularity instead of bucket-only, so each lifespan processes one partition. This was implemented for TableScanNode and hash/merge joins but not for IndexSourceNode and IndexJoinNode, causing index joins to receive all partitions in a single lifespan. This change extends partition-aware grouped execution to index joins: GroupedExecutionTagger: - Extract getSourceNodeProperties() shared by visitTableScan and visitIndexSource to collect partition columns from DiscretePredicates - visitIndexJoin: merge partition columns from both sides via mergeJoinSides when partition-aware is enabled, matching visitJoin behavior - visitProject: preserve child partition columns that participate in a joined equivalence class even when not projected in output. Without this, partition columns consumed by an IndexJoinNode below the ProjectNode are dropped before reaching analyzeGroupedExecution PlanFragmenterUtils: - Add findSourceNodeTableHandle() that resolves both TableScanNode and IndexSourceNode by ID, replacing findTableScanNode() which only matched TableScanNode. Without this, computePartitionValues() aborted when encountering an IndexSourceNode ID in the table scan scheduling order SplitSourceFactory: - visitIndexSource: pass partitionColumnMapping from StageExecutionDescriptor to getSplits, matching visitTableScan. Without this, HiveSplitManager created a PerBucket split source instead of PerBucketPartitionAware for the index table Differential Revision: D103429655
steveburnett
left a comment
There was a problem hiding this comment.
Thank you for the documentation! Looks good. Please add links as commented but everything you have looks great.
| in lifespans (one per bucket), reducing memory usage by processing a subset of data at a time. | ||
| This is required for queries on bucketed tables that use join, aggregation, or window functions | ||
| with compatible bucket layouts. | ||
|
|
There was a problem hiding this comment.
Please add a sentence mentioning the corresponding configuration property. Here's an example from earlier in this file showing the formatting for the link.
The corresponding configuration property is :ref:`admin/properties:\`\`materialized-view-stale-read-behavior\`\``.
| * At least 2 distinct partition values exist (single partition provides no benefit) | ||
|
|
||
| When partition-aware execution is not applicable (e.g., non-partitioned tables, no partition | ||
| columns in join conditions), the query falls back to standard grouped execution automatically. |
There was a problem hiding this comment.
Please add a sentence mentioning the corresponding configuration property. Here's an example from earlier in this file showing the formatting for the link.
The corresponding configuration property is :ref:`admin/properties:\`\`materialized-view-stale-read-behavior\`\``.
| operations in lifespans (one per bucket), processing a subset of data at a time to | ||
| reduce memory usage. | ||
|
|
||
| The corresponding session property is ``grouped_execution``. |
There was a problem hiding this comment.
Please link to the corresponding session property. Here's an example.
The corresponding session property is :ref:`admin/properties-session:\`\`materialized_view_stale_read_behavior\`\``.
| least 2 distinct partition values exist. When not applicable, queries fall back to standard | ||
| grouped execution automatically. | ||
|
|
||
| The corresponding session property is ``partition_aware_grouped_execution``. |
There was a problem hiding this comment.
Please link to the corresponding session property. Here's an example.
The corresponding session property is :ref:`admin/properties-session:\`\`materialized_view_stale_read_behavior\`\``.
jja725
left a comment
There was a problem hiding this comment.
can we add some test for index join?
Summary:
Partition-aware grouped execution schedules lifespans at (bucket, partition)
granularity instead of bucket-only, so each lifespan processes one partition. This was
implemented for TableScanNode and hash/merge joins but not for IndexSourceNode and
IndexJoinNode, causing index joins to receive all partitions in a single lifespan.
This change extends partition-aware grouped execution to index joins:
GroupedExecutionTagger:
collect partition columns from DiscretePredicates
partition-aware is enabled, matching visitJoin behavior
equivalence class even when not projected in output. Without this, partition columns
consumed by an IndexJoinNode below the ProjectNode are dropped before reaching
analyzeGroupedExecution
PlanFragmenterUtils:
by ID, replacing findTableScanNode() which only matched TableScanNode. Without this,
computePartitionValues() aborted when encountering an IndexSourceNode ID in the table
scan scheduling order
SplitSourceFactory:
getSplits, matching visitTableScan. Without this, HiveSplitManager created a PerBucket
split source instead of PerBucketPartitionAware for the index table
Differential Revision: D103429655