feat(optimizer): Push HAVING filter through MAX/MIN/ARBITRARY aggregations #27712
kaikalur wants to merge 1 commit into prestodb:master
Reviewer's Guide
Adds a new iterative optimizer rule that pushes certain HAVING filters on MAX/MIN/ARBITRARY aggregates below the aggregation, wires it into the optimizer guarded by a new session property, and introduces unit, plan, and end-to-end tests plus documentation for the feature.
Sequence diagram for applying PushFilterThroughSelectingAggregation
sequenceDiagram
participant Sess as Session
participant Planner as PlanOptimizers
participant IterOpt as IterativeOptimizer_PushFilterThroughSelectingAggregation
participant Rule as PushFilterThroughSelectingAggregation
participant Lookup as Lookup
Sess->>Planner: create query session
Planner->>IterOpt: optimize(plan)
loop over FilterNode candidates
IterOpt->>Rule: getPattern()
IterOpt->>Rule: isEnabled(Sess)
Rule-->>IterOpt: boolean enabled
alt enabled
IterOpt->>Rule: apply(FilterNode, Captures, Context)
Rule->>Lookup: resolve(filterNode.source)
Lookup-->>Rule: ProjectNode or AggregationNode
Rule->>Lookup: walk through ProjectNode chain
Lookup-->>Rule: AggregationNode
Rule-->>IterOpt: Result(newPlan or empty)
else disabled
IterOpt-->>IterOpt: skip rule
end
end
IterOpt-->>Planner: optimized plan with pushed filters
Class diagram for PushFilterThroughSelectingAggregation rule
classDiagram
class Rule_FilterNode {
<<interface>>
}
class PushFilterThroughSelectingAggregation {
- Metadata metadata
- FunctionResolution functionResolution
- RowExpressionDeterminismEvaluator determinismEvaluator
+ PushFilterThroughSelectingAggregation(Metadata metadata)
+ Pattern~FilterNode~ getPattern()
+ boolean isEnabled(Session session)
+ Result apply(FilterNode filterNode, Captures captures, Context context)
- Optional~EligibleAggregate~ extractEligibleAggregate(AggregationNode aggregationNode)
- Optional~PushdownResult~ tryPushdown(RowExpression conjunct, EligibleAggregate eligible, Set~VariableReferenceExpression~ sourceOutputs)
- static boolean isAnyComparison(OperatorType op)
- static boolean isEvaluableBelow(RowExpression expression, Set~VariableReferenceExpression~ sourceOutputs)
- boolean isArbitraryFunction(FunctionHandle handle)
- static OperatorType flip(OperatorType op)
}
class EligibleAggregate {
- VariableReferenceExpression aggOutput
- VariableReferenceExpression argVar
- AggKind kind
+ EligibleAggregate(VariableReferenceExpression aggOutput, VariableReferenceExpression argVar, AggKind kind)
}
class PushdownResult {
- RowExpression pushed
- boolean keepOriginal
+ PushdownResult(RowExpression pushed, boolean keepOriginal)
}
class AggKind {
<<enum>>
MAX
MIN
ARBITRARY
}
class Metadata {
}
class FunctionResolution {
}
class RowExpressionDeterminismEvaluator {
}
class FilterNode {
}
class AggregationNode {
}
class ProjectNode {
}
class PlanNode {
}
class Session {
}
class OperatorType {
}
class FunctionHandle {
}
class RowExpression {
}
class VariableReferenceExpression {
}
class Pattern_FilterNode {
}
class Captures {
}
class Context {
+ Lookup getLookup()
+ IdAllocator getIdAllocator()
}
class Result {
}
class Lookup {
+ PlanNode resolve(PlanNode node)
}
class IdAllocator {
+ PlanNodeId getNextId()
}
PushFilterThroughSelectingAggregation ..|> Rule_FilterNode
PushFilterThroughSelectingAggregation --> Metadata
PushFilterThroughSelectingAggregation --> FunctionResolution
PushFilterThroughSelectingAggregation --> RowExpressionDeterminismEvaluator
PushFilterThroughSelectingAggregation --> EligibleAggregate
PushFilterThroughSelectingAggregation --> PushdownResult
PushFilterThroughSelectingAggregation --> AggKind
PushFilterThroughSelectingAggregation --> FilterNode
PushFilterThroughSelectingAggregation --> AggregationNode
PushFilterThroughSelectingAggregation --> ProjectNode
PushFilterThroughSelectingAggregation --> PlanNode
PushFilterThroughSelectingAggregation --> Session
PushFilterThroughSelectingAggregation --> OperatorType
PushFilterThroughSelectingAggregation --> FunctionHandle
PushFilterThroughSelectingAggregation --> RowExpression
PushFilterThroughSelectingAggregation --> VariableReferenceExpression
PushFilterThroughSelectingAggregation --> Pattern_FilterNode
PushFilterThroughSelectingAggregation --> Captures
PushFilterThroughSelectingAggregation --> Context
PushFilterThroughSelectingAggregation --> Result
Context --> Lookup
Context --> IdAllocator
Lookup --> PlanNode
IdAllocator --> PlanNode
EligibleAggregate --> VariableReferenceExpression
EligibleAggregate --> AggKind
PushdownResult --> RowExpression
Hey - I've found 4 issues, and left some high level feedback:
- In `PushFilterThroughSelectingAggregation.flip`, the default branch returns the original operator which silently treats unsupported operator types as self-inverse; consider either explicitly handling all supported comparison operators or throwing for unexpected values to avoid masking planner bugs.
- The plan tests in `TestPushFilterThroughSelectingAggregationPlans` rely on `toString().contains("totalprice"/"extendedprice")` to detect predicates, which is brittle; using `PlanMatchPattern` or inspecting the row expressions' structure/types would make these assertions more robust to formatting changes.
- `isArbitraryFunction` currently compares the function name string against `"arbitrary"`/`"any_value"`; if possible, prefer using `FunctionResolution` helpers or exact `QualifiedObjectName` matching (including catalog/schema) to avoid misidentifying functions with the same short name from other namespaces.
## Individual Comments
### Comment 1
<location path="presto-main-base/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushFilterThroughSelectingAggregation.java" line_range="397-414" />
<code_context>
+        return name.getObjectName().equalsIgnoreCase("arbitrary") || name.getObjectName().equalsIgnoreCase("any_value");
+    }
+
+    private static OperatorType flip(OperatorType op)
+    {
+        switch (op) {
+            case LESS_THAN:
+                return OperatorType.GREATER_THAN;
+            case LESS_THAN_OR_EQUAL:
+                return OperatorType.GREATER_THAN_OR_EQUAL;
+            case GREATER_THAN:
+                return OperatorType.LESS_THAN;
+            case GREATER_THAN_OR_EQUAL:
+                return OperatorType.LESS_THAN_OR_EQUAL;
+            case EQUAL:
+            case NOT_EQUAL:
+                return op;
+            default:
+                return op;
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Defaulting to returning the original operator in flip() can hide unsupported operators
In `flip`, the default branch returns the original operator for any unsupported value. If a new comparison operator is added later and used here, the expression may be left unflipped when the aggregate is on the RHS, causing incorrect rewrites without any visible failure.
Instead of returning `op` in the default branch, consider failing fast for unsupported operators (e.g., throwing, or returning an Optional and aborting in `tryPushdown`).
```suggestion
    private static OperatorType flip(OperatorType op)
    {
        switch (op) {
            case LESS_THAN:
                return OperatorType.GREATER_THAN;
            case LESS_THAN_OR_EQUAL:
                return OperatorType.GREATER_THAN_OR_EQUAL;
            case GREATER_THAN:
                return OperatorType.LESS_THAN;
            case GREATER_THAN_OR_EQUAL:
                return OperatorType.LESS_THAN_OR_EQUAL;
            case EQUAL:
            case NOT_EQUAL:
                return op;
            default:
                throw new IllegalArgumentException("Unsupported operator for flip: " + op);
        }
    }
```
</issue_to_address>
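The comment above also mentions returning an `Optional` and aborting in `tryPushdown` as an alternative to throwing. A minimal sketch of that variant follows, under stated assumptions: `FlipSketch` and its `Op` enum are hypothetical stand-ins for illustration (not Presto's `OperatorType`), and the caller is assumed to skip the rewrite when the result is empty.

```java
import java.util.Optional;

class FlipSketch
{
    // Hypothetical stand-in for the planner's operator enum; ADD represents any non-comparison operator.
    enum Op { LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, EQUAL, NOT_EQUAL, ADD }

    // Mirrors a comparison so that "c op agg" can be read as "agg flip(op) c".
    // Returns empty for anything that is not a known comparison, so the caller can abort the pushdown.
    static Optional<Op> flip(Op op)
    {
        switch (op) {
            case LESS_THAN:
                return Optional.of(Op.GREATER_THAN);
            case LESS_THAN_OR_EQUAL:
                return Optional.of(Op.GREATER_THAN_OR_EQUAL);
            case GREATER_THAN:
                return Optional.of(Op.LESS_THAN);
            case GREATER_THAN_OR_EQUAL:
                return Optional.of(Op.LESS_THAN_OR_EQUAL);
            case EQUAL:
            case NOT_EQUAL:
                // Symmetric comparisons are their own mirror image.
                return Optional.of(op);
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(flip(Op.LESS_THAN));
        System.out.println(flip(Op.ADD));
    }
}
```

Compared with throwing, this keeps an unexpected operator from failing the whole query: the rule would simply not fire. The trade-off is that a planner bug stays quieter than it would with an exception.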
### Comment 2
<location path="presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java" line_range="8621-8624" />
<code_context>
+public class TestPushFilterThroughSelectingAggregation
+ extends BaseRuleTest
+{
+ @Test
+ public void testPushesMaxGreaterThanOrEqual()
+ {
</code_context>
<issue_to_address>
**suggestion (testing):** Add end-to-end coverage for ARBITRARY aggregates and additional comparison operators
These tests cover MAX/MIN and equality-based HAVING cases, but they don’t exercise the ARBITRARY path or non-equality comparisons (e.g., <, >, <>, !=). Please add at least one E2E query using HAVING arbitrary(col) <> value (and ideally another with > or <) and verify behavior with the optimization enabled/disabled to cover the ARBITRARY-specific branch of the rule.
```suggestion
        // MAX with >= : pushdown safe
        assertQueryWithSameQueryRunner(enabled,
                "SELECT orderstatus, max(totalprice) FROM orders GROUP BY orderstatus HAVING max(totalprice) >= 100000",
                disabled);

        // ARBITRARY with <> : pushdown safe, exercises ARBITRARY-specific path with non-equality comparison
        assertQueryWithSameQueryRunner(enabled,
                "SELECT orderstatus, arbitrary(totalprice) FROM orders GROUP BY orderstatus HAVING arbitrary(totalprice) <> 0.0",
                disabled);

        // ARBITRARY with > : pushdown safe, exercises another non-equality comparison operator
        assertQueryWithSameQueryRunner(enabled,
                "SELECT orderstatus, arbitrary(totalprice) FROM orders GROUP BY orderstatus HAVING arbitrary(totalprice) > 0.0",
                disabled);
```
</issue_to_address>
### Comment 3
<location path="presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestPushFilterThroughSelectingAggregation.java" line_range="36-45" />
<code_context>
+ values("x", "g"))));
+ }
+
+ @Test
+ public void testPushesArbitraryEquals()
+ {
+ tester().assertThat(new PushFilterThroughSelectingAggregation(tester().getMetadata()))
</code_context>
<issue_to_address>
**suggestion (testing):** Extend rule-level tests to cover ARBITRARY with non-equality comparisons and flipped operands
The ARBITRARY branch in PushFilterThroughSelectingAggregation accepts any comparison operator and normalizes cases where the aggregate appears on the right. Currently only equality with the aggregate on the left is tested. Please add: (1) a test using a non-equality predicate like `arb_x <> BIGINT '5'`, and (2) a test with flipped operands like `BIGINT '5' <= arb_x` to cover generic comparison handling and normalization for ARBITRARY.
Suggested implementation:
```java
    @Test
    public void testPushesArbitraryNotEquals()
    {
        tester().assertThat(new PushFilterThroughSelectingAggregation(tester().getMetadata()))
                .setSystemProperty(PUSH_FILTER_THROUGH_SELECTING_AGGREGATION, "true")
                .on(p -> {
                    VariableReferenceExpression x = p.variable("x", BIGINT);
                    VariableReferenceExpression g = p.variable("g", BIGINT);
                    VariableReferenceExpression arbX = p.variable("arb_x", BIGINT);
                    return p.filter(
                            p.rowExpression("arb_x <> BIGINT '5'"),
                            p.aggregation(ab -> ab
                                    .source(p.values(x, g))
                                    .addAggregation(arbX, p.rowExpression("arbitrary(x)"))
                                    .singleGroupingSet(g)));
                })
                .matches(
                        aggregation(
                                singleGroupingSet("g"),
                                ImmutableMap.of(
                                        "arb_x", functionCall("arbitrary", ImmutableList.of("x"))),
                                filter("x <> BIGINT '5'", values("x", "g"))));
    }

    @Test
    public void testPushesArbitraryFlippedLessThanOrEqual()
    {
        tester().assertThat(new PushFilterThroughSelectingAggregation(tester().getMetadata()))
                .setSystemProperty(PUSH_FILTER_THROUGH_SELECTING_AGGREGATION, "true")
                .on(p -> {
                    VariableReferenceExpression x = p.variable("x", BIGINT);
                    VariableReferenceExpression g = p.variable("g", BIGINT);
                    VariableReferenceExpression arbX = p.variable("arb_x", BIGINT);
                    return p.filter(
                            p.rowExpression("BIGINT '5' <= arb_x"),
                            p.aggregation(ab -> ab
                                    .source(p.values(x, g))
                                    .addAggregation(arbX, p.rowExpression("arbitrary(x)"))
                                    .singleGroupingSet(g)));
                })
                .matches(
                        aggregation(
                                singleGroupingSet("g"),
                                ImmutableMap.of(
                                        "arb_x", functionCall("arbitrary", ImmutableList.of("x"))),
                                filter("BIGINT '5' <= x", values("x", "g"))));
    }
}
```
1. Ensure the following imports are present at the top of the file (they are typically already used elsewhere in this test class, but add them if missing):
- `import com.facebook.presto.spi.relation.VariableReferenceExpression;`
- `import com.facebook.presto.common.type.BigintType;` or the static `BIGINT` import used elsewhere.
- `import com.facebook.presto.sql.planner.iterative.rule.PushFilterThroughSelectingAggregation;`
- `import static com.facebook.presto.SystemSessionProperties.PUSH_FILTER_THROUGH_SELECTING_AGGREGATION;`
- `import com.google.common.collect.ImmutableList;`
- `import com.google.common.collect.ImmutableMap;`
2. If the existing `testPushesArbitraryEquals` test uses a different variable name for the arbitrary aggregation output than `arb_x`, align these new tests to use the same name for consistency (update both the `VariableReferenceExpression` name and its use in the row expressions and match patterns).
3. If the existing tests use a different style for building `ImmutableMap` or `ImmutableList` (e.g., `ImmutableMap.builder()`), adjust the `matches(...)` expressions accordingly to match the existing convention.
</issue_to_address>
### Comment 4
<location path="presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestPushFilterThroughSelectingAggregationPlans.java" line_range="50-59" />
<code_context>
+public class TestPushFilterThroughSelectingAggregation
+ extends BaseRuleTest
+{
+ @Test
+ public void testPushesMaxGreaterThanOrEqual()
+ {
</code_context>
<issue_to_address>
**suggestion (testing):** Add a distributed plan test for MIN or ARBITRARY to complement the MAX-focused scenarios
The existing distributed plan tests only cover MAX-based rewrites. Please add at least one distributed plan test for MIN (e.g., a MIN-based HAVING with a join chain to validate <=/< pushdown to the correct scan) and one for ARBITRARY (to confirm the filter is pushed through joins correctly), so that distributed planning is exercised consistently for all supported aggregate kinds.
Suggested implementation:
```java
    @Test
    public void testDistributedMinHavingPushdown()
    {
        // MIN-based HAVING with a join chain to validate <=/< pushdown to the correct scan
        assertDistributedPlan(
                enabled(),
                "SELECT o.orderkey, MIN(l.quantity) AS min_qty " +
                        "FROM orders o " +
                        "JOIN lineitem l ON o.orderkey = l.orderkey " +
                        "JOIN customer c ON o.custkey = c.custkey " +
                        "GROUP BY o.orderkey " +
                        "HAVING MIN(l.quantity) <= 5",
                plan -> {
                    // Expect the MIN-based HAVING predicate to be pushed below the selecting aggregation
                    // and into the appropriate leaf scan in the distributed plan
                    assertPlan(
                            plan,
                            anyTree(
                                    aggregation(
                                            anyTree(
                                                    filter(
                                                            // The filter should reference the same symbol used
                                                            // for the MIN aggregation argument (l.quantity)
                                                            "quantity <= 5",
                                                            join(
                                                                    anyTree(tableScan("orders")),
                                                                    anyTree(tableScan("lineitem")),
                                                                    anyTree(tableScan("customer"))))))));
                });
    }

    @Test
    public void testDistributedArbitraryHavingPushdown()
    {
        // ARBITRARY-based HAVING to confirm filter is pushed through joins correctly
        assertDistributedPlan(
                enabled(),
                "SELECT o.orderkey, ARBITRARY(l.comment) AS any_comment " +
                        "FROM orders o " +
                        "JOIN lineitem l ON o.orderkey = l.orderkey " +
                        "JOIN customer c ON o.custkey = c.custkey " +
                        "GROUP BY o.orderkey " +
                        "HAVING ARBITRARY(l.comment) IS NOT NULL",
                plan -> {
                    // The predicate on ARBITRARY's argument should be pushed through the selecting
                    // aggregation and applied on the correct leaf side of the join chain
                    assertPlan(
                            plan,
                            anyTree(
                                    aggregation(
                                            anyTree(
                                                    filter(
                                                            "comment IS NOT NULL",
                                                            join(
                                                                    anyTree(tableScan("orders")),
                                                                    anyTree(tableScan("lineitem")),
                                                                    anyTree(tableScan("customer"))))))));
                });
    }

    private Session disabled()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(PUSH_FILTER_THROUGH_SELECTING_AGGREGATION, "false")
                .build();
    }

    private Session enabled()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(PUSH_FILTER_THROUGH_SELECTING_AGGREGATION, "true")
                .build();
    }
```
1. Align helper usage: The example above assumes `assertDistributedPlan`, `assertPlan`, `anyTree`, `aggregation`, `filter`, `join`, and `tableScan` helpers are already used elsewhere in this test class. If the existing distributed MAX tests use different helpers or patterns (e.g., `assertDistributedPlan("sql", enabled(), plan -> {...})` with a different argument order or different matchers), mirror that style instead of the one shown.
2. Adjust SQL and table names: This snippet uses TPCH-style tables (`orders`, `lineitem`, `customer`). If this test class uses different schemas or fully qualified names (e.g., `SELECT ... FROM lineitem` without joins, or `tpch.tiny.orders`), change the queries accordingly to match the existing MAX distributed tests and the catalog in `BaseRuleTest`'s `getQueryRunner`.
3. Tweak predicates and symbols: Update `"quantity <= 5"` and `"comment IS NOT NULL"` to match the actual symbol names used in your existing plan matchers (for example, they might be aliased as `Q` or `c_comment`). Ensure the HAVING predicates and the filter matchers refer to the same expressions so that the matcher accurately verifies pushdown to the correct scan.
4. Place the tests consistently: If the current class groups all distributed-plan tests together (e.g., near the existing MAX distributed tests), move these new test methods next to them, keeping the `disabled()` / `enabled()` session helpers at the bottom of the class as in the existing structure.
</issue_to_address>
    @Test
    public void testFilterOnMaxPushesAllTheWayDownToScan()
    {
        // Simpler case (no join): the pushed-down filter on totalprice should reach the orders
        // table scan as a pre-aggregation filter.
        String sql = "SELECT custkey, MAX(totalprice) FROM orders GROUP BY custkey HAVING MAX(totalprice) >= 100000";

        assertDistributedPlan(sql, enabled(),
                anyTree(filter("totalprice >= DOUBLE '100000.0'",
                        tableScan("orders", ImmutableMap.of("totalprice", "totalprice", "custkey", "custkey")))));
steveburnett left a comment
LGTM! (docs)
Pull branch, local doc build, looks good. Thanks!
…tions
Add PushFilterThroughSelectingAggregation, an iterative rule that pushes
HAVING-style filters on the output of single-value-selecting aggregates
(MAX, MIN, ARBITRARY) below the aggregation when the predicate direction
matches the aggregate's selection semantics. Predicate pushdown then carries
the rewritten filter through any joins/projections all the way to the
underlying table scan, enabling much earlier row reduction.
Strict-direction REPLACE pushdown (drop the HAVING):
HAVING max(x) >= c → WHERE x >= c
HAVING max(x) > c → WHERE x > c
HAVING min(x) <= c → WHERE x <= c
HAVING min(x) < c → WHERE x < c
HAVING arbitrary(x) op c → WHERE x op c (any comparison; nondeterministic)
Equality on MAX/MIN gets ADD-pre-filter + KEEP-HAVING (a direct WHERE x = c
would accept spurious groups):
HAVING max(x) = c → WHERE x >= c + HAVING max(x) = c
HAVING min(x) = c → WHERE x <= c + HAVING min(x) = c
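The "spurious groups" remark can be checked with a small worked example. The sketch below is a plain in-memory simulation, not planner code: `EqualityPushdownSketch` and `groupSurvives` are hypothetical names, and a single group with values {5, 10} and `c = 5` is assumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.LongPredicate;

class EqualityPushdownSketch
{
    // Evaluates one GROUP BY group: apply the WHERE filter, take max(x),
    // then optionally re-check HAVING max(x) = c on the surviving rows.
    static boolean groupSurvives(List<Long> group, LongPredicate where, long c, boolean keepHaving)
    {
        OptionalLong max = group.stream().mapToLong(Long::longValue).filter(where).max();
        if (!max.isPresent()) {
            // Every row was filtered out, so the group does not exist in the output.
            return false;
        }
        return !keepHaving || max.getAsLong() == c;
    }

    public static void main(String[] args)
    {
        List<Long> group = Arrays.asList(5L, 10L);
        long c = 5;

        // Original query: HAVING max(x) = 5. The max is 10, so the group is rejected.
        System.out.println("original:  " + groupSurvives(group, x -> true, c, true));

        // Naive rewrite: WHERE x = 5 with the HAVING dropped. The filtered max is 5, so the group is wrongly kept.
        System.out.println("naive:     " + groupSurvives(group, x -> x == c, c, false));

        // ADD-pre-filter + KEEP-HAVING: WHERE x >= 5, then HAVING max(x) = 5. The max is still 10, so rejected.
        System.out.println("rule form: " + groupSurvives(group, x -> x >= c, c, true));
    }
}
```

The pre-filter `x >= c` never removes the true maximum of a group that could satisfy `max(x) = c`, which is why the retained HAVING still sees the right value.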
Safety conditions:
* Only one aggregate in the AggregationNode (otherwise pushing the filter
below changes the row set seen by other aggregates)
* Aggregate argument is a direct VariableReferenceExpression (no expressions,
DISTINCT, FILTER, MASK, ORDER BY)
* Comparison is deterministic
* Non-aggregate side references only variables available below the aggregation
(so it can be evaluated below; predicate pushdown can then push it further)
* Walks through any chain of identity-passing ProjectNodes between the
Filter and the Aggregation (the planner inserts these for hash columns)
* Resolves GroupReferences via Lookup and dedupes against existing source
filters to avoid re-firing on subsequent iterations
Registered in PlanOptimizers between EliminateCrossJoins and predicatePushDown
so the new below-aggregation filter can be pushed through joins, projections,
and into the table scan.
Gated by the optimize_row_in_predicate-style session property
push_filter_through_selecting_aggregation, disabled by default.
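The rewrite table in this commit message can be summarized as a small decision function. This is a sketch under stated assumptions rather than the rule's actual code: `RewriteTableSketch`, `Op`, `Pushdown`, and `decide` are hypothetical names, loosely modeled on the `AggKind` and `PushdownResult(pushed, keepOriginal)` shapes shown in the class diagram, and the aggregate is assumed to be already normalized to the left-hand side of the comparison.

```java
import java.util.Optional;

class RewriteTableSketch
{
    enum AggKind { MAX, MIN, ARBITRARY }

    // Hypothetical comparison operators: <, <=, >, >=, =, <>
    enum Op { LT, LE, GT, GE, EQ, NE }

    // Which operator to apply to the aggregate argument below the aggregation,
    // and whether the original HAVING conjunct must be kept above it.
    static final class Pushdown
    {
        final Op pushedOp;
        final boolean keepOriginal;

        Pushdown(Op pushedOp, boolean keepOriginal)
        {
            this.pushedOp = pushedOp;
            this.keepOriginal = keepOriginal;
        }
    }

    static Optional<Pushdown> decide(AggKind kind, Op op)
    {
        switch (kind) {
            case ARBITRARY:
                // Any comparison: HAVING arbitrary(x) op c -> WHERE x op c
                return Optional.of(new Pushdown(op, false));
            case MAX:
                if (op == Op.GE || op == Op.GT) {
                    return Optional.of(new Pushdown(op, false));
                }
                if (op == Op.EQ) {
                    // HAVING max(x) = c -> WHERE x >= c, HAVING kept
                    return Optional.of(new Pushdown(Op.GE, true));
                }
                return Optional.empty();
            case MIN:
                if (op == Op.LE || op == Op.LT) {
                    return Optional.of(new Pushdown(op, false));
                }
                if (op == Op.EQ) {
                    // HAVING min(x) = c -> WHERE x <= c, HAVING kept
                    return Optional.of(new Pushdown(Op.LE, true));
                }
                return Optional.empty();
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args)
    {
        Pushdown p = decide(AggKind.MAX, Op.EQ).get();
        System.out.println(p.pushedOp + " keepOriginal=" + p.keepOriginal);
        System.out.println(decide(AggKind.MAX, Op.LT).isPresent());
    }
}
```

An empty result corresponds to the "no-fire" cases, such as `HAVING max(x) < c`, where filtering rows below the aggregation could change which groups qualify.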
== RELEASE NOTES ==
General Changes
* Add :func:`push_filter_through_selecting_aggregation` session property that
pushes HAVING-style filters on MAX/MIN/ARBITRARY aggregate outputs below
the aggregation when the predicate direction matches, enabling earlier row
reduction by allowing predicate pushdown to carry the rewritten filter all
the way to the table scan.
1a06cfb to 9978a1d
jja725 left a comment
Thanks for this great optimization. Could you help me understand the following situation?
-- no rows satisfy x >= 100
-- Original
SELECT MAX(x) FROM t HAVING MAX(x) >= 100
-- Rule's rewrite (REPLACE form, drops the HAVING)
SELECT MAX(x) FROM t WHERE x >= 100
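One reading of the scenario above is that the two queries return a different number of rows when there is no GROUP BY. A global aggregation always produces exactly one row, so the rewritten form would yield a single NULL where the original yields nothing. The sketch below simulates that with plain Java lists; `GlobalAggregationSketch`, `original`, and `rewritten` are hypothetical names, and a non-null column `x` is assumed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class GlobalAggregationSketch
{
    // SELECT MAX(x) FROM t HAVING MAX(x) >= c
    // The global aggregation yields one row; HAVING then keeps or drops it.
    static List<Long> original(List<Long> t, long c)
    {
        Long max = t.isEmpty() ? null : Collections.max(t);
        if (max != null && max >= c) {
            return Collections.singletonList(max);
        }
        return Collections.emptyList();
    }

    // SELECT MAX(x) FROM t WHERE x >= c
    // The filter runs first; the global aggregation still yields one row (NULL on empty input).
    static List<Long> rewritten(List<Long> t, long c)
    {
        List<Long> kept = t.stream().filter(x -> x >= c).collect(Collectors.toList());
        Long max = kept.isEmpty() ? null : Collections.max(kept);
        return Collections.singletonList(max);
    }

    public static void main(String[] args)
    {
        List<Long> t = Arrays.asList(1L, 50L); // no rows satisfy x >= 100
        System.out.println("original rows:  " + original(t, 100));
        System.out.println("rewritten rows: " + rewritten(t, 100));
    }
}
```

With a GROUP BY, a group whose rows are all filtered out simply disappears, so the two forms agree. The difference shown here is specific to the global (no grouping key) case.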
## Summary
- Add `PushFilterThroughSelectingAggregation`, an iterative rule that pushes HAVING-style filters on the output of single-value-selecting aggregates (`MAX`, `MIN`, `ARBITRARY`) below the aggregation when the predicate direction matches the aggregate's selection semantics. Predicate pushdown then carries the rewritten filter through any joins/projections all the way to the underlying table scan.
- `HAVING max(x) >= c` / `> c` → `WHERE x >= c` / `> c`
- `HAVING min(x) <= c` / `< c` → `WHERE x <= c` / `< c`
- `HAVING arbitrary(x) op c` (any comparison) → `WHERE x op c`
- Equality on `MAX`/`MIN` uses ADD-pre-filter + KEEP-HAVING (a direct `WHERE x = c` would accept spurious groups):
  - `HAVING max(x) = c` → `WHERE x >= c` + `HAVING max(x) = c`
  - `HAVING min(x) = c` → `WHERE x <= c` + `HAVING min(x) = c`
- Walks through any chain of identity-passing `ProjectNode`s between the Filter and the Aggregation (the planner inserts these for hash columns), and dedupes against existing source filters to avoid re-firing.
- Registered in `PlanOptimizers` between `EliminateCrossJoins` and `predicatePushDown` so the new below-aggregation filter can be pushed through joins/projections into the scan. Gated by the `push_filter_through_selecting_aggregation` session property, disabled by default.

## Test plan
- `TestPushFilterThroughSelectingAggregation` (14 tests): all five matching-direction REPLACE cases, both `=` keep-HAVING shapes (MAX & MIN), unsafe directions (no-fire), `=` no-fire pre-rule semantics, multi-agg no-fire, expression-arg no-fire, sum no-fire, gating, and partial-pushdown of mixed conjunctions
- `TestFeaturesConfig` covers the new `push_filter_through_selecting_aggregation` session property
- `TestPushFilterThroughSelectingAggregationPlans` (5 tests): pushdown reaches the `orders` scan in the no-join, single-join (CTE+JOIN), and two-join (orders×lineitem×customer) cases; pushdown stays as the orders/lineitem inner-join filter when the aggregated expression pulls from both sides; gating off keeps the predicate above the aggregation
- `AbstractTestQueries` (`testPushFilterThroughSelectingAggregation`): 11 SQL cases comparing optimization on/off across all rewrite shapes including `=`, multi-agg no-fire, reversed-comparison, mixed conjunction, and CTE-MAX-then-WHERE patterns
- 529/529 `TestLocalQueries` green
- `presto-docs/.../properties-session.rst`

== RELEASE NOTES ==
General Changes
* Add `push_filter_through_selecting_aggregation` session property that pushes HAVING-style filters on MAX/MIN/ARBITRARY aggregate outputs below the aggregation when the predicate direction matches, enabling earlier row reduction by allowing predicate pushdown to carry the rewritten filter all the way to the table scan.