Skip to content

[SPARK-30200][SQL] Add ExplainMode for Dataset.explain #26829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions sql/core/src/main/java/org/apache/spark/sql/ExplainMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql;

import org.apache.spark.annotation.Unstable;

/**
 * ExplainMode is used to specify the expected output format of plans (logical and physical)
 * for debugging purpose.
 *
 * @since 3.0.0
 */
@Unstable
public enum ExplainMode {
  /**
   * Simple mode means that when printing explain for a DataFrame, only a physical plan is
   * expected to be printed to the console.
   *
   * @since 3.0.0
   */
  Simple,
  /**
   * Extended mode means that when printing explain for a DataFrame, both logical and physical
   * plans are expected to be printed to the console.
   *
   * @since 3.0.0
   */
  Extended,
  /**
   * Codegen mode means that when printing explain for a DataFrame, if generated codes are
   * available, a physical plan and the generated codes are expected to be printed to the console.
   *
   * @since 3.0.0
   */
  Codegen,
  /**
   * Cost mode means that when printing explain for a DataFrame, if plan node statistics are
   * available, a logical plan and the statistics are expected to be printed to the console.
   *
   * @since 3.0.0
   */
  Cost,
  /**
   * Formatted mode means that when printing explain for a DataFrame, explain output is
   * expected to be split into two sections: a physical plan outline and node details.
   *
   * @since 3.0.0
   */
  Formatted
}
40 changes: 31 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -522,36 +522,58 @@ class Dataset[T] private[sql](
// scalastyle:on println

/**
 * Prints the plans (logical and physical) with a format specified by a given explain mode.
 *
 * @param mode the expected output format of plans; see [[ExplainMode]] for the options
 * @group basic
 * @since 3.0.0
 */
def explain(mode: ExplainMode): Unit = {
  // Because temporary views are resolved during analysis when we create a Dataset, and
  // `ExplainCommand` analyzes input query plan and resolves temporary views again. Using
  // `ExplainCommand` here will probably output different query plans, compared to the results
  // of evaluation of the Dataset. So just output QueryExecution's query plans here.
  val qe = ExplainCommandUtil.explainedQueryExecution(sparkSession, logicalPlan, queryExecution)

  val outputString = mode match {
    case ExplainMode.Simple =>
      // Physical plan only.
      qe.simpleString
    case ExplainMode.Extended =>
      // All the plans: parsed/analyzed/optimized logical and physical.
      qe.toString
    case ExplainMode.Codegen =>
      // If `codegenString` throws an AnalysisException (e.g. no codegen info is
      // available), surface the exception message as the explain output instead.
      try {
        org.apache.spark.sql.execution.debug.codegenString(queryExecution.executedPlan)
      } catch {
        case e: AnalysisException => e.toString
      }
    case ExplainMode.Cost =>
      // Logical plan annotated with node statistics.
      qe.stringWithStats
    case ExplainMode.Formatted =>
      // Physical plan outline followed by per-node details.
      qe.simpleString(formatted = true)
  }
  // scalastyle:off println
  println(outputString)
  // scalastyle:on println
}

/**
 * Prints the plans (logical and physical) to the console for debugging purposes.
 *
 * @group basic
 * @since 1.6.0
 */
def explain(extended: Boolean): Unit = {
  // Map the legacy boolean flag onto the corresponding explain mode.
  val mode = if (extended) ExplainMode.Extended else ExplainMode.Simple
  explain(mode)
}

/**
 * Prints the physical plan to the console for debugging purposes.
 *
 * Equivalent to `explain(ExplainMode.Simple)`.
 *
 * @group basic
 * @since 1.6.0
 */
def explain(): Unit = explain(ExplainMode.Simple)

/**
* Returns all column names and their data types as an array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.command
import java.util.UUID

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
Expand Down Expand Up @@ -132,13 +131,15 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
* (but do NOT actually execute it).
*
* {{{
* EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ...
* EXPLAIN (EXTENDED | CODEGEN | COST | FORMATTED) SELECT * FROM ...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for fixing this together.

* }}}
*
* @param logicalPlan plan to explain
* @param extended whether to do extended explain or not
* @param codegen whether to output generated code from whole-stage codegen or not
* @param cost whether to show cost information for operators.
* @param formatted whether to split explain output into two sections: a physical plan outline
* and node details.
*/
case class ExplainCommand(
logicalPlan: LogicalPlan,
Expand Down
67 changes: 57 additions & 10 deletions sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@ import org.apache.spark.sql.types.StructType
class ExplainSuite extends QueryTest with SharedSparkSession {
import testImplicits._

// Captures the console output of `df.explain(mode)` and normalizes generated
// attribute ids (e.g. "#123" -> "#x") so assertions do not depend on id allocation.
private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = {
  val output = new java.io.ByteArrayOutputStream()
  Console.withOut(output) {
    df.explain(mode)
  }
  output.toString.replaceAll("#\\d+", "#x")
}

/**
 * Get the explain from a DataFrame and run the specified action on it.
 */
private def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = {
  f(getNormalizedExplain(df, mode))
}

/**
 * Runs the plan and makes sure the plans contains all of the keywords.
 */
private def checkKeywordsExistsInExplain(
    df: DataFrame, mode: ExplainMode, keywords: String*): Unit = {
  withNormalizedExplain(df, mode) { normalizedOutput =>
    for (key <- keywords) {
      assert(normalizedOutput.contains(key))
    }
  }
}

// Backward-compatible overload: checks the keywords against the extended explain output.
private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = {
  checkKeywordsExistsInExplain(df, ExplainMode.Extended, keywords: _*)
}

test("SPARK-23034 show rdd names in RDD scan nodes (Dataset)") {
val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd")
val df = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, c1 string"))
Expand Down Expand Up @@ -209,7 +217,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
test("SPARK-26659: explain of DataWritingCommandExec should not contain duplicate cmd.nodeName") {
withTable("temptable") {
val df = sql("create table temptable using parquet as select * from range(2)")
withNormalizedExplain(df, extended = false) { normalizedOutput =>
withNormalizedExplain(df, ExplainMode.Simple) { normalizedOutput =>
assert("Create\\w*?TableAsSelectCommand".r.findAllMatchIn(normalizedOutput).length == 1)
}
}
Expand Down Expand Up @@ -262,6 +270,45 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
}
}
}

test("Support ExplainMode in Dataset.explain") {
  val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
  val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
  val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))

  // Simple mode: physical plan only, no logical plan sections.
  val simpleExplainOutput = getNormalizedExplain(testDf, ExplainMode.Simple)
  assert(simpleExplainOutput.startsWith("== Physical Plan =="))
  val logicalPlanHeaders = Seq(
    "== Parsed Logical Plan ==",
    "== Analyzed Logical Plan ==",
    "== Optimized Logical Plan ==")
  logicalPlanHeaders.foreach { planType =>
    assert(!simpleExplainOutput.contains(planType))
  }
  // Extended mode: all the plan sections are present.
  checkKeywordsExistsInExplain(
    testDf,
    ExplainMode.Extended,
    "== Parsed Logical Plan ==",
    "== Analyzed Logical Plan ==",
    "== Optimized Logical Plan ==",
    "== Physical Plan ==")
  // Cost mode: plan statistics are included.
  checkKeywordsExistsInExplain(
    testDf,
    ExplainMode.Cost,
    "Statistics(sizeInBytes=")
  // Codegen mode: generated code is printed.
  checkKeywordsExistsInExplain(
    testDf,
    ExplainMode.Codegen,
    "WholeStageCodegen subtrees",
    "Generated code:")
  // Formatted mode: outline plus per-node details.
  checkKeywordsExistsInExplain(
    testDf,
    ExplainMode.Formatted,
    "* LocalTableScan (1)",
    "(1) LocalTableScan [codegen id :")
}
}

case class ExplainSingleData(id: Int)