
Commit 0f80990

yhuai authored and rxin committed
[SPARK-8023][SQL] Add "deterministic" attribute to Expression to avoid collapsing nondeterministic projects.
This closes apache#6570.

Author: Yin Huai <[email protected]>
Author: Reynold Xin <[email protected]>

Closes apache#6573 from rxin/deterministic and squashes the following commits:

356cd22 [Reynold Xin] Added unit test for the optimizer.
da3fde1 [Reynold Xin] Merge pull request apache#6570 from yhuai/SPARK-8023
da56200 [Yin Huai] Comments.
e38f264 [Yin Huai] Comment.
f9d6a73 [Yin Huai] Add a deterministic method to Expression.
1 parent 7b7f7b6 commit 0f80990
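
Why the flag matters: collapsing two Projects inlines each child alias at every parent use site, so a nondeterministic child expression would be re-evaluated once per use. A minimal sketch of the hazard this commit guards against, using the public DataFrame API and the same testData/rand helpers as the tests below:

    import org.apache.spark.sql.functions.rand
    import org.apache.spark.sql.test.TestSQLContext.implicits._

    // Without the guard, ProjectCollapsing would rewrite
    //   Project [r + 1 AS r1, r + 2 AS r2]
    //     Project [rand(5) AS r]
    // into a single
    //   Project [rand(5) + 1 AS r1, rand(5) + 2 AS r2]
    // so r1 and r2 would come from two *different* random draws.
    val df = testData
      .select(rand(5L).as("r"))
      .select(($"r" + 1).as("r1"), ($"r" + 2).as("r2"))
    // With the fix, both columns share one draw: r1 - r2 is exactly -1 for every row.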

6 files changed (+137, -2 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 8 additions & 0 deletions
@@ -37,7 +37,15 @@ abstract class Expression extends TreeNode[Expression] {
    *  - A [[Cast]] or [[UnaryMinus]] is foldable if its child is foldable
    */
   def foldable: Boolean = false
+
+  /**
+   * Returns true when the current expression always returns the same result for fixed input values.
+   */
+  // TODO: Need to define explicit input values vs implicit input values.
+  def deterministic: Boolean = true
+
   def nullable: Boolean
+
   def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator))
 
   /** Returns the result of evaluating this expression on a given input Row */
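
For orientation, a hedged sketch of how the new flag composes through an expression tree: deterministic defaults to true, and callers use the TreeNode find method to detect any nondeterministic node, exactly as the optimizer change below does (the constructed expression is illustrative):

    import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal, Rand}

    // Add keeps the default deterministic = true, but find walks the whole tree,
    // so the Rand leaf (which overrides the flag to false) is still detected.
    val expr: Expression = Add(Rand(10), Literal(1))
    val isNondeterministic = expr.find(!_.deterministic).isDefined  // true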

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,8 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable {
    */
   @transient protected lazy val rng = new XORShiftRandom(seed + TaskContext.get().partitionId())
 
+  override def deterministic: Boolean = false
+
   override def nullable: Boolean = false
 
   override def dataType: DataType = DoubleType
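
RDG opts out because its output depends on a per-partition seed and on mutable RNG state, not on its input values. A hedged illustration with java.util.Random as a stand-in (XORShiftRandom itself is private[spark]):

    import java.util.Random

    // A stateful generator yields different values on successive calls, so an
    // expression backed by one cannot promise the same result for fixed inputs:
    // re-evaluating the same row twice could produce two different doubles.
    val rng = new Random(5L)
    val twoDraws = (rng.nextDouble(), rng.nextDouble())  // almost surely unequal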

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 10 additions & 1 deletion
@@ -179,8 +179,17 @@ object ColumnPruning extends Rule[LogicalPlan] {
  * expressions into one single expression.
  */
 object ProjectCollapsing extends Rule[LogicalPlan] {
+
+  /** Returns true if any expression in projectList is non-deterministic. */
+  private def hasNondeterministic(projectList: Seq[NamedExpression]): Boolean = {
+    projectList.exists(expr => expr.find(!_.deterministic).isDefined)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case Project(projectList1, Project(projectList2, child)) =>
+    // We only collapse these two Projects if the child Project's expressions are all
+    // deterministic.
+    case Project(projectList1, Project(projectList2, child))
+        if !hasNondeterministic(projectList2) =>
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
       val aliasMap = AttributeMap(projectList2.collect {
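
Note the asymmetry: only the child's projectList2 is checked, because collapsing inlines each child alias at every parent use site, duplicating the child expression; a nondeterministic expression in the parent list is evaluated once either way. A hedged sketch using the Catalyst test DSL (the relation and names are illustrative):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.expressions.Rand
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val relation = LocalRelation('a.int, 'b.int)

    // Deterministic child list: still collapsed to one Project, even though the
    // parent list contains Rand (it is evaluated exactly once either way).
    val safe = relation.select(('a + 1).as('x)).select(Rand(10).as('r), 'x)

    // Nondeterministic child list: the new guard keeps both Projects, so 'r is
    // computed once and shared by 'r1 and 'r2.
    val unsafe = relation.select(Rand(10).as('r)).select(('r + 1).as('r1), ('r + 2).as('r2))

    ProjectCollapsing(safe.analyze)    // one Project
    ProjectCollapsing(unsafe.analyze)  // still two Projects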
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.Rand
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class ProjectCollapsingSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", FixedPoint(10), EliminateSubQueries) ::
+        Batch("ProjectCollapsing", Once, ProjectCollapsing) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int)
+
+  test("collapse two deterministic, independent projects into one") {
+    val query = testRelation
+      .select(('a + 1).as('a_plus_1), 'b)
+      .select('a_plus_1, ('b + 1).as('b_plus_1))
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = testRelation.select(('a + 1).as('a_plus_1), ('b + 1).as('b_plus_1)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("collapse two deterministic, dependent projects into one") {
+    val query = testRelation
+      .select(('a + 1).as('a_plus_1), 'b)
+      .select(('a_plus_1 + 1).as('a_plus_2), 'b)
+
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer = testRelation.select(
+      (('a + 1).as('a_plus_1) + 1).as('a_plus_2),
+      'b).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("do not collapse nondeterministic projects") {
+    val query = testRelation
+      .select(Rand(10).as('rand))
+      .select(('rand + 1).as('rand1), ('rand + 2).as('rand2))
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = query.analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}
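
To run just this suite, an sbt invocation along these lines should work (the exact project and task names are an assumption about the build of that era):

    build/sbt "catalyst/test-only *ProjectCollapsingSuite"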

sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala

Lines changed: 40 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import org.scalatest.Matchers._
 
+import org.apache.spark.sql.execution.Project
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.implicits._
@@ -452,13 +453,51 @@ class ColumnExpressionSuite extends QueryTest {
   }
 
   test("rand") {
-    val randCol = testData.select('key, rand(5L).as("rand"))
+    val randCol = testData.select($"key", rand(5L).as("rand"))
     randCol.columns.length should be (2)
     val rows = randCol.collect()
     rows.foreach { row =>
       assert(row.getDouble(1) <= 1.0)
       assert(row.getDouble(1) >= 0.0)
     }
+
+    def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
+      val projects = df.queryExecution.executedPlan.collect {
+        case project: Project => project
+      }
+      assert(projects.size === expectedNumProjects)
+    }
+
+    // We first create a plan with two Projects.
+    //   Project [rand + 1 AS rand1, rand - 1 AS rand2]
+    //     Project [key, (Rand 5 + 1) AS rand]
+    //       LogicalRDD [key, value]
+    // Because the Rand function is not deterministic, the column rand is not deterministic.
+    // So, in the optimizer, we will not collapse Project [rand + 1 AS rand1, rand - 1 AS rand2]
+    // and Project [key, (Rand 5 + 1) AS rand]. The final plan still has two Projects.
+    val dfWithTwoProjects =
+      testData
+        .select($"key", (rand(5L) + 1).as("rand"))
+        .select(($"rand" + 1).as("rand1"), ($"rand" - 1).as("rand2"))
+    checkNumProjects(dfWithTwoProjects, 2)
+
+    // Now, we add one more Project, rand1 - rand2, on top of the query plan.
+    // Since rand1 and rand2 are deterministic (they basically apply +/- to the generated
+    // rand value), we can collapse rand1 - rand2 into the Project generating rand1 and rand2.
+    // So, the plan will be optimized from ...
+    //   Project [(rand1 - rand2) AS (rand1 - rand2)]
+    //     Project [rand + 1 AS rand1, rand - 1 AS rand2]
+    //       Project [key, (Rand 5 + 1) AS rand]
+    //         LogicalRDD [key, value]
+    // to ...
+    //   Project [((rand + 1 AS rand1) - (rand - 1 AS rand2)) AS (rand1 - rand2)]
+    //     Project [key, (Rand 5 + 1) AS rand]
+    //       LogicalRDD [key, value]
+    val dfWithThreeProjects = dfWithTwoProjects.select($"rand1" - $"rand2")
+    checkNumProjects(dfWithThreeProjects, 2)
+    dfWithThreeProjects.collect().foreach { row =>
+      assert(row.getDouble(0) === 2.0 +- 0.0001)
+    }
   }
 
   test("randn") {

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala

Lines changed: 4 additions & 0 deletions
@@ -78,6 +78,8 @@ private[hive] case class HiveSimpleUdf(funcWrapper: HiveFunctionWrapper, childre
 
   type UDFType = UDF
 
+  override def deterministic: Boolean = isUDFDeterministic
+
   override def nullable: Boolean = true
 
   @transient
@@ -140,6 +142,8 @@ private[hive] case class HiveGenericUdf(funcWrapper: HiveFunctionWrapper, childr
   extends Expression with HiveInspectors with Logging {
   type UDFType = GenericUDF
 
+  override def deterministic: Boolean = isUDFDeterministic
+
   override def nullable: Boolean = true
 
   @transient
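
Here isUDFDeterministic reflects Hive's own marker: Hive UDFs advertise nondeterminism through the @UDFType annotation on the UDF class. A hedged Scala sketch of a UDF that would now report deterministic = false to Catalyst (the class name and logic are illustrative):

    import org.apache.hadoop.hive.ql.exec.UDF
    import org.apache.hadoop.hive.ql.udf.UDFType

    // Hive reads this annotation; Spark's isUDFDeterministic reads it too, so the
    // optimizer will no longer collapse a Project that evaluates this UDF.
    @UDFType(deterministic = false)
    class RandomBelow extends UDF {
      private val rng = new java.util.Random()
      def evaluate(bound: Int): Int = rng.nextInt(bound)
    }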
