File tree Expand file tree Collapse file tree 1 file changed +5
-5
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution Expand file tree Collapse file tree 1 file changed +5
-5
lines changed Original file line number Diff line number Diff line change @@ -169,24 +169,24 @@ case class LeftSemiJoinHash(
169
169
def execute () = {
170
170
171
171
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
172
- val hashTable = new java.util.HashSet [Row ]()
172
+ val hashSet = new java.util.HashSet [Row ]()
173
173
var currentRow : Row = null
174
174
175
175
// Create a Hash set of buildKeys
176
176
while (buildIter.hasNext) {
177
177
currentRow = buildIter.next()
178
178
val rowKey = buildSideKeyGenerator(currentRow)
179
179
if (! rowKey.anyNull) {
180
- val keyExists = hashTable .contains(rowKey)
180
+ val keyExists = hashSet .contains(rowKey)
181
181
if (! keyExists) {
182
- hashTable .add(rowKey)
182
+ hashSet .add(rowKey)
183
183
}
184
184
}
185
185
}
186
186
187
+ val joinKeys = streamSideKeyGenerator()
187
188
streamIter.filter(current => {
188
- val joinKeys = streamSideKeyGenerator()
189
- ! joinKeys(current).anyNull && hashTable.contains(joinKeys.currentValue)
189
+ ! joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
190
190
})
191
191
}
192
192
}
You can’t perform that action at this time.
0 commit comments