diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 7c7d79c2bbd7c..0f365f2141674 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -86,6 +86,9 @@ case class RowDataSourceScanExec(
 
   def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
 
+  override val nodeName: String =
+    s"Scan FileSource ${tableIdentifier.map(_.unquotedString).getOrElse(relation)}"
+
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -175,6 +178,9 @@ case class FileSourceScanExec(
     }
   }
 
+  override val nodeName: String =
+    s"Scan FileSource ${tableIdentifier.map(_.unquotedString).getOrElse(relation.location)}"
+
   override def vectorTypes: Option[Seq[String]] =
     relation.fileFormat.vectorTypes(
       requiredSchema = requiredSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index f3555508185fe..89ddd6754ba7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -103,6 +103,8 @@ case class ExternalRDDScanExec[T](
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
+  override val nodeName: String = s"Scan ExternalRDD ${output.map(_.name).mkString("[", ",", "]")}"
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val outputDataType = outputObjAttr.dataType
@@ -116,7 +118,7 @@ case class ExternalRDDScanExec[T](
   }
 
   override def simpleString: String = {
-    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+    s"Scan ${super.nodeName}${output.mkString("[", ",", "]")}"
   }
 }
 
@@ -169,10 +171,12 @@ case class LogicalRDD(
 case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
+    name: String,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0),
     override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {
 
+  override val nodeName: String = s"Scan RDD $name ${output.map(_.name).mkString("[", ",", "]")}"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -189,6 +193,6 @@ case class RDDScanExec(
   }
 
   override def simpleString: String = {
-    s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
+    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}"
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 514ad7018d8c7..c0ab81911bdff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -30,6 +30,8 @@ case class LocalTableScanExec(
     output: Seq[Attribute],
     @transient rows: Seq[InternalRow]) extends LeafExecNode {
 
+  override val nodeName: String = s"Scan LocalTable"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 065954559e487..36baa82db7417 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
@@ -45,7 +46,12 @@ trait CodegenSupport extends SparkPlan {
     case _: SortMergeJoinExec => "smj"
     case _: RDDScanExec => "rdd"
     case _: DataSourceScanExec => "scan"
-    case _ => nodeName.toLowerCase(Locale.ROOT)
+    case _: LocalTableScanExec => "local_scan"
+    case _: InMemoryTableScanExec => "in_mem_scan"
+    case _ =>
+      // Java variable names can only have alphanumeric characters, underscores and `$` (the use
+      // of the latter two is discouraged)
+      nodeName.toLowerCase(Locale.ROOT).replaceAll("\\P{Alnum}", "")
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 3565ee3af1b9f..a61b5a287457c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -36,6 +36,8 @@ case class InMemoryTableScanExec(
     @transient relation: InMemoryRelation)
   extends LeafExecNode with ColumnarBatchScan {
 
+  override val nodeName: String = s"Scan In-memory ${relation.tableName.getOrElse("")}"
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
 
   override def vectorTypes: Option[Seq[String]] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index beb66738732be..d978c423d49c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -38,6 +38,9 @@ case class DataSourceV2ScanExec(
     @transient reader: DataSourceV2Reader)
   extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
+  override val nodeName: String =
+    s"Scan DataSourceV2 ${fullOutput.map(_.name).mkString("[", ",", "]")}"
+
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
 
   override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index c065ce5012929..f504ef4014808 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -178,6 +178,6 @@ Join Cross
 
 == Physical Plan ==
 BroadcastNestedLoopJoin BuildRight, Cross
-:- LocalTableScan [col1#x, col2#x]
+:- Scan LocalTable [col1#x, col2#x]
 +- BroadcastExchange IdentityBroadcastMode
-   +- LocalTableScan [col1#x, col2#x]
+   +- Scan LocalTable [col1#x, col2#x]
diff --git a/sql/core/src/test/resources/sql-tests/results/operators.sql.out b/sql/core/src/test/resources/sql-tests/results/operators.sql.out
index 237b618a8b904..515373ffe8128 100644
--- a/sql/core/src/test/resources/sql-tests/results/operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/operators.sql.out
@@ -233,7 +233,7 @@ struct<plan:string>
 -- !query 28 output
 == Physical Plan ==
 *Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x]
-+- Scan OneRowRelation[]
++- Scan RDD OneRowRelation [][]
 
 
 -- !query 29
@@ -243,7 +243,7 @@ struct<plan:string>
 -- !query 29 output
 == Physical Plan ==
 *Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x]
-+- Scan OneRowRelation[]
++- Scan RDD OneRowRelation [][]
 
 
 -- !query 30
@@ -253,7 +253,7 @@ struct<plan:string>
 -- !query 30 output
 == Physical Plan ==
 *Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x]
-+- Scan OneRowRelation[]
++- Scan RDD OneRowRelation [][]
 
 
 -- !query 31
@@ -263,7 +263,7 @@ struct<plan:string>
 -- !query 31 output
 == Physical Plan ==
 *Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x]
-+- Scan OneRowRelation[]
++- Scan RDD OneRowRelation [][]
 
 
 -- !query 32
@@ -273,7 +273,7 @@ struct<plan:string>
 -- !query 32 output
 == Physical Plan ==
 *Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x]
-+- Scan OneRowRelation[]
++- Scan RDD OneRowRelation [][]
 
 
 -- !query 33
@@ -283,7 +283,7 @@ struct<plan:string>
 -- !query 33 output
 == Physical Plan ==
 *Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x]
-+- Scan OneRowRelation[]
++- Scan RDD OneRowRelation [][]
 
 
 -- !query 34
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c65e5d3dd75c2..27493803bb300 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -476,7 +476,7 @@ class StreamSuite extends StreamTest {
         .mkString("\n")
     assert(explainString.contains("StateStoreRestore"))
     assert(explainString.contains("StreamingRelation"))
-    assert(!explainString.contains("LocalTableScan"))
+    assert(!explainString.contains("Scan LocalTable"))
 
     // Test StreamingQuery.display
     val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
@@ -493,7 +493,7 @@ class StreamSuite extends StreamTest {
       val explainWithoutExtended = q.explainInternal(false)
       // `extended = false` only displays the physical plan.
       assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
-      assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+      assert("Scan LocalTable".r.findAllMatchIn(explainWithoutExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithoutExtended.contains("StateStoreRestore"))
 
@@ -501,7 +501,7 @@ class StreamSuite extends StreamTest {
       // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
       // plan.
       assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
-      assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+      assert("Scan LocalTable".r.findAllMatchIn(explainWithExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithExtended.contains("StateStoreRestore"))
     } finally {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 7289da71a3365..212627fd09f88 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -278,7 +278,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val plan = statement.executeQuery("explain select * from test_table")
       plan.next()
       plan.next()
-      assert(plan.getString(1).contains("InMemoryTableScan"))
+      assert(plan.getString(1).contains("Scan In-memory"))
 
       val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
       val buf1 = new collection.mutable.ArrayBuffer[Int]()
@@ -364,7 +364,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC")
       plan.next()
       plan.next()
-      assert(plan.getString(1).contains("InMemoryTableScan"))
+      assert(plan.getString(1).contains("Scan In-memory"))
 
       val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
       val buf = new collection.mutable.ArrayBuffer[Int]()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 7dcaf170f9693..7cc0891b2207f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -62,6 +62,8 @@ case class HiveTableScanExec(
 
   override def conf: SQLConf = sparkSession.sessionState.conf
 
+  override def nodeName: String = s"Scan HiveTable ${relation.tableMeta.qualifiedName}"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))