Skip to content

Commit 18eb214

Browse files
committed
Merge branch 'SPARK-3861-broadcast-hash' into SPARK-3861-broadcast-hash-1
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
2 parents bcb1ae0 + 4b9d0c9 commit 18eb214

File tree

5 files changed

+187
-32
lines changed

5 files changed

+187
-32
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.concurrent.duration._
2222
import scala.concurrent.ExecutionContext.Implicits.global
2323

2424
import org.apache.spark.annotation.DeveloperApi
25-
import org.apache.spark.sql.catalyst.expressions.Expression
25+
import org.apache.spark.sql.catalyst.expressions.{Row, Expression}
2626
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnspecifiedDistribution}
2727
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
2828

@@ -49,14 +49,16 @@ case class BroadcastHashJoin(
4949

5050
@transient
5151
private val broadcastFuture = future {
52-
sparkContext.broadcast(buildPlan.executeCollect())
52+
val input: Array[Row] = buildPlan.executeCollect()
53+
val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.length)
54+
sparkContext.broadcast(hashed)
5355
}
5456

5557
override def execute() = {
5658
val broadcastRelation = Await.result(broadcastFuture, 5.minute)
5759

5860
streamedPlan.execute().mapPartitions { streamedIter =>
59-
joinIterators(broadcastRelation.value.iterator, streamedIter)
61+
hashJoin(streamedIter, broadcastRelation.value)
6062
}
6163
}
6264
}

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.joins
1919

20-
import org.apache.spark.sql.catalyst.expressions.{Expression, JoinedRow2, Row}
20+
import org.apache.spark.sql.catalyst.expressions._
2121
import org.apache.spark.sql.execution.SparkPlan
2222
import org.apache.spark.util.collection.CompactBuffer
2323

@@ -43,34 +43,14 @@ trait HashJoin {
4343

4444
override def output = left.output ++ right.output
4545

46-
@transient protected lazy val buildSideKeyGenerator = newProjection(buildKeys, buildPlan.output)
47-
@transient protected lazy val streamSideKeyGenerator =
46+
@transient protected lazy val buildSideKeyGenerator: Projection =
47+
newProjection(buildKeys, buildPlan.output)
48+
49+
@transient protected lazy val streamSideKeyGenerator: () => MutableProjection =
4850
newMutableProjection(streamedKeys, streamedPlan.output)
4951

50-
protected def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] =
52+
protected def hashJoin(streamIter: Iterator[Row], hashedRelation: HashedRelation): Iterator[Row] =
5153
{
52-
// TODO: Use Spark's HashMap implementation.
53-
54-
val hashTable = new java.util.HashMap[Row, CompactBuffer[Row]]()
55-
var currentRow: Row = null
56-
57-
// Create a mapping of buildKeys -> rows
58-
while (buildIter.hasNext) {
59-
currentRow = buildIter.next()
60-
val rowKey = buildSideKeyGenerator(currentRow)
61-
if (!rowKey.anyNull) {
62-
val existingMatchList = hashTable.get(rowKey)
63-
val matchList = if (existingMatchList == null) {
64-
val newMatchList = new CompactBuffer[Row]()
65-
hashTable.put(rowKey, newMatchList)
66-
newMatchList
67-
} else {
68-
existingMatchList
69-
}
70-
matchList += currentRow.copy()
71-
}
72-
}
73-
7454
new Iterator[Row] {
7555
private[this] var currentStreamedRow: Row = _
7656
private[this] var currentHashMatches: CompactBuffer[Row] = _
@@ -107,7 +87,7 @@ trait HashJoin {
10787
while (currentHashMatches == null && streamIter.hasNext) {
10888
currentStreamedRow = streamIter.next()
10989
if (!joinKeys(currentStreamedRow).anyNull) {
110-
currentHashMatches = hashTable.get(joinKeys.currentValue)
90+
currentHashMatches = hashedRelation.get(joinKeys.currentValue)
11191
}
11292
}
11393

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala (new file — path inferred from the package declaration and class name)

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.joins
19+
20+
import java.util.{HashMap => JavaHashMap}
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
23+
import org.apache.spark.util.collection.CompactBuffer
24+
25+
26+
/**
 * A relation that has been hashed on some join key, supporting lookup of all
 * rows that share a given key. Obtain instances through [[HashedRelation.apply]]
 * rather than constructing a subclass directly.
 */
private[joins] sealed trait HashedRelation {
  /** Returns the rows matching `key`, or null when no matching rows exist. */
  def get(key: Row): CompactBuffer[Row]
}
33+
34+
35+
/**
 * The general-purpose [[HashedRelation]]: a hash map from each key to the
 * buffer of all rows sharing that key. Serializable so the relation can be
 * shipped to executors (e.g. via broadcast).
 */
private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, CompactBuffer[Row]])
  extends HashedRelation with Serializable {

  // JavaHashMap.get already returns null for a missing key, matching the trait contract.
  override def get(key: Row): CompactBuffer[Row] = hashTable.get(key)
}
43+
44+
45+
/**
 * A specialized [[HashedRelation]] for the case where every key maps to exactly
 * one row. [[getValue]] exposes the single matching row directly, and [[get]]
 * only allocates a wrapping buffer when a match is found.
 *
 * Key uniqueness is guaranteed by the builder in [[HashedRelation.apply]].
 */
// Marked private[joins] for consistency with the sealed trait and
// GeneralHashedRelation: instances must come from HashedRelation.apply, so the
// concrete class should not be visible outside this package.
private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, Row])
  extends HashedRelation with Serializable {

  override def get(key: Row): CompactBuffer[Row] = {
    val v = hashTable.get(key)
    // Wrap on demand so the miss path allocates nothing.
    if (v eq null) null else CompactBuffer(v)
  }

  /** Returns the single row for `key`, or null if the key is absent. */
  def getValue(key: Row): Row = hashTable.get(key)
}
59+
60+
61+
// TODO(rxin): a version of [[HashedRelation]] backed by arrays for consecutive integer keys.
62+
63+
64+
private[joins] object HashedRelation {

  /**
   * Builds a [[HashedRelation]] from `input`, keyed by `keyGenerator`. Rows
   * whose generated key contains a null are dropped. When every key turns out
   * to occur exactly once, the result is specialized to a
   * [[UniqueKeyHashedRelation]]; otherwise a [[GeneralHashedRelation]] is built.
   *
   * @param input        rows to hash
   * @param keyGenerator projection producing the join key for each row
   * @param sizeEstimate initial capacity hint for the underlying hash map
   */
  def apply(
      input: Iterator[Row],
      keyGenerator: Projection,
      sizeEstimate: Int = 64): HashedRelation = {

    // TODO: Use Spark's HashMap implementation.
    val table = new JavaHashMap[Row, CompactBuffer[Row]](sizeEstimate)

    // Becomes false as soon as any key is seen a second time; decides which
    // concrete relation we can build at the end.
    var allKeysUnique = true

    while (input.hasNext) {
      val row = input.next()
      val key = keyGenerator(row)
      if (!key.anyNull) {
        var matches = table.get(key)
        if (matches eq null) {
          matches = new CompactBuffer[Row]()
          table.put(key, matches)
        } else {
          allKeysUnique = false
        }
        // Copy before buffering — the input iterator may reuse its row object.
        matches += row.copy()
      }
    }

    if (allKeysUnique) {
      // Every bucket has exactly one element: collapse to a direct key -> row map.
      val singleValued = new JavaHashMap[Row, Row](table.size)
      val entries = table.entrySet().iterator()
      while (entries.hasNext) {
        val e = entries.next()
        singleValued.put(e.getKey, e.getValue()(0))
      }
      new UniqueKeyHashedRelation(singleValued)
    } else {
      new GeneralHashedRelation(table)
    }
  }
}

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ case class ShuffledHashJoin(
4242
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
4343

4444
override def execute() = {
45-
buildPlan.execute().zipPartitions(streamedPlan.execute()) {
46-
(buildIter, streamIter) => joinIterators(buildIter, streamIter)
45+
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
46+
val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
47+
hashJoin(streamIter, hashed)
4748
}
4849
}
4950
}
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala (new file — path inferred from the package declaration and suite name)

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.joins
19+
20+
import org.scalatest.FunSuite
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
23+
import org.apache.spark.util.collection.CompactBuffer
24+
25+
26+
class HashedRelationSuite extends FunSuite {

