[SPARK-23034][SQL] Override nodeName for all *ScanExec operators #20226
Conversation
It looks useful, @tejasapatil. Given that this is a one-line addition, why don't you handle the others?
BTW, can we have a title prefix?
@dongjoon-hyun: For Spark native tables, the table scan node is abstracted out as a …
Then, specifically, what happens in this PR for Parquet/ORC Hive tables which are converted to data source tables with …?
@@ -62,6 +62,8 @@ case class HiveTableScanExec(

  override def conf: SQLConf = sparkSession.sessionState.conf

+ override def nodeName: String = s"${super.nodeName}-${relation.tableMeta.qualifiedName}"
s"${super.nodeName}(${relation.tableMeta.qualifiedName})"
looks clearer to me, but up to you.
I like this format. I added a space in between for better readability.
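For illustration, here is a tiny standalone sketch of the three formats discussed above; the parent node name and table name below are made-up values, not the actual fields:

object NodeNameFormatDemo extends App {
  val superNodeName = "HiveTableScan" // hypothetical stand-in for super.nodeName
  val qualifiedName = "db.my_table"   // hypothetical stand-in for relation.tableMeta.qualifiedName

  println(s"$superNodeName-$qualifiedName")   // HiveTableScan-db.my_table (initial version)
  println(s"$superNodeName($qualifiedName)")  // HiveTableScan(db.my_table) (suggested)
  println(s"$superNodeName ($qualifiedName)") // HiveTableScan (db.my_table) (with the added space)
}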
(updated the screenshot in the PR description)
[Screenshot: HiveTableScan node in the UI]
@dongjoon-hyun: I tried it out over master and, since the table scan goes via codegen, it won't show the table name. Will update the PR description with this finding. Let's move this discussion to the JIRA and see what people have to say about the concern I speculated about.
Test build #85941 has finished for PR 20226 at commit …
Jenkins retest this please.
Thank you, @tejasapatil. I see.
Test build #85943 has finished for PR 20226 at commit …
@dongjoon-hyun: I have updated the PR description.
+1, LGTM.
Test build #85945 has finished for PR 20226 at commit …
LGTM too.
@@ -62,6 +62,8 @@ case class HiveTableScanExec(

  override def conf: SQLConf = sparkSession.sessionState.conf

+ override def nodeName: String = s"${super.nodeName} (${relation.tableMeta.qualifiedName})"
Our DataSourceScanExec is using unquotedString in nodeName. We need to make these LeafNode implementations consistent. Could you check all the other LeafExecNodes?
DataSourceScanExec is using unquotedString, and so does this PR. The rest of the LeafExecNode implementations do not override nodeName and/or depend on the base class (i.e. TreeNode).
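As a minimal sketch of that fallback (simplified and assumed from memory of catalyst's TreeNode at the time; the real code may differ in detail), the default nodeName is just the class's simple name with a trailing "Exec" stripped:

// Simplified stand-in for catalyst's TreeNode default; not the real Spark trait.
trait TreeNodeSketch {
  def nodeName: String = getClass.getSimpleName.replaceAll("Exec$", "")
}

// Hypothetical operator that does not override nodeName.
case class SomeTableScanExec() extends TreeNodeSketch

object DefaultNodeNameDemo extends App {
  println(SomeTableScanExec().nodeName) // SomeTableScan
}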
How about s"Scan HiveTable ${relation.tableMeta.qualifiedName}"? Just to be more consistent.
DataSourceV2ScanExec faces the same issue. How about InMemoryTableScanExec?
@gatorsmile: I have updated the PR after going through all the *ScanExec implementations.
Changes introduced in this PR:
Scan impl | overridden nodeName
---|---
DataSourceV2ScanExec | Scan DataSourceV2 [output_attribute1, output_attribute2, ..]
ExternalRDDScanExec | Scan ExternalRDD [output_attribute1, output_attribute2, ..]
FileSourceScanExec | Scan FileSource ${tableIdentifier.map(_.unquotedString).getOrElse(relation.location)}
HiveTableScanExec | Scan HiveTable relation.tableMeta.qualifiedName
InMemoryTableScanExec | Scan In-memory relation.tableName
LocalTableScanExec | Scan LocalTable [output_attribute1, output_attribute2, ..]
RDDScanExec | Scan RDD name [output_attribute1, output_attribute2, ..]
RowDataSourceScanExec | Scan FileSource ${tableIdentifier.map(_.unquotedString).getOrElse(relation)}
Things not affected:
- DataSourceScanExec: already uses Scan relation tableIdentifier.unquotedString
- RDDScanExec: forces clients to specify the nodeName
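A standalone, hedged sketch of the naming pattern in the table above, including the getOrElse fallback used by the file-based scans; TableIdentifier and the field names here are simplified stand-ins, not Spark's actual classes:

// Simplified stand-in types; not the actual Spark classes.
case class TableIdentifier(database: String, table: String) {
  def unquotedString: String = s"$database.$table"
}

case class FileSourceScanSketch(tableIdentifier: Option[TableIdentifier], location: String) {
  // "Scan FileSource <table or location>", as listed in the table above.
  val nodeName: String =
    s"Scan FileSource ${tableIdentifier.map(_.unquotedString).getOrElse(location)}"
}

object ScanNodeNameDemo extends App {
  println(FileSourceScanSketch(Some(TableIdentifier("db", "events")), "/tmp/x").nodeName)
  // Scan FileSource db.events
  println(FileSourceScanSketch(None, "hdfs://warehouse/events").nodeName)
  // Scan FileSource hdfs://warehouse/events
}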
Force-pushed from 6271804 to 9bcd905.
Test build #86022 has finished for PR 20226 at commit …
Changed the title from "HiveTableScan node in UI" to "nodeName for all *ScanExec operators".
Test build #86044 has finished for PR 20226 at commit …
Overall, the fixes look good to me. We just need to resolve the test cases. Thanks for improving it!
The test failure does look legit to me. I have not been able to repro it on my laptop. IntelliJ doesn't treat it as a test case. The command line does recognize it as a test case but hits a runtime failure with a jar mismatch. I am using this to run the test: …
Is there special setup needed to run these tests?
Try this? It worked for me before: …
f16b73b
to
65ec7a2
Compare
Jenkins retest this please.
@@ -45,7 +46,12 @@ trait CodegenSupport extends SparkPlan {
  case _: SortMergeJoinExec => "smj"
  case _: RDDScanExec => "rdd"
  case _: DataSourceScanExec => "scan"
  case _ => nodeName.toLowerCase(Locale.ROOT)
This caused one of the tests to fail, as the nodeName generated was not a single word (like before) but something like scan in-memory my_table, which does not compile with codegen. The change done here retains only the alphanumeric characters of the nodeName while generating variablePrefix.
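A minimal sketch of that sanitization; the node name below is one of the multi-word names introduced by this PR, and the exact Spark code may differ:

import java.util.Locale

object VariablePrefixDemo extends App {
  // Multi-word node names like this one broke codegen, because the prefix
  // is embedded in generated Java variable names.
  val nodeName = "Scan In-memory my_table"

  // Keep only alphanumeric characters, as described above.
  val variablePrefix = nodeName.toLowerCase(Locale.ROOT).filter(_.isLetterOrDigit)

  println(variablePrefix) // scaninmemorymytable
}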
Test build #86300 has finished for PR 20226 at commit …
Test build #86299 has finished for PR 20226 at commit …
Force-pushed from 65ec7a2 to 0c0aa94.
@@ -30,6 +30,8 @@ case class LocalTableScanExec(
    output: Seq[Attribute],
    @transient rows: Seq[InternalRow]) extends LeafExecNode {

+ override val nodeName: String = s"Scan LocalTable ${output.map(_.name).mkString("[", ",", "]")}"
It sounds like we have duplicate info about output in stringArgs.
I believe you are referring to the duplication at:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala (line 466 in 3f958a9):
def simpleString: String = s"$nodeName $argString".trim
I am changing this to just have Scan LocalTable.
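To make the duplication concrete, a small standalone sketch (the field values are stand-ins) of what simpleString would print when nodeName also embeds the output:

object SimpleStringDuplicationDemo extends App {
  val output = Seq("id", "value") // stand-in for output.map(_.name)

  // nodeName as originally added in this PR, embedding the output...
  val nodeName = s"Scan LocalTable ${output.mkString("[", ",", "]")}"
  // ...while argString (derived from stringArgs) already shows the output too.
  val argString = output.mkString("[", ",", "]")

  // TreeNode.simpleString concatenates the two, so the output appears twice:
  println(s"$nodeName $argString".trim)
  // Scan LocalTable [id,value] [id,value]
}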
Test build #86353 has finished for PR 20226 at commit …
Test build #86407 has finished for PR 20226 at commit …
@@ -233,7 +233,7 @@ struct<plan:string>
 -- !query 28 output
 == Physical Plan ==
 *Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x]
-+- Scan OneRowRelation[]
++- Scan Scan RDD OneRowRelation [][]
?
Test build #86850 has finished for PR 20226 at commit …
It sounds like we still need to fix a test in PySpark. Thanks!
@@ -86,6 +86,9 @@ case class RowDataSourceScanExec(

  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)

+ override val nodeName: String =
DataSourceScanExec.nodeName is defined as s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}", so do we really need to override it here?
My intent was to be able to distinguish between RowDataSourceScan and FileSourceScan. Removing those overrides.
By default …
Can we just change the UI code to put …?
@@ -103,6 +103,8 @@ case class ExternalRDDScanExec[T](
  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

+ override val nodeName: String = s"Scan ExternalRDD ${output.map(_.name).mkString("[", ",", "]")}"
I don't think including the output in the node name is a good idea.
My intention here was to be able to distinguish between ExternalRDDScanExec nodes. If we remove the output part from nodeName, then these nodes would all be named Scan ExternalRDD, which is generic.
    override val outputPartitioning: Partitioning = UnknownPartitioning(0),
    override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {

+ override val nodeName: String = s"Scan RDD $name ${output.map(_.name).mkString("[", ",", "]")}"
ditto
Removed output. The name in there would help in identifying the nodes uniquely.
After going through the changes here, I think we only need to update 2 nodes to include the table name in …
Test build #94786 has finished for PR 20226 at commit …
@maropu Could you take this over?
Sure, will do, too.
What changes were proposed in this pull request?
For queries which scan multiple tables, it would be convenient if the DAG shown in the Spark UI also showed which table is being scanned; this will make debugging easier. For this JIRA, I am scoping the change to Hive table scans only. Table scans which happen via codegen (e.g. convertMetastore and Spark native tables) will not be affected by this PR.
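As a hypothetical illustration (the db.orders and db.users tables below are invented), a join over two Hive tables yields two leaf scan nodes; without this change both render identically as HiveTableScan in the UI DAG, while with it each scan carries its table's qualified name (e.g. Scan HiveTable db.orders, per the table earlier in this conversation):

import org.apache.spark.sql.SparkSession

object TableNameInDagDemo extends App {
  // Assumes Hive support is available and that the invented tables
  // db.orders and db.users exist; purely illustrative.
  val spark = SparkSession.builder().appName("demo").enableHiveSupport().getOrCreate()

  spark.sql(
    """SELECT o.id, u.name
      |FROM db.orders o
      |JOIN db.users u ON o.user_id = u.id
    """.stripMargin
  ).show()
}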
How was this patch tested?