@@ -264,7 +264,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       .setMaster("local[*]")
       .setAppName("test")
       .set("spark.ui.enabled", "false")
-      .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
+      .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
      .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
       .set(
@@ -484,8 +484,10 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val df = spark.range(1).selectExpr("id AS key", "id AS value")
       val resultDf = df.join(df, "key").join(df, "key")
       val sparkPlan = resultDf.queryExecution.executedPlan
-      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
-      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
+      val queryStageInputs = sparkPlan.collect { case p: ShuffleQueryStageInput => p }
+      assert(queryStageInputs.length === 3)
+      assert(queryStageInputs(0).childStage === queryStageInputs(1).childStage)
+      assert(queryStageInputs(1).childStage === queryStageInputs(2).childStage)
       checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
     }
     withSparkSession(test, 4, None)
0 commit comments