
Commit 2a0a8f7

maropu and tejasapatil authored and committed
[SPARK-23034][SQL] Show RDD/relation names in RDD/Hive table scan nodes
## What changes were proposed in this pull request?

This PR proposes to show RDD/relation names in RDD/Hive table scan nodes. This change makes these names show up in the web UI and in explain results. For example:

```
scala> sql("CREATE TABLE t(c1 int) USING hive")
scala> sql("INSERT INTO t VALUES(1)")
scala> spark.table("t").explain()
== Physical Plan ==
Scan hive default.t [c1#8], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#8]
^^^^^^^^^^^
```

<img width="212" alt="spark-pr-hive" src="https://user-images.githubusercontent.com/692303/44501013-51264c80-a6c6-11e8-94f8-0704aee83bb6.png">

Closes #20226

## How was this patch tested?

Added tests in `DataFrameSuite`, `DatasetSuite`, and `HiveExplainSuite`.

Closes #22153 from maropu/pr20226.

Lead-authored-by: Takeshi Yamamuro <[email protected]>
Co-authored-by: Tejas Patil <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
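For the RDD side of the change, a minimal companion sketch (assuming a running `spark` session; the RDD name `testRdd` and the `c0 int, c1 string` schema are illustrative values mirroring the `DataFrameSuite` test added below):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Name the source RDD; with this change the name surfaces in the plan's RDD scan node.
val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd")
val df = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, c1 string"))
df.explain()  // expected to print a node like "Scan ExistingRDD testRdd[c0#...,c1#...]"
```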
1 parent 4972090 commit 2a0a8f7

File tree

10 files changed, +56 -11 lines changed


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ private[kafka010] class KafkaRelation(
         DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
         cr.timestampType.id)
     }
-    sqlContext.internalCreateDataFrame(rdd, schema).rdd
+    sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema).rdd
   }

   private def getPartitionOffsets(
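For context on the one-line change above: `RDD.setName` attaches a human-readable name and returns the same RDD, which is why it can be chained inline as done for the Kafka RDD here; the name is what the web UI and, with this patch, the scan node display. A minimal standalone sketch (the `local[*]` master and app name are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// Arbitrary local session, just to obtain a SparkContext for the sketch.
val spark = SparkSession.builder().master("local[*]").appName("rdd-naming-sketch").getOrCreate()

// setName returns the receiver, so naming composes with other calls inline.
val named = spark.sparkContext.parallelize(Seq(1, 2, 3)).setName("kafka")
assert(named.name == "kafka")

spark.stop()
```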

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 2 additions & 2 deletions
@@ -215,7 +215,7 @@ private[kafka010] class KafkaSource(
     }
     if (start.isDefined && start.get == end) {
       return sqlContext.internalCreateDataFrame(
-        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
+        sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
     }
     val fromPartitionOffsets = start match {
       case Some(prevBatchEndOffset) =>
@@ -299,7 +299,7 @@
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))

-    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+    sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema, isStreaming = true)
   }

   /** Stop this source and free any resources it has allocated. */

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 3 additions & 3 deletions
@@ -270,7 +270,7 @@ class SparkSession private(
   */
  @transient
  lazy val emptyDataFrame: DataFrame = {
-    createDataFrame(sparkContext.emptyRDD[Row], StructType(Nil))
+    createDataFrame(sparkContext.emptyRDD[Row].setName("empty"), StructType(Nil))
  }

  /**
@@ -395,7 +395,7 @@ class SparkSession private(
      // BeanInfo is not serializable so we must rediscover it remotely for each partition.
      SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
    }
-    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
+    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self))
  }

  /**
@@ -594,7 +594,7 @@ class SparkSession private(
    } else {
      rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
    }
-    internalCreateDataFrame(catalystRows, schema)
+    internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
  }

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 11 additions & 3 deletions
@@ -103,6 +103,10 @@ case class ExternalRDDScanExec[T](
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

+  private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")
+
+  override val nodeName: String = s"Scan$rddName"
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val outputDataType = outputObjAttr.dataType
@@ -116,7 +120,7 @@ case class ExternalRDDScanExec[T](
   }

   override def simpleString: String = {
-    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+    s"$nodeName${output.mkString("[", ",", "]")}"
   }
 }

@@ -169,10 +173,14 @@ case class LogicalRDD(
 case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
+    name: String,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0),
     override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {

+  private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")
+
+  override val nodeName: String = s"Scan $name$rddName"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

@@ -189,6 +197,6 @@ case class RDDScanExec(
   }

   override def simpleString: String = {
-    s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
+    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}"
   }
 }
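Both `rddName` helpers added above rely on the same detail: `RDD.name` is `null` until `setName` is called, so wrapping it in an `Option` lets an unnamed RDD contribute nothing while a named one appends " name". A small sketch of how the node names compose (the literal prefixes are the ones used in the diff):

```scala
// Mirrors the helper above: null -> "", "testRdd" -> " testRdd".
def rddSuffix(rddName: String): String = Option(rddName).map(n => s" $n").getOrElse("")

// ExternalRDDScanExec builds nodeName as s"Scan$rddName".
assert("Scan" + rddSuffix(null) == "Scan")
assert("Scan" + rddSuffix("testRdd") == "Scan testRdd")

// RDDScanExec builds nodeName as s"Scan $name$rddName", e.g. with name = "ExistingRDD".
assert("Scan ExistingRDD" + rddSuffix(null) == "Scan ExistingRDD")
assert("Scan ExistingRDD" + rddSuffix("testRdd") == "Scan ExistingRDD testRdd")
```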

sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala

Lines changed: 1 addition & 1 deletion
@@ -211,6 +211,6 @@ private[sql] object ArrowConverters {
       ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
     }
     val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
-    sqlContext.internalCreateDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd.setName("arrow"), schema)
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -2552,4 +2552,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       assert(numJobs == 1)
     }
   }
+
+  test("SPARK-23034 show rdd names in RDD scan nodes") {
+    val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd")
+    val df2 = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, c1 string"))
+    val output2 = new java.io.ByteArrayOutputStream()
+    Console.withOut(output2) {
+      df2.explain(extended = false)
+    }
+    assert(output2.toString.contains("Scan ExistingRDD testRdd"))
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -1498,6 +1498,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-23034 show rdd names in RDD scan nodes") {
+    val rddWithName = spark.sparkContext.parallelize(SingleData(1) :: Nil).setName("testRdd")
+    val df = spark.createDataFrame(rddWithName)
+    val output = new java.io.ByteArrayOutputStream()
+    Console.withOut(output) {
+      df.explain(extended = false)
+    }
+    assert(output.toString.contains("Scan testRdd"))
+  }
 }

 case class TestDataUnion(x: Int, y: Int, z: Int)
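`SingleData` above is presumably one of the small helper case classes already defined elsewhere in `DatasetSuite`; a self-contained equivalent of the test (with a stand-in case class whose field name is illustrative, assuming a `spark` session) would be:

```scala
// Stand-in for the suite's SingleData case class; the field name is a guess.
case class SingleData(id: Int)

val rddWithName = spark.sparkContext.parallelize(SingleData(1) :: Nil).setName("testRdd")
val df = spark.createDataFrame(rddWithName)
df.explain()  // per the assertion above, the plan output should contain "Scan testRdd"
```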

sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -27,6 +27,7 @@ import org.scalatest.exceptions.TestFailedException
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
 import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
 import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
@@ -1229,6 +1230,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest
       timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout,
       batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = {
     val stateFormatVersion = spark.conf.get(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
+    val emptyRdd = spark.sparkContext.emptyRDD[InternalRow]
     MemoryStream[Int]
       .toDS
       .groupByKey(x => x)
@@ -1237,7 +1239,8 @@
       case FlatMapGroupsWithState(f, k, v, g, d, o, s, m, _, t, _) =>
         FlatMapGroupsWithStateExec(
           f, k, v, g, d, o, None, s, stateFormatVersion, m, t,
-          Some(currentBatchTimestamp), Some(currentBatchWatermark), RDDScanExec(g, null, "rdd"))
+          Some(currentBatchTimestamp), Some(currentBatchWatermark),
+          RDDScanExec(g, emptyRdd, "rdd"))
       }.get
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala

Lines changed: 2 additions & 0 deletions
@@ -62,6 +62,8 @@ case class HiveTableScanExec(

   override def conf: SQLConf = sparkSession.sessionState.conf

+  override def nodeName: String = s"Scan hive ${relation.tableMeta.qualifiedName}"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -170,4 +170,16 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       sql("EXPLAIN EXTENDED CODEGEN SELECT 1")
     }
   }
+
+  test("SPARK-23034 show relation names in Hive table scan nodes") {
+    val tableName = "tab"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(c1 int) USING hive")
+      val output = new java.io.ByteArrayOutputStream()
+      Console.withOut(output) {
+        spark.table(tableName).explain(extended = false)
+      }
+      assert(output.toString.contains(s"Scan hive default.$tableName"))
+    }
+  }
 }
