Skip to content

Commit 329071d

Browse files
Address review comments; turn config name from string to field in SQLConf.
1 parent 8663e84 commit 329071d

File tree

5 files changed

+30
-29
lines changed

5 files changed

+30
-29
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ import java.util.Properties
2121

2222
import scala.collection.JavaConverters._
2323

24+
object SQLConf {
25+
val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
26+
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
27+
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
28+
29+
object Deprecated {
30+
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
31+
}
32+
}
33+
2434
/**
2535
* A trait that enables the setting and getting of mutable config parameters/hints.
2636
*
@@ -49,16 +59,16 @@ trait SQLConf {
4959
*
5060
* Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
5161
*/
52-
private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
62+
private[spark] def autoBroadcastJoinThreshold: Int =
63+
get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
5364

5465
/**
5566
* The default size in bytes to assign to a logical operator's estimation statistics. By default,
5667
* it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
5768
* properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
5869
*/
59-
private[spark] def statsDefaultSizeInBytes: Long =
60-
getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong)
61-
.getOrElse(autoConvertJoinSize + 1)
70+
private[spark] def defaultSizeInBytes: Long =
71+
getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
6272

6373
/** ********************** SQLConf functionality methods ************ */
6474

@@ -99,12 +109,3 @@ trait SQLConf {
99109
}
100110

101111
}
102-
103-
object SQLConf {
104-
val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
105-
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
106-
107-
object Deprecated {
108-
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
109-
}
110-
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
8484
@transient override lazy val statistics = Statistics(
8585
// TODO: Instead of returning a default value here, find a way to return a meaningful size
8686
// estimate for RDDs. See PR 1238 for more discussions.
87-
sizeInBytes = BigInt(sqlContext.statsDefaultSizeInBytes)
87+
sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
8888
)
8989

9090
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
5353
* This strategy applies a simple optimization based on the estimates of the physical sizes of
5454
* the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an
5555
* estimated physical size smaller than the user-settable threshold
56-
* `spark.sql.auto.convert.join.size`, the planner would mark it as the ''build'' relation and
57-
* mark the other relation as the ''stream'' side. The build table will be ''broadcasted'' to
58-
* all of the executors involved in the join, as a [[org.apache.spark.broadcast.Broadcast]]
59-
* object. If both estimates exceed the threshold, they will instead be used to decide the build
60-
* side in a [[execution.ShuffledHashJoin]].
56+
* [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
57+
* ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
58+
* ''broadcasted'' to all of the executors involved in the join, as a
59+
* [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they
60+
* will instead be used to decide the build side in a [[execution.ShuffledHashJoin]].
6161
*/
6262
object HashJoin extends Strategy with PredicateHelper {
6363
private[this] def makeBroadcastHashJoin(
@@ -74,13 +74,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
7474

7575
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
7676
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
77-
if Try(sqlContext.autoConvertJoinSize > 0 &&
78-
right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
77+
if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
78+
right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
7979
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
8080

8181
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
82-
if Try(sqlContext.autoConvertJoinSize > 0 &&
83-
left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
82+
if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
83+
left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
8484
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
8585

8686
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private[hive] case class MetastoreRelation
279279
BigInt(
280280
Option(hiveQlTable.getParameters.get("totalSize"))
281281
.map(_.toLong)
282-
.getOrElse(sqlContext.statsDefaultSizeInBytes))
282+
.getOrElse(sqlContext.defaultSizeInBytes))
283283
}
284284
)
285285

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
1919

2020
import scala.reflect.ClassTag
2121

22-
import org.apache.spark.sql.QueryTest
22+
import org.apache.spark.sql.{SQLConf, QueryTest}
2323
import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
2424
import org.apache.spark.sql.hive.test.TestHive
2525
import org.apache.spark.sql.hive.test.TestHive._
@@ -51,7 +51,7 @@ class StatisticsSuite extends QueryTest {
5151
val sizes = rdd.queryExecution.analyzed.collect {
5252
case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
5353
}
54-
assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize,
54+
assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold,
5555
s"query should contain two relations, each of which has size smaller than autoConvertSize")
5656

5757
// Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -63,9 +63,9 @@ class StatisticsSuite extends QueryTest {
6363
checkAnswer(rdd, expectedAnswer) // check correctness of output
6464

6565
TestHive.settings.synchronized {
66-
val tmp = autoConvertJoinSize
66+
val tmp = autoBroadcastJoinThreshold
6767

68-
hql("""SET spark.sql.auto.convert.join.size=-1""")
68+
hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
6969
rdd = hql(query)
7070
bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
7171
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
@@ -74,7 +74,7 @@ class StatisticsSuite extends QueryTest {
7474
assert(shj.size === 1,
7575
"ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
7676

77-
hql(s"""SET spark.sql.auto.convert.join.size=$tmp""")
77+
hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
7878
}
7979

8080
after()

0 commit comments

Comments
 (0)