Skip to content

Commit cce0048

Browse files
c21maropu
authored and committed
[SPARK-35351][SQL] Add code-gen for left anti sort merge join
### What changes were proposed in this pull request? As title. This PR is to add code-gen support for LEFT ANTI sort merge join. The main change is to extract `loadStreamed` in `SortMergeJoinExec.doProduce()`. That is, to set all column values for the streamed row when the streamed row has no output row. Example query: ``` val df1 = spark.range(10).select($"id".as("k1")) val df2 = spark.range(4).select($"id".as("k2")) df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") ``` Example generated code: ``` == Subtree 5 / 5 (maxMethodCodeSize:296; maxConstantPoolSize:156(0.24% used); numInnerClasses:0) == *(5) Project [id#0L AS k1#2L] +- *(5) SortMergeJoin [id#0L], [k2#6L], LeftAnti :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#0L, 5), ENSURE_REQUIREMENTS, [id=#27] : +- *(1) Range (0, 10, step=1, splits=2) +- *(4) Sort [k2#6L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(k2#6L, 5), ENSURE_REQUIREMENTS, [id=#33] +- *(3) Project [id#4L AS k2#6L] +- *(3) Range (0, 4, step=1, splits=2) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage5(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=5 /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator smj_streamedInput_0; /* 010 */ private scala.collection.Iterator smj_bufferedInput_0; /* 011 */ private InternalRow smj_streamedRow_0; /* 012 */ private InternalRow smj_bufferedRow_0; /* 013 */ private long smj_value_2; /* 014 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches_0; /* 015 */ private long smj_value_3; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray_0 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2]; /* 017 */ /* 018 */ public GeneratedIteratorForCodegenStage5(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ smj_streamedInput_0 = inputs[0]; /* 026 */ smj_bufferedInput_0 = inputs[1]; /* 027 */ /* 028 */ smj_matches_0 = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(1, 2147483647); /* 029 */ smj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 030 */ smj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 031 */ /* 032 */ } /* 033 */ /* 034 */ private boolean findNextJoinRows( /* 035 */ scala.collection.Iterator streamedIter, /* 036 */ scala.collection.Iterator bufferedIter) { /* 037 */ smj_streamedRow_0 = null; /* 038 */ int comp = 0; /* 039 */ while (smj_streamedRow_0 == null) { /* 040 */ if (!streamedIter.hasNext()) return false; /* 041 */ smj_streamedRow_0 = (InternalRow) streamedIter.next(); /* 042 */ long smj_value_0 = smj_streamedRow_0.getLong(0); /* 043 */ if (false) { /* 044 */ if (!smj_matches_0.isEmpty()) { /* 045 */ smj_matches_0.clear(); /* 046 */ } /* 047 */ return false; /* 048 */ /* 049 */ } /* 050 */ if (!smj_matches_0.isEmpty()) { /* 051 */ comp = 0; /* 052 */ if (comp == 0) { /* 053 */ comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < smj_value_3 ? 
-1 : 0); /* 054 */ } /* 055 */ /* 056 */ if (comp == 0) { /* 057 */ return true; /* 058 */ } /* 059 */ smj_matches_0.clear(); /* 060 */ } /* 061 */ /* 062 */ do { /* 063 */ if (smj_bufferedRow_0 == null) { /* 064 */ if (!bufferedIter.hasNext()) { /* 065 */ smj_value_3 = smj_value_0; /* 066 */ return !smj_matches_0.isEmpty(); /* 067 */ } /* 068 */ smj_bufferedRow_0 = (InternalRow) bufferedIter.next(); /* 069 */ long smj_value_1 = smj_bufferedRow_0.getLong(0); /* 070 */ if (false) { /* 071 */ smj_bufferedRow_0 = null; /* 072 */ continue; /* 073 */ } /* 074 */ smj_value_2 = smj_value_1; /* 075 */ } /* 076 */ /* 077 */ comp = 0; /* 078 */ if (comp == 0) { /* 079 */ comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < smj_value_2 ? -1 : 0); /* 080 */ } /* 081 */ /* 082 */ if (comp > 0) { /* 083 */ smj_bufferedRow_0 = null; /* 084 */ } else if (comp < 0) { /* 085 */ if (!smj_matches_0.isEmpty()) { /* 086 */ smj_value_3 = smj_value_0; /* 087 */ return true; /* 088 */ } else { /* 089 */ return false; /* 090 */ } /* 091 */ } else { /* 092 */ if (smj_matches_0.isEmpty()) { /* 093 */ smj_matches_0.add((UnsafeRow) smj_bufferedRow_0); /* 094 */ } /* 095 */ /* 096 */ smj_bufferedRow_0 = null; /* 097 */ } /* 098 */ } while (smj_streamedRow_0 != null); /* 099 */ } /* 100 */ return false; // unreachable /* 101 */ } /* 102 */ /* 103 */ protected void processNext() throws java.io.IOException { /* 104 */ while (smj_streamedInput_0.hasNext()) { /* 105 */ findNextJoinRows(smj_streamedInput_0, smj_bufferedInput_0); /* 106 */ /* 107 */ long smj_value_4 = -1L; /* 108 */ smj_value_4 = smj_streamedRow_0.getLong(0); /* 109 */ scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator(); /* 110 */ /* 111 */ boolean wholestagecodegen_hasOutputRow_0 = false; /* 112 */ /* 113 */ while (!wholestagecodegen_hasOutputRow_0 && smj_iterator_0.hasNext()) { /* 114 */ InternalRow smj_bufferedRow_1 = (InternalRow) smj_iterator_0.next(); /* 115 */ /* 116 */ 
wholestagecodegen_hasOutputRow_0 = true; /* 117 */ } /* 118 */ /* 119 */ if (!wholestagecodegen_hasOutputRow_0) { /* 120 */ // load all values of streamed row, because the values not in join condition are not /* 121 */ // loaded yet. /* 122 */ /* 123 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 124 */ /* 125 */ // common sub-expressions /* 126 */ /* 127 */ smj_mutableStateArray_0[1].reset(); /* 128 */ /* 129 */ smj_mutableStateArray_0[1].write(0, smj_value_4); /* 130 */ append((smj_mutableStateArray_0[1].getRow()).copy()); /* 131 */ /* 132 */ } /* 133 */ if (shouldStop()) return; /* 134 */ } /* 135 */ ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) references[1] /* plan */).cleanupResources(); /* 136 */ } /* 137 */ /* 138 */ } ``` ### Why are the changes needed? Improve the query CPU performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test in `WholeStageCodegenSuite.scala`, and relied on the existing unit tests in `ExistenceJoinSuite.scala`. Closes #32547 from c21/smj-left-anti. Authored-by: Cheng Su <[email protected]> Signed-off-by: Takeshi Yamamuro <[email protected]>
1 parent 7b942d5 commit cce0048

File tree

15 files changed

+208
-115
lines changed

15 files changed

+208
-115
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala

Lines changed: 79 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -364,7 +364,7 @@ case class SortMergeJoinExec(
364364
}
365365

366366
private lazy val ((streamedPlan, streamedKeys), (bufferedPlan, bufferedKeys)) = joinType match {
367-
case _: InnerLike | LeftOuter | LeftSemi => ((left, leftKeys), (right, rightKeys))
367+
case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => ((left, leftKeys), (right, rightKeys))
368368
case RightOuter => ((right, rightKeys), (left, leftKeys))
369369
case x =>
370370
throw new IllegalArgumentException(
@@ -375,7 +375,7 @@ case class SortMergeJoinExec(
375375
private lazy val bufferedOutput = bufferedPlan.output
376376

377377
override def supportCodegen: Boolean = joinType match {
378-
case _: InnerLike | LeftOuter | RightOuter | LeftSemi => true
378+
case _: InnerLike | LeftOuter | RightOuter | LeftSemi | LeftAnti => true
379379
case _ => false
380380
}
381381

@@ -453,7 +453,7 @@ case class SortMergeJoinExec(
453453
|$streamedRow = null;
454454
|continue;
455455
""".stripMargin
456-
case LeftOuter | RightOuter =>
456+
case LeftOuter | RightOuter | LeftAnti =>
457457
// Eagerly return streamed row. Only call `matches.clear()` when `matches.isEmpty()` is
458458
// false, to reduce unnecessary computation.
459459
s"""
@@ -472,7 +472,7 @@ case class SortMergeJoinExec(
472472
case _: InnerLike | LeftSemi =>
473473
// Skip streamed row.
474474
s"$streamedRow = null;"
475-
case LeftOuter | RightOuter =>
475+
case LeftOuter | RightOuter | LeftAnti =>
476476
// Eagerly return with streamed row.
477477
"return false;"
478478
case x =>
@@ -509,17 +509,17 @@ case class SortMergeJoinExec(
509509
// 1. Inner and Left Semi join: skip the row. `matches` will be cleared later when
510510
// hitting the next `streamedRow` with non-null join
511511
// keys.
512-
// 2. Left/Right Outer join: clear the previous `matches` if needed, keep the row,
513-
// and return false.
512+
// 2. Left/Right Outer and Left Anti join: clear the previous `matches` if needed,
513+
// keep the row, and return false.
514514
//
515515
// - Step 2: Find the `matches` from buffered side having same join keys with `streamedRow`.
516516
// Clear `matches` if we hit a new `streamedRow`, as we need to find new matches.
517517
// Use `bufferedRow` to iterate buffered side to put all matched rows into
518518
// `matches` (`addRowToBuffer`). Return true when getting all matched rows.
519519
// For `streamedRow` without `matches` (`handleStreamedWithoutMatch`):
520520
// 1. Inner and Left Semi join: skip the row.
521-
// 2. Left/Right Outer join: keep the row and return false (with `matches` being
522-
// empty).
521+
// 2. Left/Right Outer and Left Anti join: keep the row and return false (with
522+
// `matches` being empty).
523523
val findNextJoinRowsFuncName = ctx.freshName("findNextJoinRows")
524524
ctx.addNewFunction(findNextJoinRowsFuncName,
525525
s"""
@@ -664,14 +664,14 @@ case class SortMergeJoinExec(
664664
streamedVars ++ bufferedVars
665665
case RightOuter =>
666666
bufferedVars ++ streamedVars
667-
case LeftSemi =>
667+
case LeftSemi | LeftAnti =>
668668
streamedVars
669669
case x =>
670670
throw new IllegalArgumentException(
671671
s"SortMergeJoin.doProduce should not take $x as the JoinType")
672672
}
673673

674-
val (streamedBeforeLoop, condCheck) = if (condition.isDefined) {
674+
val (streamedBeforeLoop, condCheck, loadStreamed) = if (condition.isDefined) {
675675
// Split the code of creating variables based on whether it's used by condition or not.
676676
val loaded = ctx.freshName("loaded")
677677
val (streamedBefore, streamedAfter) = splitVarsByCondition(streamedOutput, streamedVars)
@@ -680,13 +680,36 @@ case class SortMergeJoinExec(
680680
ctx.currentVars = streamedVars ++ bufferedVars
681681
val cond = BindReferences.bindReference(
682682
condition.get, streamedPlan.output ++ bufferedPlan.output).genCode(ctx)
683-
// evaluate the columns those used by condition before loop
683+
// Evaluate the columns those used by condition before loop
684684
val before =
685685
s"""
686686
|boolean $loaded = false;
687687
|$streamedBefore
688688
""".stripMargin
689689

690+
val loadStreamed =
691+
s"""
692+
|if (!$loaded) {
693+
| $loaded = true;
694+
| $streamedAfter
695+
|}
696+
""".stripMargin
697+
698+
val loadStreamedAfterCondition = joinType match {
699+
case LeftAnti =>
700+
// No need to evaluate columns not used by condition from streamed side, as for Left Anti
701+
// join, streamed row with match is not outputted.
702+
""
703+
case _ => loadStreamed
704+
}
705+
706+
val loadBufferedAfterCondition = joinType match {
707+
case LeftSemi | LeftAnti =>
708+
// No need to evaluate columns not used by condition from buffered side
709+
""
710+
case _ => bufferedAfter
711+
}
712+
690713
val checking =
691714
s"""
692715
|$bufferedBefore
@@ -696,15 +719,12 @@ case class SortMergeJoinExec(
696719
| continue;
697720
| }
698721
|}
699-
|if (!$loaded) {
700-
| $loaded = true;
701-
| $streamedAfter
702-
|}
703-
|$bufferedAfter
722+
|$loadStreamedAfterCondition
723+
|$loadBufferedAfterCondition
704724
""".stripMargin
705-
(before, checking.trim)
725+
(before, checking.trim, loadStreamed)
706726
} else {
707-
(evaluateVariables(streamedVars), "")
727+
(evaluateVariables(streamedVars), "", "")
708728
}
709729

710730
val beforeLoop =
@@ -732,6 +752,9 @@ case class SortMergeJoinExec(
732752
case LeftSemi =>
733753
codegenSemi(findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck,
734754
ctx.freshName("hasOutputRow"), outputRow, eagerCleanup)
755+
case LeftAnti =>
756+
codegenAnti(streamedInput, findNextJoinRows, beforeLoop, iterator, bufferedRow, condCheck,
757+
loadStreamed, ctx.freshName("hasMatchedRow"), outputRow, eagerCleanup)
735758
case x =>
736759
throw new IllegalArgumentException(
737760
s"SortMergeJoin.doProduce should not take $x as the JoinType")
@@ -825,6 +848,44 @@ case class SortMergeJoinExec(
825848
""".stripMargin
826849
}
827850

851+
/**
852+
* Generates the code for Left Anti join.
853+
*/
854+
private def codegenAnti(
855+
streamedInput: String,
856+
findNextJoinRows: String,
857+
beforeLoop: String,
858+
matchIterator: String,
859+
bufferedRow: String,
860+
conditionCheck: String,
861+
loadStreamed: String,
862+
hasMatchedRow: String,
863+
outputRow: String,
864+
eagerCleanup: String): String = {
865+
s"""
866+
|while ($streamedInput.hasNext()) {
867+
| $findNextJoinRows;
868+
| $beforeLoop
869+
| boolean $hasMatchedRow = false;
870+
|
871+
| while (!$hasMatchedRow && $matchIterator.hasNext()) {
872+
| InternalRow $bufferedRow = (InternalRow) $matchIterator.next();
873+
| $conditionCheck
874+
| $hasMatchedRow = true;
875+
| }
876+
|
877+
| if (!$hasMatchedRow) {
878+
| // load all values of streamed row, because the values not in join condition are not
879+
| // loaded yet.
880+
| $loadStreamed
881+
| $outputRow
882+
| }
883+
| if (shouldStop()) return;
884+
|}
885+
|$eagerCleanup
886+
""".stripMargin
887+
}
888+
828889
override protected def withNewChildrenInternal(
829890
newLeft: SparkPlan, newRight: SparkPlan): SortMergeJoinExec =
830891
copy(left = newLeft, right = newRight)

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
: +- * BroadcastHashJoin Inner BuildRight (32)
1212
: :- * Project (26)
1313
: : +- * BroadcastHashJoin Inner BuildRight (25)
14-
: : :- SortMergeJoin LeftAnti (19)
14+
: : :- * SortMergeJoin LeftAnti (19)
1515
: : : :- * Project (13)
1616
: : : : +- * SortMergeJoin LeftSemi (12)
1717
: : : : :- * Sort (6)
@@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16
124124
Input [1]: [cr_order_number#14]
125125
Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0
126126

127-
(19) SortMergeJoin
127+
(19) SortMergeJoin [codegen id : 11]
128128
Left keys [1]: [cs_order_number#5]
129129
Right keys [1]: [cr_order_number#14]
130130
Join condition: None

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,8 @@ WholeStageCodegen (12)
1313
BroadcastHashJoin [cs_call_center_sk,cc_call_center_sk]
1414
Project [cs_ship_date_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
1515
BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk]
16-
InputAdapter
17-
SortMergeJoin [cs_order_number,cr_order_number]
16+
SortMergeJoin [cs_order_number,cr_order_number]
17+
InputAdapter
1818
WholeStageCodegen (5)
1919
Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
2020
SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk]
@@ -39,6 +39,7 @@ WholeStageCodegen (12)
3939
ColumnarToRow
4040
InputAdapter
4141
Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
42+
InputAdapter
4243
WholeStageCodegen (7)
4344
Sort [cr_order_number]
4445
InputAdapter

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/explain.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
: +- * BroadcastHashJoin Inner BuildRight (32)
1212
: :- * Project (26)
1313
: : +- * BroadcastHashJoin Inner BuildRight (25)
14-
: : :- SortMergeJoin LeftAnti (19)
14+
: : :- * SortMergeJoin LeftAnti (19)
1515
: : : :- * Project (13)
1616
: : : : +- * SortMergeJoin LeftSemi (12)
1717
: : : : :- * Sort (6)
@@ -124,7 +124,7 @@ Arguments: hashpartitioning(cr_order_number#14, 5), ENSURE_REQUIREMENTS, [id=#16
124124
Input [1]: [cr_order_number#14]
125125
Arguments: [cr_order_number#14 ASC NULLS FIRST], false, 0
126126

127-
(19) SortMergeJoin
127+
(19) SortMergeJoin [codegen id : 11]
128128
Left keys [1]: [cs_order_number#5]
129129
Right keys [1]: [cr_order_number#14]
130130
Join condition: None

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16/simplified.txt

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,8 @@ WholeStageCodegen (12)
1313
BroadcastHashJoin [cs_ship_addr_sk,ca_address_sk]
1414
Project [cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
1515
BroadcastHashJoin [cs_ship_date_sk,d_date_sk]
16-
InputAdapter
17-
SortMergeJoin [cs_order_number,cr_order_number]
16+
SortMergeJoin [cs_order_number,cr_order_number]
17+
InputAdapter
1818
WholeStageCodegen (5)
1919
Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
2020
SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk]
@@ -39,6 +39,7 @@ WholeStageCodegen (12)
3939
ColumnarToRow
4040
InputAdapter
4141
Scan parquet default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
42+
InputAdapter
4243
WholeStageCodegen (7)
4344
Sort [cr_order_number]
4445
InputAdapter

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt

Lines changed: 18 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -9,8 +9,8 @@ TakeOrderedAndProject (51)
99
: +- * Project (41)
1010
: +- * BroadcastHashJoin Inner BuildRight (40)
1111
: :- * Project (34)
12-
: : +- SortMergeJoin LeftAnti (33)
13-
: : :- SortMergeJoin LeftAnti (25)
12+
: : +- * SortMergeJoin LeftAnti (33)
13+
: : :- * SortMergeJoin LeftAnti (25)
1414
: : : :- * SortMergeJoin LeftSemi (17)
1515
: : : : :- * Sort (5)
1616
: : : : : +- Exchange (4)
@@ -158,7 +158,7 @@ Arguments: hashpartitioning(ws_bill_customer_sk#13, 5), ENSURE_REQUIREMENTS, [id
158158
Input [1]: [ws_bill_customer_sk#13]
159159
Arguments: [ws_bill_customer_sk#13 ASC NULLS FIRST], false, 0
160160

161-
(25) SortMergeJoin
161+
(25) SortMergeJoin [codegen id : 10]
162162
Left keys [1]: [c_customer_sk#1]
163163
Right keys [1]: [ws_bill_customer_sk#13]
164164
Join condition: None
@@ -170,35 +170,35 @@ Location: InMemoryFileIndex []
170170
PartitionFilters: [isnotnull(cs_sold_date_sk#18), dynamicpruningexpression(cs_sold_date_sk#18 IN dynamicpruning#7)]
171171
ReadSchema: struct<cs_ship_customer_sk:int>
172172

173-
(27) ColumnarToRow [codegen id : 11]
173+
(27) ColumnarToRow [codegen id : 12]
174174
Input [2]: [cs_ship_customer_sk#17, cs_sold_date_sk#18]
175175

176176
(28) ReusedExchange [Reuses operator id: 12]
177177
Output [1]: [d_date_sk#19]
178178

179-
(29) BroadcastHashJoin [codegen id : 11]
179+
(29) BroadcastHashJoin [codegen id : 12]
180180
Left keys [1]: [cs_sold_date_sk#18]
181181
Right keys [1]: [d_date_sk#19]
182182
Join condition: None
183183

184-
(30) Project [codegen id : 11]
184+
(30) Project [codegen id : 12]
185185
Output [1]: [cs_ship_customer_sk#17]
186186
Input [3]: [cs_ship_customer_sk#17, cs_sold_date_sk#18, d_date_sk#19]
187187

188188
(31) Exchange
189189
Input [1]: [cs_ship_customer_sk#17]
190190
Arguments: hashpartitioning(cs_ship_customer_sk#17, 5), ENSURE_REQUIREMENTS, [id=#20]
191191

192-
(32) Sort [codegen id : 12]
192+
(32) Sort [codegen id : 13]
193193
Input [1]: [cs_ship_customer_sk#17]
194194
Arguments: [cs_ship_customer_sk#17 ASC NULLS FIRST], false, 0
195195

196-
(33) SortMergeJoin
196+
(33) SortMergeJoin [codegen id : 15]
197197
Left keys [1]: [c_customer_sk#1]
198198
Right keys [1]: [cs_ship_customer_sk#17]
199199
Join condition: None
200200

201-
(34) Project [codegen id : 14]
201+
(34) Project [codegen id : 15]
202202
Output [2]: [c_current_cdemo_sk#2, c_current_addr_sk#3]
203203
Input [3]: [c_customer_sk#1, c_current_cdemo_sk#2, c_current_addr_sk#3]
204204

@@ -209,27 +209,27 @@ Location [not included in comparison]/{warehouse_dir}/customer_address]
209209
PushedFilters: [In(ca_state, [KY,GA,NM]), IsNotNull(ca_address_sk)]
210210
ReadSchema: struct<ca_address_sk:int,ca_state:string>
211211

212-
(36) ColumnarToRow [codegen id : 13]
212+
(36) ColumnarToRow [codegen id : 14]
213213
Input [2]: [ca_address_sk#21, ca_state#22]
214214

215-
(37) Filter [codegen id : 13]
215+
(37) Filter [codegen id : 14]
216216
Input [2]: [ca_address_sk#21, ca_state#22]
217217
Condition : (ca_state#22 IN (KY,GA,NM) AND isnotnull(ca_address_sk#21))
218218

219-
(38) Project [codegen id : 13]
219+
(38) Project [codegen id : 14]
220220
Output [1]: [ca_address_sk#21]
221221
Input [2]: [ca_address_sk#21, ca_state#22]
222222

223223
(39) BroadcastExchange
224224
Input [1]: [ca_address_sk#21]
225225
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#23]
226226

227-
(40) BroadcastHashJoin [codegen id : 14]
227+
(40) BroadcastHashJoin [codegen id : 15]
228228
Left keys [1]: [c_current_addr_sk#3]
229229
Right keys [1]: [ca_address_sk#21]
230230
Join condition: None
231231

232-
(41) Project [codegen id : 14]
232+
(41) Project [codegen id : 15]
233233
Output [1]: [c_current_cdemo_sk#2]
234234
Input [3]: [c_current_cdemo_sk#2, c_current_addr_sk#3, ca_address_sk#21]
235235

@@ -251,16 +251,16 @@ Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_stat
251251
Input [6]: [cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
252252
Condition : isnotnull(cd_demo_sk#25)
253253

254-
(46) BroadcastHashJoin [codegen id : 15]
254+
(46) BroadcastHashJoin [codegen id : 16]
255255
Left keys [1]: [c_current_cdemo_sk#2]
256256
Right keys [1]: [cd_demo_sk#25]
257257
Join condition: None
258258

259-
(47) Project [codegen id : 15]
259+
(47) Project [codegen id : 16]
260260
Output [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
261261
Input [7]: [c_current_cdemo_sk#2, cd_demo_sk#25, cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
262262

263-
(48) HashAggregate [codegen id : 15]
263+
(48) HashAggregate [codegen id : 16]
264264
Input [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
265265
Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
266266
Functions [1]: [partial_count(1)]
@@ -271,7 +271,7 @@ Results [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_pur
271271
Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32]
272272
Arguments: hashpartitioning(cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, 5), ENSURE_REQUIREMENTS, [id=#33]
273273

274-
(50) HashAggregate [codegen id : 16]
274+
(50) HashAggregate [codegen id : 17]
275275
Input [6]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30, count#32]
276276
Keys [5]: [cd_gender#26, cd_marital_status#27, cd_education_status#28, cd_purchase_estimate#29, cd_credit_rating#30]
277277
Functions [1]: [count(1)]

0 commit comments

Comments
 (0)