@@ -31,7 +31,9 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * 4) Aggregate
  * 5) Other permissible unary operators. please see [[PushPredicateThroughNonJoin.canPushThrough]].
  */
-object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
+object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan]
+  with PredicateHelper
+  with JoinSelectionHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // LeftSemi/LeftAnti over Project
     case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
@@ -51,10 +53,11 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
         p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
       }
 
-    // LeftSemi/LeftAnti over Aggregate
+    // LeftSemi/LeftAnti over Aggregate, only push down if join can be planned as broadcast join.
     case join @ Join(agg: Aggregate, rightOp, LeftSemiOrAnti(_), _, _)
       if agg.aggregateExpressions.forall(_.deterministic) && agg.groupingExpressions.nonEmpty &&
-        !agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
+        !agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
+        canPlanAsBroadcastHashJoin(join, conf) =>
       val aliasMap = getAliasMap(agg)
       val canPushDownPredicate = (predicate: Expression) => {
         val replaced = replaceAlias(predicate, aliasMap)
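
Effect of the new guard: the LeftSemi/LeftAnti join is pushed below the Aggregate only when it can be planned as a broadcast hash join. A minimal sketch of the query shape this affects, assuming a local SparkSession and hypothetical views t1/t2 (none of this is part of the patch):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("semi-pushdown-sketch").getOrCreate()
import spark.implicits._

// Two tiny tables: t1 feeds the GROUP BY, t2 is the semi join's right side.
Seq(1, 2, 2, 3).toDF("b").createOrReplaceTempView("t1")
Seq(2, 3, 4).toDF("d").createOrReplaceTempView("t2")

// Right side under the broadcast threshold: the semi join is expected to be pushed below the Aggregate.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100000")
spark.sql("SELECT b FROM (SELECT b FROM t1 GROUP BY b) g LEFT SEMI JOIN t2 ON b <=> d").explain(true)

// Size-based broadcasting disabled: the semi join is expected to stay above the Aggregate.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.sql("SELECT b FROM (SELECT b FROM t1 GROUP BY b) g LEFT SEMI JOIN t2 ON b <=> d").explain(true)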
@@ -307,6 +307,13 @@ trait JoinSelectionHelper {
     }
   }
 
+  def canPlanAsBroadcastHashJoin(join: Join, conf: SQLConf): Boolean = {
+    getBroadcastBuildSide(join.left, join.right, join.joinType,
+      join.hint, hintOnly = true, conf).isDefined ||
+      getBroadcastBuildSide(join.left, join.right, join.joinType,
+        join.hint, hintOnly = false, conf).isDefined
+  }
+
   def hintToBroadcastLeft(hint: JoinHint): Boolean = {
     hint.leftHint.exists(_.strategy.contains(BROADCAST))
   }
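
The new helper covers both ways a broadcast hash join can become viable: a broadcast hint (hintOnly = true) and size-based selection against spark.sql.autoBroadcastJoinThreshold (hintOnly = false). Continuing the sketch above (hypothetical views t1/t2), a hint should therefore keep the pushdown even with the threshold disabled:

// Size-based broadcasting off, but a BROADCAST hint on t2 still satisfies the hintOnly = true check,
// so the pushdown below the Aggregate is still expected to apply.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.sql("""
  SELECT /*+ BROADCAST(t2) */ b
  FROM (SELECT b FROM t1 GROUP BY b) g
  LEFT SEMI JOIN t2 ON b <=> d
""").explain(true)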
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 class LeftSemiPushdownSuite extends PlanTest {
@@ -443,4 +444,28 @@ class LeftSemiPushdownSuite extends PlanTest {
     }
   }
 
+  Seq(LeftSemi, LeftAnti).foreach { jt =>
+    test(s"SPARK-34081: $jt only push down if join can be planned as broadcast join") {
+      Seq(-1, 100000).foreach { threshold =>
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> threshold.toString) {
+          val originalQuery = testRelation
+            .groupBy('b)('b)
+            .join(testRelation1, joinType = jt, condition = Some('b <=> 'd))
+
+          val optimized = Optimize.execute(originalQuery.analyze)
+          val correctAnswer = if (threshold > 0) {
+            testRelation
+              .join(testRelation1, joinType = jt, condition = Some('b <=> 'd))
+              .groupBy('b)('b)
+              .analyze
+          } else {
+            originalQuery.analyze
+          }
+
+          comparePlans(optimized, correctAnswer)
+        }
+      }
+    }
+  }
+
 }
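
The new test exercises both outcomes for LeftSemi and LeftAnti by flipping spark.sql.autoBroadcastJoinThreshold between -1 and 100000. It can be run on its own with the usual Spark sbt invocation, e.g. build/sbt "catalyst/testOnly *LeftSemiPushdownSuite" (project name assumed from the standard Spark build).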

Large diffs are not rendered by default.