  // Identity projection: each row keys itself.
  private val keyProjection = new Projection {
    override def apply(row: Row): Row = row
  }

  test("GeneralHashedRelation") {
    val rows = Array(Row(0), Row(1), Row(2), Row(2))
    val relation = HashedRelation(rows.iterator, keyProjection)
    // Row(2) is duplicated, so the builder cannot specialize to unique keys.
    assert(relation.isInstanceOf[GeneralHashedRelation])

    assert(relation.get(rows(0)) == CompactBuffer[Row](rows(0)))
    assert(relation.get(rows(1)) == CompactBuffer[Row](rows(1)))
    assert(relation.get(Row(10)) === null)

    // The duplicated key's bucket holds both copies of Row(2).
    val expected = CompactBuffer[Row](rows(2))
    expected += rows(2)
    assert(relation.get(rows(2)) == expected)
  }

  test("UniqueKeyHashedRelation") {
    val rows = Array(Row(0), Row(1), Row(2))
    val relation = HashedRelation(rows.iterator, keyProjection)
    // All keys distinct, so the builder specializes the relation.
    assert(relation.isInstanceOf[UniqueKeyHashedRelation])

    assert(relation.get(rows(0)) == CompactBuffer[Row](rows(0)))
    assert(relation.get(rows(1)) == CompactBuffer[Row](rows(1)))
    assert(relation.get(rows(2)) == CompactBuffer[Row](rows(2)))
    assert(relation.get(Row(10)) === null)

    val unique = relation.asInstanceOf[UniqueKeyHashedRelation]
    assert(unique.getValue(rows(0)) == rows(0))
    assert(unique.getValue(rows(1)) == rows(1))
    assert(unique.getValue(rows(2)) == rows(2))
    assert(unique.getValue(Row(10)) == null)
  }
}

0 commit comments

Comments
 (0)