Skip to content

Commit bcb1ae0

Browse files
rxin authored and marmbrus committed
[SPARK-3857] Create joins package for various join operators.
Author: Reynold Xin <[email protected]>

Closes apache#2719 from rxin/sql-join-break and squashes the following commits:

0c0082b [Reynold Xin] Fix line length.
cbc664c [Reynold Xin] Rename join -> joins package.
a070d44 [Reynold Xin] Fix line length in HashJoin
a39be8c [Reynold Xin] [SPARK-3857] Create a join package for various join operators.
1 parent 3e4f09d commit bcb1ae0

File tree

15 files changed

+844
-646
lines changed

15 files changed

+844
-646
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 22 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -27,20 +27,20 @@ import org.apache.spark.sql.catalyst.types._
2727
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
2828
import org.apache.spark.sql.parquet._
2929

30+
3031
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
3132
self: SQLContext#SparkPlanner =>
3233

3334
object LeftSemiJoin extends Strategy with PredicateHelper {
3435
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
3536
// Find left semi joins where at least some predicates can be evaluated by matching join keys
3637
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
37-
val semiJoin = execution.LeftSemiJoinHash(
38+
val semiJoin = joins.LeftSemiJoinHash(
3839
leftKeys, rightKeys, planLater(left), planLater(right))
3940
condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
4041
// no predicate can be evaluated by matching hash keys
4142
case logical.Join(left, right, LeftSemi, condition) =>
42-
execution.LeftSemiJoinBNL(
43-
planLater(left), planLater(right), condition) :: Nil
43+
joins.LeftSemiJoinBNL(planLater(left), planLater(right), condition) :: Nil
4444
case _ => Nil
4545
}
4646
}
@@ -50,13 +50,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
5050
* evaluated by matching hash keys.
5151
*
5252
* This strategy applies a simple optimization based on the estimates of the physical sizes of
53-
* the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an
53+
* the two join sides. When planning a [[joins.BroadcastHashJoin]], if one side has an
5454
* estimated physical size smaller than the user-settable threshold
5555
* [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
5656
* ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
5757
* ''broadcasted'' to all of the executors involved in the join, as a
5858
* [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they
59-
* will instead be used to decide the build side in a [[execution.ShuffledHashJoin]].
59+
* will instead be used to decide the build side in a [[joins.ShuffledHashJoin]].
6060
*/
6161
object HashJoin extends Strategy with PredicateHelper {
6262

@@ -66,8 +66,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
6666
left: LogicalPlan,
6767
right: LogicalPlan,
6868
condition: Option[Expression],
69-
side: BuildSide) = {
70-
val broadcastHashJoin = execution.BroadcastHashJoin(
69+
side: joins.BuildSide) = {
70+
val broadcastHashJoin = execution.joins.BroadcastHashJoin(
7171
leftKeys, rightKeys, side, planLater(left), planLater(right))
7272
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
7373
}
@@ -76,27 +76,26 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
7676
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
7777
if sqlContext.autoBroadcastJoinThreshold > 0 &&
7878
right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
79-
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
79+
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
8080

8181
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
8282
if sqlContext.autoBroadcastJoinThreshold > 0 &&
8383
left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
84-
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
84+
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
8585

8686
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
8787
val buildSide =
8888
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
89-
BuildRight
89+
joins.BuildRight
9090
} else {
91-
BuildLeft
91+
joins.BuildLeft
9292
}
93-
val hashJoin =
94-
execution.ShuffledHashJoin(
95-
leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
93+
val hashJoin = joins.ShuffledHashJoin(
94+
leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
9695
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
9796

9897
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
99-
execution.HashOuterJoin(
98+
joins.HashOuterJoin(
10099
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
101100

102101
case _ => Nil
@@ -164,8 +163,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
164163
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
165164
case logical.Join(left, right, joinType, condition) =>
166165
val buildSide =
167-
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight else BuildLeft
168-
execution.BroadcastNestedLoopJoin(
166+
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
167+
joins.BuildRight
168+
} else {
169+
joins.BuildLeft
170+
}
171+
joins.BroadcastNestedLoopJoin(
169172
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
170173
case _ => Nil
171174
}
@@ -174,10 +177,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
174177
object CartesianProduct extends Strategy {
175178
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
176179
case logical.Join(left, right, _, None) =>
177-
execution.CartesianProduct(planLater(left), planLater(right)) :: Nil
180+
execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
178181
case logical.Join(left, right, Inner, Some(condition)) =>
179182
execution.Filter(condition,
180-
execution.CartesianProduct(planLater(left), planLater(right))) :: Nil
183+
execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
181184
case _ => Nil
182185
}
183186
}

0 commit comments

Comments
 (0)