From af1b66ffe8fdb67443e7aebf570409452ba0d87b Mon Sep 17 00:00:00 2001 From: Karuppayya Rajendran Date: Fri, 11 Jul 2025 17:34:40 -0700 Subject: [PATCH 1/4] SPARK-52777: Enable shuffle cleanup mode configuration in Spark SQL --- .../execution/SparkConnectPlanExecution.scala | 11 ++-------- .../spark/sql/execution/QueryExecution.scala | 14 ++++++++++++- .../sql/execution/QueryExecutionSuite.scala | 21 +++++++++++++++++++ 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala index 65b9863ca9543..f9faf9261adfe 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_ import org.apache.spark.sql.connect.planner.SparkConnectPlanner import org.apache.spark.sql.connect.service.ExecuteHolder import org.apache.spark.sql.connect.utils.MetricGenerator -import org.apache.spark.sql.execution.{DoNotCleanup, LocalTableScanExec, RemoveShuffleFiles, SkipMigration, SQLExecution} +import org.apache.spark.sql.execution.{LocalTableScanExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -60,14 +60,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder) val planner = new SparkConnectPlanner(executeHolder) val tracker = executeHolder.eventsManager.createQueryPlanningTracker() val conf = session.sessionState.conf - val shuffleCleanupMode = - if (conf.getConf(SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) { - RemoveShuffleFiles - } else if 
(conf.getConf(SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED)) { - SkipMigration - } else { - DoNotCleanup - } + val shuffleCleanupMode = QueryExecution.determineShuffleCleanupMode(conf) val dataframe = Dataset.ofRows( sessionHolder.session, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 9e5264d8d4f31..68c0df2fb30dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.SparkSession +import org.apache.spark.sql.execution.QueryExecution.determineShuffleCleanupMode import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan} import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters @@ -63,7 +64,8 @@ class QueryExecution( val logical: LogicalPlan, val tracker: QueryPlanningTracker = new QueryPlanningTracker, val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL, - val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup) extends Logging { + val shuffleCleanupMode: ShuffleCleanupMode = + determineShuffleCleanupMode(SQLConf.get)) extends Logging { val id: Long = QueryExecution.nextExecutionId @@ -683,4 +685,14 @@ object QueryExecution { normalized } } + + def determineShuffleCleanupMode(conf: SQLConf): ShuffleCleanupMode = { + if (conf.getConf(SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) { + RemoveShuffleFiles + } else if (conf.getConf(SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED)) { + SkipMigration + } 
else {
+      DoNotCleanup
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 1aab2a855bb4a..726b1715bd647 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -441,6 +441,27 @@ class QueryExecutionSuite extends SharedSparkSession {
     }
   }
 
+  test("determineShuffleCleanupMode should return correct mode based on SQL configuration") {
+    val conf = new SQLConf()
+
+    // Defaults to DoNotCleanup when neither cleanup flag is enabled
+    conf.setConf(SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
+    conf.setConf(SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED, false)
+    assert(QueryExecution.determineShuffleCleanupMode(conf) === DoNotCleanup)
+
+    // Test RemoveShuffleFiles
+    conf.setConf(SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, true)
+    conf.setConf(SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED, false)
+    assert(QueryExecution.determineShuffleCleanupMode(conf) === RemoveShuffleFiles)
+
+    // Test SkipMigration
+    conf.setConf(SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
+    conf.setConf(SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED, true)
+    assert(QueryExecution.determineShuffleCleanupMode(conf) === SkipMigration)
+
+    // TODO: when both flags are enabled, RemoveShuffleFiles takes precedence; log a warning?
+ } + case class MockCallbackEagerCommand( var trackerAnalyzed: QueryPlanningTracker = null, var trackerReadyForExecution: QueryPlanningTracker = null) From 20d2671fe5d8d3c528818fa633d83ef24e9f6cf8 Mon Sep 17 00:00:00 2001 From: Karuppayya Rajendran Date: Wed, 16 Jul 2025 18:25:07 -0700 Subject: [PATCH 2/4] Fix build: Remove unused import --- .../spark/sql/connect/execution/SparkConnectPlanExecution.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala index f9faf9261adfe..aa8a8109257a8 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.connect.service.ExecuteHolder import org.apache.spark.sql.connect.utils.MetricGenerator import org.apache.spark.sql.execution.{LocalTableScanExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.arrow.ArrowConverters -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.ThreadUtils From d079ffb6bd263511ae623ea7041db2493616444a Mon Sep 17 00:00:00 2001 From: Karuppayya Rajendran Date: Fri, 18 Jul 2025 15:18:01 -0700 Subject: [PATCH 3/4] Fix test --- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 5d81239d023e9..bced8321de443 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1871,7 +1871,9 @@ class AdaptiveQueryExecSuite } withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.SHUFFLE_PARTITIONS.key -> "5") { + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED.key -> "false", + SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") { val df = sql( """ |SELECT * FROM ( From 192e202e277e6c48034473628fb6efd53e16dc8c Mon Sep 17 00:00:00 2001 From: Karuppayya Rajendran Date: Mon, 21 Jul 2025 12:31:58 -0700 Subject: [PATCH 4/4] Fix test --- .../spark/sql/util/DataFrameCallbackSuite.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 1ec9aca857e22..f47782a64e9cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -51,6 +51,19 @@ class DataFrameCallbackSuite extends QueryTest super.sparkConf.set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES, false) } + override def beforeAll(): Unit = { + super.beforeAll() + + val confsToSet = Map( + SQLConf.SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED -> false, + SQLConf.SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED -> false + ) + + confsToSet.foreach { case (key, newValue) => + spark.sessionState.conf.setConf(key, newValue) + } + } + test("execute callback functions when a DataFrame action finished successfully") { val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)] val listener = new QueryExecutionListener {