@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning
19
19
20
20
import scala .annotation .tailrec
21
21
22
+ import org .apache .spark .sql .Logging
23
+
22
24
import org .apache .spark .sql .catalyst .expressions ._
25
+ import org .apache .spark .sql .catalyst .plans ._
23
26
import org .apache .spark .sql .catalyst .plans .logical ._
24
27
25
28
/**
@@ -101,6 +104,55 @@ object PhysicalOperation extends PredicateHelper {
101
104
}
102
105
}
103
106
107
+ /**
108
+ * A pattern that finds joins with equality conditions that can be evaluated using hashing
109
+ * techniques. For inner joins, any filters on top of the join operator are also matched.
110
+ */
111
+ object HashFilteredJoin extends Logging with PredicateHelper {
112
+ /** (joinType, rightKeys, leftKeys, condition, left, right) */
113
+ type ReturnType =
114
+ (JoinType , Seq [Expression ], Seq [Expression ], Option [Expression ], LogicalPlan , LogicalPlan )
115
+
116
+ def unapply (plan : LogicalPlan ): Option [ReturnType ] = plan match {
117
+ // All predicates can be evaluated for inner join (i.e., those that are in the ON
118
+ // clause and WHERE clause.)
119
+ case FilteredOperation (predicates, join @ Join (left, right, Inner , condition)) =>
120
+ logger.debug(s " Considering hash inner join on: ${predicates ++ condition}" )
121
+ splitPredicates(predicates ++ condition, join)
122
+ case join @ Join (left, right, joinType, condition) =>
123
+ logger.debug(s " Considering hash join on: $condition" )
124
+ splitPredicates(condition.toSeq, join)
125
+ case _ => None
126
+ }
127
+
128
+ // Find equi-join predicates that can be evaluated before the join, and thus can be used
129
+ // as join keys.
130
+ def splitPredicates (allPredicates : Seq [Expression ], join : Join ): Option [ReturnType ] = {
131
+ val Join (left, right, joinType, _) = join
132
+ val (joinPredicates, otherPredicates) = allPredicates.partition {
133
+ case Equals (l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
134
+ (canEvaluate(l, right) && canEvaluate(r, left)) => true
135
+ case _ => false
136
+ }
137
+
138
+ val joinKeys = joinPredicates.map {
139
+ case Equals (l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
140
+ case Equals (l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
141
+ }
142
+
143
+ // Do not consider this strategy if there are no join keys.
144
+ if (joinKeys.nonEmpty) {
145
+ val leftKeys = joinKeys.map(_._1)
146
+ val rightKeys = joinKeys.map(_._2)
147
+
148
+ Some ((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And ), left, right))
149
+ } else {
150
+ logger.debug(s " Avoiding hash join with no join keys. " )
151
+ None
152
+ }
153
+ }
154
+ }
155
+
104
156
/**
105
157
* A pattern that collects all adjacent unions and returns their children as a Seq.
106
158
*/
0 commit comments