Commit 1642de6

Merge pull request apache#635 from palantir/jbaker/adaptive
Cherry pick adaptive query execution
2 parents (cc41115 + 9be3f6b), commit 1642de6

File tree

68 files changed: +4287 additions, −1140 deletions


core/src/test/scala/org/apache/spark/SparkFunSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 // scalastyle:off
 import java.io.File
 
+import org.apache.log4j.{Appender, Level, Logger}
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
@@ -117,4 +118,28 @@ abstract class SparkFunSuite
       Utils.deleteRecursively(dir)
     }
   }
+
+  /**
+   * Adds a log appender and optionally sets a log level to the root logger or the logger with
+   * the specified name, then executes the specified function, and in the end removes the log
+   * appender and restores the log level if necessary.
+   */
+  protected def withLogAppender(
+      appender: Appender,
+      loggerName: Option[String] = None,
+      level: Option[Level] = None)(
+      f: => Unit): Unit = {
+    val logger = loggerName.map(Logger.getLogger).getOrElse(Logger.getRootLogger)
+    val restoreLevel = logger.getLevel
+    logger.addAppender(appender)
+    if (level.isDefined) {
+      logger.setLevel(level.get)
+    }
+    try f finally {
+      logger.removeAppender(appender)
+      if (level.isDefined) {
+        logger.setLevel(restoreLevel)
+      }
+    }
+  }
 }
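
To show how a suite might use the new helper, here is a hedged sketch; the appender class, test name, and code under test are hypothetical, not part of this commit:

    import org.apache.log4j.spi.LoggingEvent
    import org.apache.log4j.{AppenderSkeleton, Level}

    // A minimal in-memory appender that records every log event it receives.
    class BufferAppender extends AppenderSkeleton {
      val events = scala.collection.mutable.ArrayBuffer.empty[LoggingEvent]
      override def append(event: LoggingEvent): Unit = events += event
      override def close(): Unit = {}
      override def requiresLayout(): Boolean = false
    }

    test("emits a warning on bad input") { // hypothetical test case
      val appender = new BufferAppender
      withLogAppender(appender, level = Some(Level.WARN)) {
        somethingThatLogsAWarning() // hypothetical code under test
      }
      assert(appender.events.exists(_.getRenderedMessage.contains("warning")))
    }

The try/finally in withLogAppender guarantees the appender is detached and the original level restored even if the body throws, so one failing test cannot leak log configuration into the next.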

docs/sql-performance-tuning.md

Lines changed: 16 additions & 8 deletions
@@ -92,14 +92,22 @@ that these options will be deprecated in future release as more optimizations ar
 </tr>
 </table>
 
-## Broadcast Hint for SQL Queries
-
-The `BROADCAST` hint guides Spark to broadcast each specified table when joining them with another table or view.
-When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred,
-even if the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
-When both sides of a join are specified, Spark broadcasts the one having the lower statistics.
-Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join)
-support BHJ. When the broadcast nested loop join is selected, we still respect the hint.
+## Join Strategy Hints for SQL Queries
+
+The join strategy hints, namely `BROADCAST`, `MERGE`, `SHUFFLE_HASH` and `SHUFFLE_REPLICATE_NL`,
+instruct Spark to use the hinted strategy on each specified relation when joining them with another
+relation. For example, when the `BROADCAST` hint is used on table 't1', broadcast join (either
+broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key)
+with 't1' as the build side will be prioritized by Spark even if the size of table 't1' suggested
+by the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
+
+When different join strategy hints are specified on both sides of a join, Spark prioritizes the
+`BROADCAST` hint over the `MERGE` hint over the `SHUFFLE_HASH` hint over the `SHUFFLE_REPLICATE_NL`
+hint. When both sides are specified with the `BROADCAST` hint or the `SHUFFLE_HASH` hint, Spark will
+pick the build side based on the join type and the sizes of the relations.
+
+Note that there is no guarantee that Spark will choose the join strategy specified in the hint since
+a specific strategy may not support all join types.
 
 <div class="codetabs">
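The documented hints can be exercised from Scala; a hedged sketch, assuming a SparkSession `spark` and registered tables t1 and t2 (not taken from this diff):

    // SQL comment syntax: prioritize broadcasting t1 regardless of
    // spark.sql.autoBroadcastJoinThreshold.
    spark.sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key")

    // Equivalent Dataset API form via the existing Dataset.hint method.
    spark.table("t1").hint("broadcast").join(spark.table("t2"), "key")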
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 2 deletions
@@ -146,9 +146,9 @@ class Analyzer(
 
   lazy val batches: Seq[Batch] = Seq(
     Batch("Hints", fixedPoint,
-      new ResolveHints.ResolveBroadcastHints(conf),
+      new ResolveHints.ResolveJoinStrategyHints(conf),
       ResolveHints.ResolveCoalesceHints,
-      ResolveHints.RemoveAllHints),
+      new ResolveHints.RemoveAllHints(conf)),
     Batch("Simple Sanity Check", Once,
       LookupFunctions),
     Batch("Substitution", fixedPoint,
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.plans.logical.{HintErrorHandler, HintInfo}
+
+/**
+ * The hint error handler that logs warnings for each hint error.
+ */
+object HintErrorLogger extends HintErrorHandler with Logging {
+
+  override def hintNotRecognized(name: String, parameters: Seq[Any]): Unit = {
+    logWarning(s"Unrecognized hint: ${hintToPrettyString(name, parameters)}")
+  }
+
+  override def hintRelationsNotFound(
+      name: String, parameters: Seq[Any], invalidRelations: Set[String]): Unit = {
+    invalidRelations.foreach { n =>
+      logWarning(s"Could not find relation '$n' specified in hint " +
+        s"'${hintToPrettyString(name, parameters)}'.")
+    }
+  }
+
+  override def joinNotFoundForJoinHint(hint: HintInfo): Unit = {
+    logWarning(s"A join hint $hint is specified but it is not part of a join relation.")
+  }
+
+  override def hintOverridden(hint: HintInfo): Unit = {
+    logWarning(s"Hint $hint is overridden by another hint and will not take effect.")
+  }
+
+  private def hintToPrettyString(name: String, parameters: Seq[Any]): String = {
+    val prettyParameters = parameters.map {
+      case a: UnresolvedAttribute => a.nameParts.mkString(".")
+      case e: Any => e.toString
+    }
+    s"$name${prettyParameters.mkString("(", ", ", ")")}"
+  }
+}
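
The pretty-printer above renders a hint as its name followed by the parenthesized parameters, so an error surfaces in the log in the same shape the user wrote it. A hedged illustration (hypothetical hint name and table):

    // FOO is not a registered strategy hint, so ResolveJoinStrategyHints skips it
    // and RemoveAllHints reports it via hintNotRecognized, logging roughly:
    //   Unrecognized hint: FOO(t1)
    spark.sql("SELECT /*+ FOO(t1) */ * FROM t1")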

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 65 additions & 28 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -28,45 +30,69 @@ import org.apache.spark.sql.internal.SQLConf
 
 
 /**
- * Collection of rules related to hints. The only hint currently available is broadcast join hint.
+ * Collection of rules related to hints. The only hint currently available is join strategy hint.
  *
  * Note that this is separately into two rules because in the future we might introduce new hint
- * rules that have different ordering requirements from broadcast.
+ * rules that have different ordering requirements from join strategies.
  */
 object ResolveHints {
 
   /**
-   * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
-   * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
-   * on top of any relation (that is not aliased differently), subquery, or common table expression
-   * that match the specified name.
+   * The list of allowed join strategy hints is defined in [[JoinStrategyHint.strategies]], and a
+   * sequence of relation aliases can be specified with a join strategy hint, e.g., "MERGE(a, c)",
+   * "BROADCAST(a)". A join strategy hint plan node will be inserted on top of any relation (that
+   * is not aliased differently), subquery, or common table expression that match the specified
+   * name.
    *
    * The hint resolution works by recursively traversing down the query plan to find a relation or
-   * subquery that matches one of the specified broadcast aliases. The traversal does not go past
-   * beyond any existing broadcast hints, subquery aliases.
+   * subquery that matches one of the specified relation aliases. The traversal does not go past
+   * beyond any view reference, with clause or subquery alias.
    *
    * This rule must happen before common table expressions.
   */
-  class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
-    private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
+  class ResolveJoinStrategyHints(conf: SQLConf) extends Rule[LogicalPlan] {
+    private val STRATEGY_HINT_NAMES = JoinStrategyHint.strategies.flatMap(_.hintAliases)
+
+    private val hintErrorHandler = conf.hintErrorHandler
 
     def resolver: Resolver = conf.resolver
 
-    private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
+    private def createHintInfo(hintName: String): HintInfo = {
+      HintInfo(strategy =
+        JoinStrategyHint.strategies.find(
+          _.hintAliases.map(
+            _.toUpperCase(Locale.ROOT)).contains(hintName.toUpperCase(Locale.ROOT))))
+    }
+
+    private def applyJoinStrategyHint(
+        plan: LogicalPlan,
+        relations: mutable.HashSet[String],
+        hintName: String): LogicalPlan = {
       // Whether to continue recursing down the tree
       var recurse = true
 
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
-          case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
-            ResolvedHint(plan, HintInfo(broadcast = true))
-          case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            ResolvedHint(plan, HintInfo(broadcast = true))
+          case ResolvedHint(u @ UnresolvedRelation(ident), hint)
+              if relations.exists(resolver(_, ident.table)) =>
+            relations.remove(ident.table)
+            ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler))
+
+          case ResolvedHint(r: SubqueryAlias, hint)
+              if relations.exists(resolver(_, r.alias)) =>
+            relations.remove(r.alias)
+            ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler))
+
+          case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
+            relations.remove(u.tableIdentifier.table)
+            ResolvedHint(plan, createHintInfo(hintName))
+          case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
+            relations.remove(r.alias)
+            ResolvedHint(plan, createHintInfo(hintName))
 
           case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
-            // For an existing broadcast hint, there is no point going down (if we do, we either
-            // won't change the structure, or will introduce another broadcast hint that is useless.
+            // For an existing strategy hint, there is no chance for a match from this point down.
             // The rest (view, with, subquery) indicates different scopes that we shouldn't traverse
             // down. Note that technically when this rule is executed, we haven't completed view
            // resolution yet and as a result the view part should be deadcode. I'm leaving it here
@@ -80,25 +106,31 @@ object ResolveHints {
       }
 
       if ((plan fastEquals newNode) && recurse) {
-        newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
+        newNode.mapChildren(child => applyJoinStrategyHint(child, relations, hintName))
       } else {
         newNode
       }
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+      case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
-          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
-          ResolvedHint(h.child, HintInfo(broadcast = true))
+          // If there is no table alias specified, apply the hint on the entire subtree.
+          ResolvedHint(h.child, createHintInfo(h.name))
         } else {
-          // Otherwise, find within the subtree query plans that should be broadcasted.
-          applyBroadcastHint(h.child, h.parameters.map {
+          // Otherwise, find within the subtree query plans to apply the hint.
+          val relationNames = h.parameters.map {
            case tableName: String => tableName
            case tableId: UnresolvedAttribute => tableId.name
-            case unsupported => throw new AnalysisException("Broadcast hint parameter should be " +
-              s"an identifier or string but was $unsupported (${unsupported.getClass}")
-          }.toSet)
+            case unsupported => throw new AnalysisException("Join strategy hint parameter " +
+              s"should be an identifier or string but was $unsupported (${unsupported.getClass}")
+          }
+          val relationNameSet = new mutable.HashSet[String]
+          relationNames.foreach(relationNameSet.add)
+
+          val applied = applyJoinStrategyHint(h.child, relationNameSet, h.name)
+          hintErrorHandler.hintRelationsNotFound(h.name, h.parameters, relationNameSet.toSet)
+          applied
         }
     }
   }
@@ -133,9 +165,14 @@ object ResolveHints {
   * Removes all the hints, used to remove invalid hints provided by the user.
   * This must be executed after all the other hint rules are executed.
   */
-  object RemoveAllHints extends Rule[LogicalPlan] {
+  class RemoveAllHints(conf: SQLConf) extends Rule[LogicalPlan] {
+
+    private val hintErrorHandler = conf.hintErrorHandler
+
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case h: UnresolvedHint => h.child
+      case h: UnresolvedHint =>
+        hintErrorHandler.hintNotRecognized(h.name, h.parameters)
+        h.child
     }
   }
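A hedged sketch of the new error paths (hypothetical tables t1 and t2; `spark` is a SparkSession):

    // Two strategy hints name the same relation. The second one to resolve hits
    // the ResolvedHint cases above, merges with the existing HintInfo, and the
    // losing strategy is reported through hintOverridden.
    spark.sql("SELECT /*+ MERGE(t1), BROADCAST(t1) */ * FROM t1 JOIN t2 ON t1.key = t2.key")

    // "t3" matches no relation in the subtree, so its name survives in
    // relationNameSet and is reported through hintRelationsNotFound.
    spark.sql("SELECT /*+ BROADCAST(t3) */ * FROM t1 JOIN t2 ON t1.key = t2.key")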

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 1 addition & 1 deletion
@@ -373,7 +373,7 @@ object CatalogTable {
 /**
  * This class of statistics is used in [[CatalogTable]] to interact with metastore.
  * We define this new class instead of directly using [[Statistics]] here because there are no
- * concepts of attributes or broadcast hint in catalog.
+ * concepts of attributes in catalog.
  */
 case class CatalogStatistics(
     sizeInBytes: BigInt,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraintExpressions.scala

Lines changed: 12 additions & 6 deletions
@@ -21,15 +21,21 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral}
 import org.apache.spark.sql.types.DataType
 
-case class KnownNotNull(child: Expression) extends UnaryExpression {
-  override def nullable: Boolean = false
+trait TaggingExpression extends UnaryExpression {
+  override def nullable: Boolean = child.nullable
   override def dataType: DataType = child.dataType
 
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = child.genCode(ctx)
+
+  override def eval(input: InternalRow): Any = child.eval(input)
+}
+
+case class KnownNotNull(child: Expression) extends TaggingExpression {
+  override def nullable: Boolean = false
+
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     child.genCode(ctx).copy(isNull = FalseLiteral)
   }
-
-  override def eval(input: InternalRow): Any = {
-    child.eval(input)
-  }
 }
+
+case class KnownFloatingPointNormalized(child: Expression) extends TaggingExpression
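
As a reading aid: a tagging expression is a zero-overhead marker; nullability, data type, evaluation, and codegen all delegate to the child, so a rule can wrap a subtree once and recognize it later. A minimal sketch of how a rule might use the new KnownFloatingPointNormalized tag (the wrapping helper is hypothetical, not from this commit):

    import org.apache.spark.sql.catalyst.expressions.{Expression, KnownFloatingPointNormalized}

    // Mark an expression as normalized so a rewrite rule will not
    // process the same subtree twice; wrapping is kept idempotent.
    def markNormalized(expr: Expression): Expression = expr match {
      case tagged: KnownFloatingPointNormalized => tagged // already marked
      case other => KnownFloatingPointNormalized(other)
    }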

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala

Lines changed: 4 additions & 1 deletion
@@ -37,6 +37,9 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
   /** Updates the expression with a new plan. */
   def withNewPlan(plan: T): PlanExpression[T]
 
+  /** Defines how the canonicalization should work for this expression. */
+  def canonicalize(attrs: AttributeSeq): PlanExpression[T]
+
   protected def conditionString: String = children.mkString("[", " && ", "]")
 }
 
@@ -58,7 +61,7 @@ abstract class SubqueryExpression(
       children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
     case _ => false
   }
-  def canonicalize(attrs: AttributeSeq): SubqueryExpression = {
+  override def canonicalize(attrs: AttributeSeq): SubqueryExpression = {
     // Normalize the outer references in the subquery plan.
     val normalizedPlan = plan.transformAllExpressions {
       case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala

Lines changed: 7 additions & 7 deletions
@@ -43,10 +43,10 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
     val result = plan transformDown {
       // Start reordering with a joinable item, which is an InnerLike join with conditions.
       // Avoid reordering if a join hint is present.
-      case j @ Join(_, _, _: InnerLike, Some(cond), hint) if hint == JoinHint.NONE =>
+      case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
         reorder(j, j.output)
-      case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), hint))
-          if projectList.forall(_.isInstanceOf[Attribute]) && hint == JoinHint.NONE =>
+      case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE))
+          if projectList.forall(_.isInstanceOf[Attribute]) =>
         reorder(p, p.output)
     }
     // After reordering is finished, convert OrderedJoin back to Join.
@@ -77,12 +77,12 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
   */
  private def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = {
    plan match {
-      case Join(left, right, _: InnerLike, Some(cond), _) =>
+      case Join(left, right, _: InnerLike, Some(cond), JoinHint.NONE) =>
        val (leftPlans, leftConditions) = extractInnerJoins(left)
        val (rightPlans, rightConditions) = extractInnerJoins(right)
        (leftPlans ++ rightPlans, splitConjunctivePredicates(cond).toSet ++
          leftConditions ++ rightConditions)
-      case Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), _))
+      case Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE))
        if projectList.forall(_.isInstanceOf[Attribute]) =>
        extractInnerJoins(j)
      case _ =>
@@ -91,11 +91,11 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
  }

  private def replaceWithOrderedJoin(plan: LogicalPlan): LogicalPlan = plan match {
-    case j @ Join(left, right, jt: InnerLike, Some(cond), _) =>
+    case j @ Join(left, right, jt: InnerLike, Some(cond), JoinHint.NONE) =>
      val replacedLeft = replaceWithOrderedJoin(left)
      val replacedRight = replaceWithOrderedJoin(right)
      OrderedJoin(replacedLeft, replacedRight, jt, Some(cond))
-    case p @ Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), _)) =>
+    case p @ Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE)) =>
      p.copy(child = replaceWithOrderedJoin(j))
    case _ =>
      plan