
Commit 19e4c63

xingchaozh authored and committed (GitHub Enterprise)
[CARMEL-6352] Adjust scan partition size dynamically considering potential cost (#1110)
* [CARMEL-6352] Adjust scan partition size dynamically considering potential cost
* fix ut
* Add optimize tag
* minor
* Add limit for partition number
1 parent d0a000e commit 19e4c63

File tree

11 files changed: 340 additions, 13 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -142,6 +142,7 @@ case class Stack(children: Seq[Expression]) extends Generator {
   private lazy val numRows = children.head.eval().asInstanceOf[Int]
   private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt
 
+  def getNumRows: Int = numRows
   /**
    * Return true iff the first child exists and has a foldable IntegerType.
    */
```
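The new accessor exposes the generator's fan-out to the rule added later in this commit. As a hedged illustration of what `numRows` means (a spark-shell sketch; the literals are made up):

```scala
// stack(n, ...) emits n rows per input row, which is why the new rule reads
// getNumRows as a cost factor. Spark SQL sketch with made-up literal values.
spark.sql("SELECT stack(3, 'a', 1, 'b', 2, 'c', 3) AS (key, value)").show()
// 3 rows out for 1 row in: numRows = 3, numFields = ceil((7 - 1) / 3) = 2
```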

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

Lines changed: 9 additions & 6 deletions

```diff
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.util.Utils
 
 /**
  * A general hint for the child that is not yet resolved. This node is generated by the parser and
@@ -63,13 +63,16 @@ case class ResolvedParallelHint(child: LogicalPlan,
 
 trait HintPlaceHolderInfo
 
-case class TableParallelInfo(tableIdentifier: String, partitionSize: Option[Long],
-    partitionNumber: Option[Int]) extends HintPlaceHolderInfo {
+case class TableParallelInfo(tableIdentifier: Option[TableIdentifier],
+    partitionSize: Option[Long],
+    partitionNumber: Option[Int],
+    partitionSizeReduceRadio: Int = 1) extends HintPlaceHolderInfo {
   override def toString: String = {
     Seq(
-      s"tableIdentifier=${tableIdentifier}",
-      s"partitionSize=${partitionSize}",
-      s"partitionNumber=${partitionNumber}")
+      s"table=${tableIdentifier.getOrElse("None")}",
+      s"size=${partitionSize}",
+      s"number=${partitionNumber}",
+      s"reduceRadio=${partitionSizeReduceRadio}")
       .mkString("[", ", ", "]")
   }
 }
```
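A hedged sketch of the widened case class, using a made-up table identifier; the rendered string assumes `TableIdentifier` quotes names with backticks, as in upstream Spark:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.TableParallelInfo

// A hint carrying only a reduce ratio, the way the new rule creates it.
// The database/table names here are invented for illustration.
val info = TableParallelInfo(
  tableIdentifier = Some(TableIdentifier("events", Some("db"))),
  partitionSize = None,
  partitionNumber = None,
  partitionSizeReduceRadio = 4)

// info.toString == "[table=`db`.`events`, size=None, number=None, reduceRadio=4]"
```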

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 19 additions & 0 deletions

```diff
@@ -1311,6 +1311,15 @@ object SQLConf {
       "must be greater than 1.0")
     .createWithDefault(10.0)
 
+  val AUTO_ADJUST_SCAN_SIZE_MAX_PARTITIONS =
+    buildConf("spark.sql.sources.autoAdjustScanSize.maxPartitions")
+      .doc("The maximum number of partitions allowed when scan size is adjusted.")
+      .version("3.0.0")
+      .intConf
+      .checkValue(_ > 0,
+        "the value of spark.sql.sources.autoAdjustScanSize.maxPartitions must be greater than 0")
+      .createWithDefault(100000)
+
   val AUTO_BUCKETED_SCAN_ENABLED =
     buildConf("spark.sql.sources.bucketing.autoBucketedScan.enabled")
       .doc("When true, decide whether to do bucketed scan on input tables based on query plan " +
@@ -1335,6 +1344,14 @@ object SQLConf {
       "must be great than or equal to 1")
     .createWithDefault(6000)
 
+  val AUTO_ADJUST_SCAN_PARTITION_SIZE_ENABLED =
+    buildConf("spark.sql.autoAdjustScanPartitionSize.enabled")
+      .internal()
+      .doc("When true, we will adjust scan partition size dynamically.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val REMOVE_REDUNDANT_PARTIAL_AGGREGATES_ENABLED =
     buildConf("spark.sql.removeRedundantPartialAggregates.enabled")
       .internal()
@@ -4414,6 +4431,8 @@ class SQLConf extends Serializable with Logging {
 
   def initPartialListingCount: Int = getConf(SQLConf.INIT_PARTIAL_LISTING_COUNT)
 
+  def autoAdjustScanSizeMaxPartitions: Int = getConf(SQLConf.AUTO_ADJUST_SCAN_SIZE_MAX_PARTITIONS)
+
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def bucketingRatio: Double = getConf(SQLConf.BUCKETING_RATIO)
```
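Both knobs are ordinary SQL confs, so they can be toggled per session. An illustrative setup (the feature flag is marked internal but is still settable; the values chosen here are arbitrary):

```scala
// Session-level sketch: both keys are defined in this commit,
// but the values below are only examples.
spark.conf.set("spark.sql.autoAdjustScanPartitionSize.enabled", "true")
spark.conf.set("spark.sql.sources.autoAdjustScanSize.maxPartitions", "50000")
```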

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 26 additions & 0 deletions

```diff
@@ -909,6 +909,32 @@ case class FileSourceScanExec(
         originSize * fsRelation.sparkSession.sessionState.conf.minParallelHintPartitionSizeRatio),
         originSize * fsRelation.sparkSession.sessionState.conf.maxParallelHintPartitionSizeRatio).
         longValue()
+    } else if (tableParallelInfo.nonEmpty
+        && tableParallelInfo.get.partitionSizeReduceRadio > 1) {
+      val specifiedMaxSplitBytes = Math.
+        max(originSize / tableParallelInfo.get.partitionSizeReduceRadio,
+          originSize * fsRelation.sparkSession.sessionState.conf.minParallelHintPartitionSizeRatio).
+        longValue()
+
+      val partitionNumber =
+        FilePartition.minPartitionNumberBySpecifiedSize(fsRelation.sparkSession,
+          selectedPartitions, specifiedMaxSplitBytes)
+
+      val originPartitionNumber =
+        FilePartition.minPartitionNumberBySpecifiedSize(fsRelation.sparkSession,
+          selectedPartitions, originSize)
+
+      if (partitionNumber > conf.autoAdjustScanSizeMaxPartitions) {
+        if (originPartitionNumber < conf.autoAdjustScanSizeMaxPartitions) {
+          FilePartition.maxSplitBytesBySpecifiedNumber(
+            fsRelation.sparkSession, selectedPartitions, conf.autoAdjustScanSizeMaxPartitions)
+        } else {
+          logInfo(s"Fallback to origin scan size for table ${tableParallelInfo}")
+          originSize
+        }
+      } else {
+        specifiedMaxSplitBytes
+      }
     } else if (relation.sparkSession.sessionState.conf.bucketingEnabled &&
         relation.bucketSpec.isDefined &&
         disableBucketedScan) { // Check if bucketing scan disabled by planner
```
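The new branch divides the target split size by the accumulated reduce ratio, floors it at the min-ratio bound, then guards against producing too many partitions. A standalone sketch of that arithmetic with made-up numbers; the real code delegates the counting to the `FilePartition` helpers:

```scala
// Standalone sketch of the split-size adjustment; every value below is made up,
// and totalBytes / splitSize stands in for FilePartition.minPartitionNumberBySpecifiedSize.
val originSize    = 128L << 20   // assumed 128 MiB baseline split size
val reduceRadio   = 4            // cost factor accumulated by the rule
val minSizeRatio  = 0.125        // stand-in for minParallelHintPartitionSizeRatio
val totalBytes    = 512L << 30   // 512 GiB of input files (made up)
val maxPartitions = 100000       // autoAdjustScanSizeMaxPartitions default

val shrunk = math.max(originSize / reduceRadio, (originSize * minSizeRatio).toLong) // 32 MiB
val newCount    = totalBytes / shrunk      // 16384 partitions at the shrunken size
val originCount = totalBytes / originSize  // 4096 partitions at the original size

val chosenSplitBytes =
  if (newCount <= maxPartitions) shrunk                            // adjusted size is safe
  else if (originCount < maxPartitions) totalBytes / maxPartitions // resize to the cap
  else originSize                                                  // fall back entirely
```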

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.QueryExecution.skipAuthTag
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, EnsureRepartitionForWriting, InsertAdaptiveSparkPlan}
-import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
+import org.apache.spark.sql.execution.bucketing.{AdjustScanPartitionSizeDynamically, DisableUnnecessaryBucketedScan}
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.{EliminateShuffleExec, EnsureRequirements, ExchangePushDownThroughAggregate}
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
@@ -373,6 +373,7 @@ object QueryExecution {
       EnsureRepartitionForWriting,
       EliminateShuffleExec,
       DisableUnnecessaryBucketedScan,
+      AdjustScanPartitionSizeDynamically,
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
       CollapseCodegenStages()) ++
     (if (subquery) {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
-import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
+import org.apache.spark.sql.execution.bucketing.{AdjustScanPartitionSizeDynamically, DisableUnnecessaryBucketedScan}
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
 import org.apache.spark.sql.internal.SQLConf
@@ -96,7 +96,8 @@ case class AdaptiveSparkPlanExec(
     RemoveRedundantPartialAggregates,
     EnsureRepartitionForWriting,
     EliminateShuffleExec,
-    DisableUnnecessaryBucketedScan
+    DisableUnnecessaryBucketedScan,
+    AdjustScanPartitionSizeDynamically
   ) ++ context.session.sessionState.queryStagePrepRules
 
   @transient private val initialPlan = context.session.withActive {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedRangePartition.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -43,6 +43,7 @@ object OptimizeSkewedRangePartition extends Rule[SparkPlan] {
   private def handleSkewedRangePartition(plan: SortExec, child: SparkPlan): SparkPlan = {
     val queryStageInfo = child match {
       case ShuffleStage(s: ShuffleStageInfo) => Option(s)
+      case _ => None
     }
 
     if (queryStageInfo.isEmpty ||
```
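The added default arm makes a previously non-exhaustive match total. A toy repro of the failure mode, using deliberately trivial types unrelated to Spark's:

```scala
// A match with no catch-all throws scala.MatchError at runtime when no
// pattern applies. Toy example only; names here are invented.
def toInt(x: Any): Option[Int] = x match { case i: Int => Some(i) } // non-exhaustive
toInt(42)      // Some(42)
// toInt("oops") would throw scala.MatchError; `case _ => None` makes the match total.
```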
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/AdjustScanPartitionSizeDynamically.scala

Lines changed: 88 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.bucketing

import org.apache.spark.sql.catalyst.expressions.Stack
import org.apache.spark.sql.catalyst.expressions.aggregate.{ApproximatePercentile, Percentile}
import org.apache.spark.sql.catalyst.plans.logical.TableParallelInfo
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, BroadcastRangeJoinExec}
import org.apache.spark.sql.internal.SQLConf

/**
 * Adjust scan partition size dynamically considering potential cost.
 * We check operators on path from root/exchange to scan operator to calculate the cost.
 */
object AdjustScanPartitionSizeDynamically extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.AUTO_ADJUST_SCAN_PARTITION_SIZE_ENABLED)) {
      plan
    } else {
      adjustScanPartitionSize(plan)
    }
  }

  // Visit operators from root/exchange to current scan
  private def adjustScanPartitionSize(plan: SparkPlan, inputReduceRadio: Int = 1): SparkPlan = {
    var reduceRadio = inputReduceRadio
    plan match {
      case e: Exchange =>
        e.mapChildren(adjustScanPartitionSize(_, 1))
      case scan: FileSourceScanExec =>
        if (!scan.bucketedScan && reduceRadio > 1) {
          if (scan.tableParallelInfo.isEmpty || (scan.tableParallelInfo.get.partitionNumber.isEmpty
              && scan.tableParallelInfo.get.partitionSize.isEmpty)) {
            val tableParallelInfo = Some(TableParallelInfo(scan.tableIdentifier, None,
              None, reduceRadio))

            val newScan = scan.copy(tableParallelInfo = tableParallelInfo)
            scan.logicalLink.foreach(newScan.setLogicalLink)
            newScan.addOptimizeTag(s"created by ${simpleRuleName}")
            newScan
          } else {
            scan
          }
        } else {
          scan
        }
      case o =>
        reduceRadio = reduceRadio + visitCostInterestingOperator(o)
        o.mapChildren(adjustScanPartitionSize(_, reduceRadio))
    }
  }

  private def visitCostInterestingOperator(plan: SparkPlan): Int = {
    var reduceRatio = 0
    plan match {
      case ObjectHashAggregateExec(_, _, aggregateExpr, _, _, _, _) =>
        aggregateExpr.foreach(_.aggregateFunction match {
          case _: Percentile => reduceRatio += 1
          case _: ApproximatePercentile => reduceRatio += 1
          case _ =>
        })
      case _: BroadcastNestedLoopJoinExec => reduceRatio += 3
      case expand: ExpandExec => reduceRatio += expand.projections.size
      case GenerateExec(stack: Stack, _, _, _, _) => reduceRatio += stack.getNumRows
      case _ =>
    }
    reduceRatio
  }
}
```
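A toy re-implementation of the walk over a made-up plan ADT, showing how the ratio accumulates at cost-interesting operators and resets at shuffle boundaries; only the arithmetic mirrors the rule above:

```scala
// All types below are invented for illustration; they stand in for Spark's
// Exchange, ExpandExec, GenerateExec(Stack), and FileSourceScanExec.
sealed trait Node
case class Shuffle(child: Node) extends Node
case class Expand(projections: Int, child: Node) extends Node
case class StackGen(numRows: Int, child: Node) extends Node
case object FileScan extends Node

def walk(n: Node, ratio: Int = 1): Unit = n match {
  case Shuffle(c)     => walk(c, 1)          // ratio resets below a shuffle boundary
  case Expand(p, c)   => walk(c, ratio + p)  // each projection replays every input row
  case StackGen(r, c) => walk(c, ratio + r)  // stack(n, ...) emits n rows per input row
  case FileScan       => println(s"scan sees reduceRadio = $ratio")
}

walk(Shuffle(Expand(4, StackGen(3, FileScan))))
// prints: scan sees reduceRadio = 8, so the scan's split size is divided by 8
// (subject to the min-ratio floor and the maxPartitions cap shown earlier)
```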

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CacheParallelHint.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -53,7 +53,7 @@ private[sql] object CacheParallelHint
     logicalRelation.catalogTable.foreach(catalogTable => {
       SparkContext.getActive.foreach(sc => {
         val tableParallelInfo =
-          TableParallelInfo(catalogTable.identifier.toString, r.partitionSize,
+          TableParallelInfo(Some(catalogTable.identifier), r.partitionSize,
             r.partitionNumber)
 
         val queryPlanningContext = if (QueryPlanningTracker.getCurrent.isEmpty) {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -216,20 +216,20 @@ object FileSourceStrategy extends Strategy with SQLConfHelper with Logging {
 
     if (conf.parallelHintEnabled) {
       l.catalogTable.foreach(catalogTable => {
-        val tableIdentifier = catalogTable.identifier.toString
+        val tableIdentifier = catalogTable.identifier
 
         plan.collectFirst {
           case r: ResolvedParallelHint if ((r.partitionSize.nonEmpty ||
             r.partitionNumber.nonEmpty)) => r
         }.foreach(r => {
-          tableParallelInfo = Some(TableParallelInfo(tableIdentifier, r.partitionSize,
+          tableParallelInfo = Some(TableParallelInfo(Some(tableIdentifier), r.partitionSize,
             r.partitionNumber))
         })
 
         if (tableParallelInfo.isEmpty) {
           QueryPlanningTracker.getCurrent.foreach(f => {
             tableParallelInfo = Option(
-              f.queryPlanningContext.tableParallelHintMap.get(tableIdentifier)
+              f.queryPlanningContext.tableParallelHintMap.get(tableIdentifier.toString)
             )
           })
         }
```
