From 14cff800ff2c33ffcb1eaa70bab8109b163cc9d0 Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Tue, 20 May 2014 17:33:32 +0800 Subject: [PATCH 1/7] add support for left semi join --- .../spark/sql/catalyst/plans/joinTypes.scala | 1 + .../org/apache/spark/sql/SQLContext.scala | 1 + .../spark/sql/execution/SparkStrategies.scala | 16 ++ .../apache/spark/sql/execution/joins.scala | 144 ++++++++++++++++++ .../apache/spark/sql/hive/HiveContext.scala | 1 + .../org/apache/spark/sql/hive/HiveQl.scala | 1 + ...emijoin-0-80b6466213face7fbcb0de044611e1f5 | 0 ...emijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 | 0 ...emijoin-2-43d53504df013e6b35f81811138a167a | 1 + ...emijoin-3-b07d292423312aafa5e5762a579decd2 | 0 ...emijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be | 0 ...emijoin-5-9c307c0559d735960ce77efa95b2b17b | 0 ...emijoin-6-82921fc96eef547ec0f71027ee88298c | 0 ...emijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 | 0 ...join_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 | 0 ...join_mr-1-aa3f07f028027ffd13ab5535dc821593 | 0 ...oin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 | 1 + ...oin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 | 2 + ...join_mr-2-3f65953ae60375156367c54533978782 | 0 ...join_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 | 0 ...join_mr-4-333895fe6abca27c8edb5c91bfe10d2f | 2 + ...join_mr-5-896d0948c1df849df9764a6d8ad8fff9 | 20 +++ ...join_mr-6-b1e2ade89ae898650f0be4f796d8947b | 1 + ...join_mr-7-8e9c2969b999557363e40f9ebb3f6d7c | 1 + ...join_mr-8-c61b972d4409babe41d8963e841af45b | 1 + ...join_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 | 2 + .../execution/HiveCompatibilitySuite.scala | 2 + 27 files changed, 197 insertions(+) create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-0-80b6466213face7fbcb0de044611e1f5 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-3-b07d292423312aafa5e5762a579decd2 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-5-9c307c0559d735960ce77efa95b2b17b create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-6-82921fc96eef547ec0f71027ee88298c create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-1-aa3f07f028027ffd13ab5535dc821593 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-2-3f65953ae60375156367c54533978782 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c create mode 100644 
sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala index ae8d7d3e4257f..593be6ca5e40c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala @@ -22,3 +22,4 @@ case object Inner extends JoinType case object LeftOuter extends JoinType case object RightOuter extends JoinType case object FullOuter extends JoinType +case object LeftSemi extends JoinType \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index bfebfa0c28c52..5ebdffae30ad0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -193,6 +193,7 @@ class SQLContext(@transient val sparkContext: SparkContext) val strategies: Seq[Strategy] = TakeOrdered :: PartialAggregation :: + LeftSemiJoin :: HashJoin :: ParquetOperations :: BasicOperators :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f763106da4e0e..7a93d2b6ee661 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -28,6 +28,22 @@ import org.apache.spark.sql.parquet._ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => + object LeftSemiJoin extends Strategy with PredicateHelper { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + // Find leftsemi joins where at least some predicates can be evaluated by matching hash keys + // using the HashFilteredJoin pattern. 
+ case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) => + val semiJoin = + execution.LeftSemiJoinHash(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) + condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil + // no predicate can be evaluated by matching hash keys + case logical.Join(left, right, LeftSemi, condition) => + execution.LeftSemiJoinBNL( + planLater(left), planLater(right), LeftSemi, condition)(sparkContext) :: Nil + case _ => Nil + } + } + object HashJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // Find inner joins where at least some predicates can be evaluated by matching hash keys diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 31cc26962ad93..302250a1a6cc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -140,6 +140,150 @@ case class HashJoin( } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class LeftSemiJoinHash( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + buildSide: BuildSide, + left: SparkPlan, + right: SparkPlan) extends BinaryNode { + + override def outputPartitioning: Partitioning = left.outputPartitioning + + override def requiredChildDistribution = + ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + + val (buildPlan, streamedPlan) = buildSide match { + case BuildLeft => (left, right) + case BuildRight => (right, left) + } + + val (buildKeys, streamedKeys) = buildSide match { + case BuildLeft => (leftKeys, rightKeys) + case BuildRight => (rightKeys, leftKeys) + } + + def output = left.output + + @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) + @transient lazy val streamSideKeyGenerator = + () => new MutableProjection(streamedKeys, streamedPlan.output) + + def execute() = { + + buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => + // TODO: Use Spark's HashMap implementation. + val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() + var currentRow: Row = null + + // Create a mapping of buildKeys -> rows + while (buildIter.hasNext) { + currentRow = buildIter.next() + val rowKey = buildSideKeyGenerator(currentRow) + if(!rowKey.anyNull) { + val existingMatchList = hashTable.get(rowKey) + val matchList = if (existingMatchList == null) { + val newMatchList = new ArrayBuffer[Row]() + hashTable.put(rowKey, newMatchList) + newMatchList + } else { + existingMatchList + } + matchList += currentRow.copy() + } + } + + new Iterator[Row] { + private[this] var currentStreamedRow: Row = _ + private[this] var currentHashMatched: Boolean = false + + private[this] val joinKeys = streamSideKeyGenerator() + + override final def hasNext: Boolean = + streamIter.hasNext && fetchNext() + + override final def next() = { + currentStreamedRow + } + + /** + * Searches the streamed iterator for the next row that has at least one match in the hash table. + * + * @return true if the search is successful, and false if the streamed iterator runs out of + * tuples.
+ */ + private final def fetchNext(): Boolean = { + currentHashMatched = false + while (!currentHashMatched && streamIter.hasNext) { + currentStreamedRow = streamIter.next() + if (!joinKeys(currentStreamedRow).anyNull) { + currentHashMatched = true + } + } + currentHashMatched + } + } + } + } +} + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class LeftSemiJoinBNL( + streamed: SparkPlan, broadcast: SparkPlan, joinType: JoinType, condition: Option[Expression]) + (@transient sc: SparkContext) + extends BinaryNode { + // TODO: Override requiredChildDistribution. + + override def outputPartitioning: Partitioning = streamed.outputPartitioning + + override def otherCopyArgs = sc :: Nil + + def output = left.output + + /** The Streamed Relation */ + def left = streamed + /** The Broadcast relation */ + def right = broadcast + + @transient lazy val boundCondition = + InterpretedPredicate( + condition + .map(c => BindReferences.bindReference(c, left.output ++ right.output)) + .getOrElse(Literal(true))) + + + def execute() = { + val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) + + val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter => + val joinedRow = new JoinedRow + + streamedIter.filter(streamedRow => { + var i = 0 + var matched = false + + while (i < broadcastedRelation.value.size && !matched) { + // TODO: One bitset per partition instead of per row. + val broadcastedRow = broadcastedRelation.value(i) + if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { + matched = true + } + i += 1 + } + matched + }).map(streamedRow => (streamedRow, null)) + } + + streamedPlusMatches.map(_._1) + } +} + /** * :: DeveloperApi :: */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index b21f24dad785d..fbab2ac16b896 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -224,6 +224,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { DataSinks, Scripts, PartialAggregation, + LeftSemiJoin, HashJoin, BasicOperators, CartesianProduct, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 1f688fe1117fe..ccfaac92a29bc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -680,6 +680,7 @@ private[hive] object HiveQl { case "TOK_RIGHTOUTERJOIN" => RightOuter case "TOK_LEFTOUTERJOIN" => LeftOuter case "TOK_FULLOUTERJOIN" => FullOuter + case "TOK_LEFTSEMIJOIN" => LeftSemi } assert(other.size <= 1, "Unhandled join clauses.") Join(nodeToRelation(relation1), diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-0-80b6466213face7fbcb0de044611e1f5 b/sql/hive/src/test/resources/golden/leftsemijoin-0-80b6466213face7fbcb0de044611e1f5 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 b/sql/hive/src/test/resources/golden/leftsemijoin-1-d1f6a3dea28a5f0fee08026bf33d9129 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a b/sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a new file mode 100644 index 
0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-2-43d53504df013e6b35f81811138a167a @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-3-b07d292423312aafa5e5762a579decd2 b/sql/hive/src/test/resources/golden/leftsemijoin-3-b07d292423312aafa5e5762a579decd2 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be b/sql/hive/src/test/resources/golden/leftsemijoin-4-3ac2226efe7cb5d999c1c5e4ac2114be new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-5-9c307c0559d735960ce77efa95b2b17b b/sql/hive/src/test/resources/golden/leftsemijoin-5-9c307c0559d735960ce77efa95b2b17b new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-6-82921fc96eef547ec0f71027ee88298c b/sql/hive/src/test/resources/golden/leftsemijoin-6-82921fc96eef547ec0f71027ee88298c new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 b/sql/hive/src/test/resources/golden/leftsemijoin-7-b30aa3b4a45db6b64bb46b4d9bd32ff0 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-0-7087fb6281a34d00f1812d2ff4ba8b75 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-1-aa3f07f028027ffd13ab5535dc821593 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-1-aa3f07f028027ffd13ab5535dc821593 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-10-9914f44ecb6ae7587b62e5349ff60d04 @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 new file mode 100644 index 0000000000000..6ed281c757a96 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-11-2027ecb1495d5550c5d56abf6b95b0a7 @@ -0,0 +1,2 @@ +1 +1 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-2-3f65953ae60375156367c54533978782 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-2-3f65953ae60375156367c54533978782 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-3-645cf8b871c9b27418d6fa1d1bda9a52 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f b/sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f new file mode 100644 index 0000000000000..6ed281c757a96 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-4-333895fe6abca27c8edb5c91bfe10d2f @@ -0,0 +1,2 @@ +1 +1 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 
b/sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 new file mode 100644 index 0000000000000..179ef0e0209e9 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-5-896d0948c1df849df9764a6d8ad8fff9 @@ -0,0 +1,20 @@ +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b b/sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-6-b1e2ade89ae898650f0be4f796d8947b @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c b/sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-7-8e9c2969b999557363e40f9ebb3f6d7c @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b b/sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-8-c61b972d4409babe41d8963e841af45b @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 b/sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 new file mode 100644 index 0000000000000..6ed281c757a96 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin_mr-9-2027ecb1495d5550c5d56abf6b95b0a7 @@ -0,0 +1,2 @@ +1 +1 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index d83732b51e9c2..a341524b3787e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -478,6 +478,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "lateral_view_cp", "lateral_view_outer", "lateral_view_ppd", + "leftsemijoin", + "leftsemijoin_mr", "lineage1", "literal_double", "literal_ints", From 83a3c8af72237018fb9b746e3e1a2707dfcad004 Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Wed, 21 May 2014 16:48:15 +0800 Subject: [PATCH 2/7] scala style fix --- .../org/apache/spark/sql/catalyst/plans/joinTypes.scala | 2 +- .../org/apache/spark/sql/execution/SparkStrategies.scala | 6 +++--- .../main/scala/org/apache/spark/sql/execution/joins.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala index 593be6ca5e40c..613f4bb09daf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala @@ -22,4 +22,4 @@ case object Inner extends JoinType case object LeftOuter extends JoinType case object RightOuter extends JoinType case object FullOuter extends JoinType -case object LeftSemi extends JoinType \ No newline at end of file +case object LeftSemi extends JoinType diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 7a93d2b6ee661..1ed2ea709d361 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -33,13 +33,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Find leftsemi joins where at least some predicates can be evaluated by matching hash keys // using the HashFilteredJoin pattern. case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) => - val semiJoin = - execution.LeftSemiJoinHash(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) + val semiJoin = execution.LeftSemiJoinHash( + leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil // no predicate can be evaluated by matching hash keys case logical.Join(left, right, LeftSemi, condition) => execution.LeftSemiJoinBNL( - planLater(left), planLater(right), LeftSemi, condition)(sparkContext) :: Nil + planLater(left), planLater(right), condition)(sparkContext) :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 302250a1a6cc0..a503875418674 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -235,7 +235,7 @@ case class LeftSemiJoinHash( */ @DeveloperApi case class LeftSemiJoinBNL( - streamed: SparkPlan, broadcast: SparkPlan, joinType: JoinType, condition: Option[Expression]) + streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression]) (@transient sc: SparkContext) extends BinaryNode { // TODO: Override requiredChildDistribution. 
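The left-semi-join semantics the two new physical operators target: a left row is kept, with only left-side columns in the output, when at least one right row matches, and it is never duplicated even if several right rows match. The plain-Scala sketch below is illustrative only and is not part of the patch series; it uses ordinary collections instead of Spark's Row/Projection machinery, and the sample data is made up (the values loosely echo the leftsemijoin golden outputs). semiJoinHash mirrors the hash-based path (LeftSemiJoinHash, equi-join keys), and semiJoinNestedLoop mirrors the broadcast nested-loop path (LeftSemiJoinBNL, arbitrary condition).

object LeftSemiJoinSketch {
  // Hash variant: build a set of right-side keys, then stream the left side and keep
  // the rows whose key is present (each qualifying left row is kept exactly once).
  def semiJoinHash[L, R, K](left: Seq[L], right: Seq[R])
                           (leftKey: L => K, rightKey: R => K): Seq[L] = {
    val buildKeys = right.map(rightKey).toSet            // analogous to the build-side hash table
    left.filter(row => buildKeys.contains(leftKey(row)))
  }

  // Nested-loop variant: keep a left row as soon as any right row satisfies the condition.
  def semiJoinNestedLoop[L, R](left: Seq[L], right: Seq[R])(cond: (L, R) => Boolean): Seq[L] =
    left.filter(l => right.exists(r => cond(l, r)))

  def main(args: Array[String]): Unit = {
    val people = Seq(("Hank", 2), ("Joe", 2), ("Ann", 5))   // (name, deptId)
    val sales  = Seq((2, "Tie"), (3, "Coat"))               // (deptId, item)
    // Both print List((Hank,2), (Joe,2)); Ann has no matching department and is dropped,
    // and no columns from the right side appear in the result.
    println(semiJoinHash(people, sales)((p: (String, Int)) => p._2, (s: (Int, String)) => s._1))
    println(semiJoinNestedLoop(people, sales)((p, d) => p._2 == d._1))
  }
}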
From 8d4a121bc4f279a91d67abb0d459fbd1cd25d4c4 Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Wed, 21 May 2014 23:51:00 +0800 Subject: [PATCH 3/7] add golden files for leftsemijoin --- .../sql/catalyst/plans/logical/basicOperators.scala | 9 +++++++-- .../leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea | 4 ++++ .../leftsemijoin-11-80b6466213face7fbcb0de044611e1f5 | 0 .../leftsemijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 | 0 .../golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 | 4 ++++ .../leftsemijoin-9-c5efa6b8771a51610d655be461670e1e | 2 ++ 6 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-11-80b6466213face7fbcb0de044611e1f5 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 create mode 100644 sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 732708e146b04..d3347b622f3d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.{LeftSemi, JoinType} import org.apache.spark.sql.catalyst.types._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { @@ -81,7 +81,12 @@ case class Join( condition: Option[Expression]) extends BinaryNode { def references = condition.map(_.references).getOrElse(Set.empty) - def output = left.output ++ right.output + def output = joinType match { + case LeftSemi => + left.output + case _ => + left.output ++ right.output + } } case class InsertIntoTable( diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea b/sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea new file mode 100644 index 0000000000000..25ce912507d55 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-10-89737a8857b5b61cc909e0c797f86aea @@ -0,0 +1,4 @@ +Hank 2 +Hank 2 +Joe 2 +Joe 2 diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-11-80b6466213face7fbcb0de044611e1f5 b/sql/hive/src/test/resources/golden/leftsemijoin-11-80b6466213face7fbcb0de044611e1f5 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 b/sql/hive/src/test/resources/golden/leftsemijoin-12-d1f6a3dea28a5f0fee08026bf33d9129 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 b/sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 new file mode 100644 index 0000000000000..25ce912507d55 --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-8-73cad58a10a1483ccb15e94a857013 @@ -0,0 +1,4 @@ +Hank 2 +Hank 2 +Joe 2 +Joe 2 diff --git 
a/sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e b/sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e new file mode 100644 index 0000000000000..f1470bad5782b --- /dev/null +++ b/sql/hive/src/test/resources/golden/leftsemijoin-9-c5efa6b8771a51610d655be461670e1e @@ -0,0 +1,2 @@ +2 Tie +2 Tie From 4c726e57381afebf37867e2a60c24aa48295e41b Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Tue, 3 Jun 2014 13:57:20 +0800 Subject: [PATCH 4/7] improvement according to Michael --- .../sql/catalyst/planning/patterns.scala | 5 ++ .../spark/sql/execution/SparkStrategies.scala | 6 +-- .../apache/spark/sql/execution/joins.scala | 49 +++++++------------ 3 files changed, 26 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 0e3a8a6bd30a8..f09a9fa91bf05 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -119,6 +119,11 @@ object HashFilteredJoin extends Logging with PredicateHelper { case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) => logger.debug(s"Considering hash inner join on: ${predicates ++ condition}") splitPredicates(predicates ++ condition, join) + // All predicates can be evaluated for left semi join (those that are in the WHERE + // clause can only come from the left table, so they can all be pushed down.) + case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, condition)) => + logger.debug(s"Considering hash left semi join on: ${predicates ++ condition}") + splitPredicates(predicates ++ condition, join) case join @ Join(left, right, joinType, condition) => logger.debug(s"Considering hash join on: $condition") splitPredicates(condition.toSeq, join) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 1ed2ea709d361..21a41f266c1ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -30,11 +30,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find leftsemi joins where at least some predicates can be evaluated by matching hash keys - // using the HashFilteredJoin pattern. + // Find left semi joins where at least some predicates can be evaluated by matching hash + // keys using the HashFilteredJoin pattern.
case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) => val semiJoin = execution.LeftSemiJoinHash( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) + leftKeys, rightKeys, planLater(left), planLater(right)) condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil // no predicate can be evaluated by matching hash keys case logical.Join(left, right, LeftSemi, condition) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index a503875418674..88ff3d49a79b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -142,29 +142,23 @@ case class HashJoin( /** * :: DeveloperApi :: + * Build the right table's join keys into a HashSet, and iterate through the left + * table, keeping the rows whose join keys are found in the hash set. */ @DeveloperApi case class LeftSemiJoinHash( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - buildSide: BuildSide, - left: SparkPlan, - right: SparkPlan) extends BinaryNode { + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryNode { override def outputPartitioning: Partitioning = left.outputPartitioning override def requiredChildDistribution = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - val (buildPlan, streamedPlan) = buildSide match { - case BuildLeft => (left, right) - case BuildRight => (right, left) - } - - val (buildKeys, streamedKeys) = buildSide match { - case BuildLeft => (leftKeys, rightKeys) - case BuildRight => (rightKeys, leftKeys) - } + val (buildPlan, streamedPlan) = (right, left) + val (buildKeys, streamedKeys) = (rightKeys, leftKeys) def output = left.output @@ -175,24 +169,18 @@ case class LeftSemiJoinHash( def execute() = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => - // TODO: Use Spark's HashMap implementation. - val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() + val hashTable = new java.util.HashSet[Row]() var currentRow: Row = null - // Create a mapping of buildKeys -> rows + // Create a hash set of buildKeys while (buildIter.hasNext) { currentRow = buildIter.next() val rowKey = buildSideKeyGenerator(currentRow) if(!rowKey.anyNull) { - val existingMatchList = hashTable.get(rowKey) - val matchList = if (existingMatchList == null) { - val newMatchList = new ArrayBuffer[Row]() - hashTable.put(rowKey, newMatchList) - newMatchList - } else { - existingMatchList + val keyExists = hashTable.contains(rowKey) + if (!keyExists) { + hashTable.add(rowKey) } - matchList += currentRow.copy() } } @@ -220,7 +208,7 @@ case class LeftSemiJoinHash( while (!currentHashMatched && streamIter.hasNext) { currentStreamedRow = streamIter.next() if (!joinKeys(currentStreamedRow).anyNull) { - currentHashMatched = true + currentHashMatched = hashTable.contains(joinKeys.currentValue) } } currentHashMatched @@ -232,6 +220,8 @@ case class LeftSemiJoinHash( /** * :: DeveloperApi :: + * Uses the BroadcastNestedLoopJoin approach to calculate the left semi join result when + * there are no join keys for a hash join.
*/ @DeveloperApi case class LeftSemiJoinBNL( @@ -261,7 +251,7 @@ case class LeftSemiJoinBNL( def execute() = { val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) - val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter => + streamed.execute().mapPartitions { streamedIter => val joinedRow = new JoinedRow streamedIter.filter(streamedRow => { @@ -269,7 +259,6 @@ case class LeftSemiJoinBNL( var matched = false while (i < broadcastedRelation.value.size && !matched) { - // TODO: One bitset per partition instead of per row. val broadcastedRow = broadcastedRelation.value(i) if (boundCondition(joinedRow(streamedRow, broadcastedRow))) { matched = true @@ -277,10 +266,8 @@ case class LeftSemiJoinBNL( i += 1 } matched - }).map(streamedRow => (streamedRow, null)) + }) } - - streamedPlusMatches.map(_._1) } } From 5ec6fa4bfaa2adef795d95f2de3aa88be700e851 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 7 Jun 2014 12:22:17 -0700 Subject: [PATCH 5/7] Add left semi to SQL Parser. --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index b3a3a1ef1b5eb..afe3a531c9c08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -128,6 +128,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val OUTER = Keyword("OUTER") protected val RIGHT = Keyword("RIGHT") protected val SELECT = Keyword("SELECT") + protected val SEMI = Keyword("SEMI") protected val STRING = Keyword("STRING") protected val SUM = Keyword("SUM") protected val TRUE = Keyword("TRUE") @@ -238,6 +239,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val joinType: Parser[JoinType] = INNER ^^^ Inner | + LEFT ~ SEMI ^^^ LeftSemi | LEFT ~ opt(OUTER) ^^^ LeftOuter | RIGHT ~ opt(OUTER) ^^^ RightOuter | FULL ~ opt(OUTER) ^^^ FullOuter From 035b73eb625aa0c43ac3d5b3cb00a9d60265742b Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 7 Jun 2014 12:22:35 -0700 Subject: [PATCH 6/7] Add test for left semi that can't be done with a hash join. --- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e966d89c30cf5..2b7992e993734 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -34,6 +34,13 @@ class SQLQuerySuite extends QueryTest { arrayData.map(d => (d.data, d.data(0), d.data(0) + d.data(1), d.data(1))).collect().toSeq) } + test("left semi greater than predicate") { + checkAnswer( + sql("SELECT * FROM testData2 x LEFT SEMI JOIN testData2 y ON x.a >= y.a + 2"), + Seq((3,1), (3,2)) + ) + } + test("index into array of arrays") { checkAnswer( sql( From 6713c09998a14e188f969d61eef79fbf30b01ee2 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 7 Jun 2014 12:22:50 -0700 Subject: [PATCH 7/7] Better debugging for failed query tests. 
--- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index d6072b402a044..d7f6abaf5d381 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -44,7 +44,7 @@ class QueryTest extends FunSuite { fail( s""" |Exception thrown while executing query: - |${rdd.logicalPlan} + |${rdd.queryExecution} |== Exception == |$e """.stripMargin)
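Patch 5 makes LEFT SEMI JOIN parseable by the plain SQL parser (previously only HiveQl mapped TOK_LEFTSEMIJOIN), and patch 6 adds a test whose join condition has no equality component, so no hash keys can be extracted and planning must fall back to LeftSemiJoinBNL. The snippet below is illustrative and not part of the series: a plain-Scala check of that test's expected answer, assuming testData2 holds the six rows (a, b) with a in 1..3 and b in 1..2, as in Spark's TestData fixture.

object LeftSemiGreaterThanCheck {
  def main(args: Array[String]): Unit = {
    // Assumed contents of testData2: (1,1), (1,2), (2,1), (2,2), (3,1), (3,2).
    val testData2 = for (a <- 1 to 3; b <- 1 to 2) yield (a, b)
    // "x LEFT SEMI JOIN y ON x.a >= y.a + 2": keep every x row that has at least one y row
    // with x.a >= y.a + 2. The smallest y.a is 1, so only rows with x.a >= 3 survive.
    val result = testData2.filter(x => testData2.exists(y => x._1 >= y._1 + 2))
    println(result)   // Vector((3,1), (3,2)), matching the expected Seq((3,1), (3,2))
  }
}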