[SPARK-21213][SQL] Support collecting partition-level statistics: rowCount and sizeInBytes #18421
Conversation
ok to test
add to whitelist
cc @wzhfy
Test build #78663 has finished for PR 18421 at commit
Please update the PR title.
val table = visitTableIdentifier(ctx.tableIdentifier)
if (ctx.identifierSeq() == null) {
  AnalyzeTableCommand(table, noscan, partitionSpec)
AnalyzeTableCommand(table, noscan = ctx.identifier != null, partitionSpec)
  true
} else {
  false
}
if (ctx.identifier != null &&
ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
}
} else {
  None
}
val partitionSpec = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
import org.apache.spark.sql.internal.SessionState
 /**
- * Analyzes the given table to generate statistics, which will be used in query optimizations.
+ * Analyzes the given table or partition to generate statistics, which will be used in
+ * query optimizations.
Could you please add a description of partitionSpec?
If certain partition specs are specified, then statistics are gathered for only those partitions.
- AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+ AnalyzeTableCommand(TableIdentifier("t"), noscan = true,
+   partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"))))
  intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
This should be legal based on Hive's documentation, shouldn't it?
I see. Thanks for pointing that out. Currently, this PR supports only exact partition specs; it doesn't support the partial partition specs described in the document above. My preference would be to keep this PR simple by supporting only exact specs, and to add support for partial specs in a follow-up PR. What do you think?
I am fine with this. Please update your PR description and the class descriptions of visitAnalyze and AnalyzeTableCommand.
If we first support only exact partition specs, then in the near future we will have to change (replace) the syntax and the related implementation. In other words, we wouldn't be doing this incrementally, so I'd prefer to support the right syntax in this single PR. Would supporting that syntax require a lot of work?
@wzhfy, I expect the syntax change to be quite small and incremental. Currently, it is necessary to specify all partition columns along with their values. The change would be to allow only a subset of partition columns and to allow partition columns without values.
PARTITION (partcol1=val1,...) -> PARTITION (partcol1[=val1],...)
That said, I want to try to allow partial partition specs in this PR. Let me spend some time on it and report back my findings.
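For concreteness, a minimal sketch of the three forms the broadened syntax would accept, assuming a table t partitioned by (ds, hr) (illustrative only, not taken from this PR's tests):

spark.sql("ANALYZE TABLE t PARTITION (ds='2008-04-09', hr=11) COMPUTE STATISTICS") // exact spec
spark.sql("ANALYZE TABLE t PARTITION (ds='2008-04-09') COMPUTE STATISTICS")        // subset of columns
spark.sql("ANALYZE TABLE t PARTITION (ds, hr=11) COMPUTE STATISTICS")              // column without a value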
Force-pushed from f9327b0 to 090be78
@@ -95,25 +95,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
 * {{{
 * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
 * }}}
 * Example SQL for analyzing a single partition :
 * {{{
 * ANALYZE TABLE table PARTITION (key=value,..) COMPUTE STATISTICS [NOSCAN];
The existing syntax description is not very clear. Could you improve it?
ANALYZE TABLE [db_name.]tablename [PARTITION(partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [NOSCAN]
In addition, since we have a restriction, please do not call visitNonOptionalPartitionSpec. Instead, we can capture the unset partition column and issue a more user-friendly exception message.
It looks like visitNonOptionalPartitionSpec already detects unset partition columns and throws an exception: Found an empty partition key '$key'. Is this sufficient, or do you have something else in mind?
/**
 * Create a partition specification map without optional values.
 */
protected def visitNonOptionalPartitionSpec(
    ctx: PartitionSpecContext): Map[String, String] = withOrigin(ctx) {
  visitPartitionSpec(ctx).map {
    case (key, None) => throw new ParseException(s"Found an empty partition key '$key'.", ctx)
    case (key, Some(value)) => key -> value
  }
}
@@ -95,25 +95,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
 * {{{
 * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
Also here.
Test build #78750 has finished for PR 18421 at commit
Test build #78749 has finished for PR 18421 at commit
Test build #78809 has finished for PR 18421 at commit
Force-pushed from 2d1e817 to 70b35ce
@wzhfy, @gatorsmile, I updated this PR to support partial partition specs where values are defined only for a subset of partition columns. For example, if a table has 2 partition columns, ds and hr, both PARTITION (ds='2010-01-01', hr=10) and PARTITION (hr=10) specs are valid. I did not add support for partition specs where a partition column is mentioned without a value, e.g. PARTITION (ds, hr=10) is still not allowed. I didn't implement this because I don't understand why this syntax is useful: my understanding is that (ds, hr=10) is equivalent to (hr=10). Do you think it is important to implement this syntax? If so, would you help me understand why? Is this because we want to support HQL at the same level as Hive, to allow a seamless transition from Hive to Spark?
Test build #78923 has finished for PR 18421 at commit
@wzhfy, @gatorsmile, I updated the PR to add full support for partial partition specs. This version supports a subset of partition columns with or without values specified. I'm going away for the long weekend tomorrow and will be back next Wednesday (July 5). I may not be able to respond to comments or provide updates to this PR until then.
Test build #78927 has finished for PR 18421 at commit
Test build #78929 has finished for PR 18421 at commit
Test build #78930 has finished for PR 18421 at commit
if (filteredSpec.isEmpty) {
  None
} else {
  Some(filteredSpec.mapValues(v => v.get))
mapValues(_.get)
val partitionSpec =
  if (ctx.partitionSpec != null) {
    val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(x => x._2.isDefined)
_._2.isDefined
  */
 case class AnalyzeTableCommand(
     tableIdent: TableIdentifier,
-    noscan: Boolean = true) extends RunnableCommand {
+    noscan: Boolean = true,
+    partitionSpec: Option[TablePartitionSpec] = None) extends RunnableCommand {
How about adding a new AnalyzePartitionCommand? We can put all partition-level logic there (including partition-level column stats in the future). I think that would make the logic clearer.
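A minimal sketch of what that could look like; the class name comes from this comment, while the constructor shape and body are assumptions rather than this PR's actual implementation:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.RunnableCommand

case class AnalyzePartitionCommand(
    tableIdent: TableIdentifier,
    partitionSpec: Map[String, Option[String]],  // hypothetical shape
    noscan: Boolean = true) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Resolve the partitions matching partitionSpec, compute sizeInBytes
    // (and rowCount unless noscan is set), then persist the new stats
    // through the session catalog.
    Seq.empty[Row]
  }
}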
val newRowCount = rowCounts.get(p.spec)

def updateStats(newStats: CatalogStatistics): Unit = {
  sessionState.catalog.alterPartitions(tableMeta.identifier,
Can we collect all partitions with new stats and alter them together? Currently, alterPartitions is called once per partition.
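A sketch of the batched update, assuming partitions is the sequence from the surrounding diff, CatalogTablePartition carries the stats field this PR introduces, and computeNewStats is a hypothetical stand-in for the per-partition logic:

// Build the full list of partitions with updated stats first ...
val updatedPartitions = partitions.flatMap { p =>
  computeNewStats(p).map(newStats => p.copy(stats = Some(newStats)))
}
// ... then make a single metastore call instead of one per partition.
if (updatedPartitions.nonEmpty) {
  sessionState.catalog.alterPartitions(tableMeta.identifier, updatedPartitions)
}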
  calculateRowCountsPerPartition(sparkSession, tableMeta)
}

partitions.foreach(p => {
partitions.foreach { p =>
  }
}

test("analyze single partition noscan") {
We can combine the test cases for noscan and non-noscan: first analyze with noscan and check the stats, then analyze without noscan and check again. Currently there is a lot of redundant code.
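A rough sketch of the combined flow, with the table and partition names assumed:

// Analyze with NOSCAN first: only sizeInBytes should be set.
sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN")
// assert: stats.sizeInBytes defined, stats.rowCount empty

// Then analyze without NOSCAN: rowCount should now be set as well.
sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS")
// assert: stats.sizeInBytes and stats.rowCount both defined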
@@ -1028,25 +994,115 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
  currentFullPath
}

private def statsToHiveProperties(
How about statsToProperties and statsFromProperties? The current names seem related to Hive stats, which is a little ambiguous.
    sessionState: SessionState,
    catalogTable: CatalogTable,
    partition: CatalogTablePartition): Long = {
  calculateLocationSize(sessionState, catalogTable.identifier, partition.storage.locationUri)
Shall we remove this method? It's only one line and can be replaced with a direct call to calculateLocationSize.
  }
}

test("analyze a set of partitions") {
Please also check the following three cases:
ANALYZE TABLE tab PARTITION(ds='2010-01-01', hr) COMPUTE STATISTICS; -- analyze two partitions
ANALYZE TABLE tab PARTITION(ds, hr) COMPUTE STATISTICS; -- analyze four partitions
ANALYZE TABLE tab PARTITION(ds, hr='10') COMPUTE STATISTICS; -- Is this allowed in Hive?
@wzhfy, given that these cases are covered in SparkSqlParserSuite, is it still necessary to cover them again in StatisticsSuite? Partition columns without values are removed at the parsing stage, so that AnalyzePartitionCommand always receives partition columns along with values.
re: (ds, hr='10') - https://cwiki.apache.org/confluence/display/Hive/StatsDev suggests that it is allowed; this PR supports this syntax.
@mbasmanova IIUC, the logic is wrong here. For example, when analyzing partition (ds, hr), we should not remove the columns in the parser. Currently we parse it to AnalyzeTableCommand, which collects table-level stats, but what we need to do is collect partition-level stats for all partitions. Check Hive's behavior here.
sql(
  s"""
     |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
     |SELECT * FROM src
How about using SELECT '1', 'A' in these tests? We don't need to make the table this big.
Test build #79232 has finished for PR 18421 at commit
@wzhfy, thank you for the review. I created the AnalyzePartitionCommand class, modified the logic to update all partitions in a single call to the Metastore, and combined the noscan and non-noscan test cases. I believe I addressed all comments except for the requests to make queryStats and assertStats common functions: after merging the test cases, each of these functions is used in only one test, so there is no duplication anymore. Would you take another look?
Test build #79248 has finished for PR 18421 at commit
Test build #79246 has finished for PR 18421 at commit
Force-pushed from 8c0b61b to 43f2112
…ed other review comments
… form a prefix of the partition columns defined in table schema
Force-pushed from b1e5079 to 87594d6
private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
  val partitionColumnNames = table.partitionColumnNames.toSet
  val partitionSpecWithCase =
Instead of changing the case, could you call sparkSession.sessionState.conf.resolver? There are many examples in the code base. Thanks!
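A sketch of that pattern, inferred from similar call sites rather than taken from this PR:

// Match user-supplied partition column names against the table schema using
// the session resolver, which honors spark.sql.caseSensitive.
val resolver = sparkSession.sessionState.conf.resolver
val normalizedSpec = partitionSpec.map { case (key, value) =>
  val colName = table.partitionColumnNames
    .find(resolver(_, key))
    .getOrElse(throw new org.apache.spark.sql.AnalysisException(
      s"$key is not a valid partition column in table ${table.identifier}"))
  colName -> value
}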
Test build #80806 has finished for PR 18421 at commit
…ndle conf.caseSensitiveAnalysis
Test build #80816 has finished for PR 18421 at commit
// Report an error if partition columns in partition specification do not form
// a prefix of the list of partition columns defined in the table schema
val isSpecified =
-> isNotSpecified
private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
  val normalizedPartitionSpec =
    PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames,
      table.identifier.quotedString, conf.resolver);
Nit: remove ;
LGTM pending Jenkins
Test build #80846 has finished for PR 18421 at commit
Thanks!!! Merging to master.
@gatorsmile, so excited to see this change merged. Thank you for your careful, detailed reviews.
@mbasmanova Great work! I was really busy in the past two months so I didn't have time to look at this. |
@wzhfy Welcome back! |
What changes were proposed in this pull request?
Added support for the ANALYZE TABLE [db_name.]tablename PARTITION (partcol1[=val1], partcol2[=val2], ...) COMPUTE STATISTICS [NOSCAN] SQL command, which calculates the total number of rows and the size in bytes for a subset of partitions. The calculated statistics are stored in the Hive Metastore as user-defined properties attached to the partition objects, using the same property names as table-level statistics: spark.sql.statistics.totalSize and spark.sql.statistics.numRows.
When the partition specification contains all partition columns with values, the command collects statistics for the single partition that matches the specification. When some partition columns are missing or are listed without their values, the command collects statistics for all partitions that match the specified subset of partition column values.
For example, table t has 4 partitions with the following specs:
Partition 1: (ds='2008-04-08', hr=11)
Partition 2: (ds='2008-04-08', hr=12)
Partition 3: (ds='2008-04-09', hr=11)
Partition 4: (ds='2008-04-09', hr=12)
'ANALYZE TABLE t PARTITION (ds='2008-04-09', hr=11)' command will collect statistics only for partition 3.
'ANALYZE TABLE t PARTITION (ds='2008-04-09')' command will collect statistics for partitions 3 and 4.
'ANALYZE TABLE t PARTITION (ds, hr)' command will collect statistics for all four partitions.
When the optional parameter NOSCAN is specified, the command doesn't count the number of rows and gathers only the size in bytes.
The statistics gathered by the ANALYZE TABLE command can be fetched using the DESC EXTENDED [db_name.]tablename PARTITION command.
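For example (a minimal sketch; the exact layout of the DESC output varies by Spark version):

spark.sql("ANALYZE TABLE t PARTITION (ds='2008-04-09', hr=11) COMPUTE STATISTICS")
spark.sql("DESC EXTENDED t PARTITION (ds='2008-04-09', hr=11)").show(truncate = false)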
How was this patch tested?
Added tests.