
Commit 23ea898

mbasmanova authored and gatorsmile committed
[SPARK-21213][SQL] Support collecting partition-level statistics: rowCount and sizeInBytes
## What changes were proposed in this pull request?

Added support for the ANALYZE TABLE [db_name.]tablename PARTITION (partcol1[=val1], partcol2[=val2], ...) COMPUTE STATISTICS [NOSCAN] SQL command, which calculates the total number of rows and the size in bytes for a subset of a table's partitions. The calculated statistics are stored in the Hive Metastore as user-defined properties attached to the partition objects. The property names are the same as the ones used for table-level statistics: spark.sql.statistics.totalSize and spark.sql.statistics.numRows.

When the partition specification contains all partition columns with values, the command collects statistics for the single partition that matches the specification. When some partition columns are missing or are listed without values, the command collects statistics for all partitions that match the subset of partition column values that are specified.

For example, table t has 4 partitions with the following specs:

* Partition1: (ds='2008-04-08', hr=11)
* Partition2: (ds='2008-04-08', hr=12)
* Partition3: (ds='2008-04-09', hr=11)
* Partition4: (ds='2008-04-09', hr=12)

'ANALYZE TABLE t PARTITION (ds='2008-04-09', hr=11)' collects statistics only for partition 3.

'ANALYZE TABLE t PARTITION (ds='2008-04-09')' collects statistics for partitions 3 and 4.

'ANALYZE TABLE t PARTITION (ds, hr)' collects statistics for all four partitions.

When the optional NOSCAN parameter is specified, the command skips counting rows and gathers only the size in bytes. The statistics gathered by ANALYZE TABLE can be fetched with the DESC EXTENDED [db_name.]tablename PARTITION command.

## How was this patch tested?

Added tests.

Author: Masha Basmanova <[email protected]>

Closes apache#18421 from mbasmanova/mbasmanova-analyze-partition.
1 parent 07a2b87 commit 23ea898
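
As a quick illustration of the workflow described in the commit message (a sketch only, not part of the patch, assuming a SparkSession named `spark` and the example table `t` partitioned by ds and hr):

  // Collect rowCount and sizeInBytes for the single matching partition.
  spark.sql("ANALYZE TABLE t PARTITION (ds='2008-04-09', hr=11) COMPUTE STATISTICS")

  // The values are stored in the Hive Metastore as the partition properties
  // spark.sql.statistics.totalSize and spark.sql.statistics.numRows, and can be
  // read back via DESC EXTENDED ... PARTITION.
  spark.sql("DESC EXTENDED t PARTITION (ds='2008-04-09', hr=11)").show(truncate = false)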

File tree

11 files changed: +888 additions, -95 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 5 additions & 2 deletions
@@ -91,12 +91,14 @@ object CatalogStorageFormat {
  *
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
- * @param parameters some parameters for the partition, for example, stats.
+ * @param parameters some parameters for the partition
+ * @param stats optional statistics (number of rows, total size, etc.)
  */
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
     storage: CatalogStorageFormat,
-    parameters: Map[String, String] = Map.empty) {
+    parameters: Map[String, String] = Map.empty,
+    stats: Option[CatalogStatistics] = None) {
 
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
@@ -106,6 +108,7 @@ case class CatalogTablePartition(
     if (parameters.nonEmpty) {
       map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
     }
+    stats.foreach(s => map.put("Partition Statistics", s.simpleString))
     map
   }
 
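A minimal sketch of what this change enables (assuming the internal catalyst catalog API, including CatalogStorageFormat.empty and the CatalogStatistics constructor, which are not shown in this hunk): the new `stats` field rides along on CatalogTablePartition and is rendered by toLinkedHashMap, which is what DESC EXTENDED ... PARTITION prints.

  import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogStorageFormat, CatalogTablePartition}

  val partition = CatalogTablePartition(
    spec = Map("ds" -> "2008-04-09", "hr" -> "11"),
    storage = CatalogStorageFormat.empty,
    parameters = Map.empty,
    stats = Some(CatalogStatistics(sizeInBytes = BigInt(1024), rowCount = Some(BigInt(3)))))

  // "Partition Statistics" appears only when stats is defined.
  partition.toLinkedHashMap.foreach { case (key, value) => println(s"$key: $value") }
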
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 23 additions & 13 deletions
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing a table or a set of partitions :
    * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
    * }}}
+   *
    * Example SQL for analyzing columns :
    * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
    * }}}
    */
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec != null) {
-      logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}")
+    if (ctx.identifier != null &&
+        ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+      throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
     }
-    if (ctx.identifier != null) {
-      if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-        throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
+
+    val table = visitTableIdentifier(ctx.tableIdentifier)
+    if (ctx.identifierSeq() == null) {
+      if (ctx.partitionSpec != null) {
+        AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
+          noscan = ctx.identifier != null)
+      } else {
+        AnalyzeTableCommand(table, noscan = ctx.identifier != null)
       }
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-    } else if (ctx.identifierSeq() == null) {
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false)
     } else {
+      if (ctx.partitionSpec != null) {
+        logWarning("Partition specification is ignored when collecting column statistics: " +
+          ctx.partitionSpec.getText)
+      }
       AnalyzeColumnCommand(
-        visitTableIdentifier(ctx.tableIdentifier),
+        table,
         visitIdentifierSeq(ctx.identifierSeq()))
     }
   }
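
A sketch of how the three ANALYZE variants handled above dispatch to the three logical commands (illustrative only, assuming a SparkSession named `spark` and a partitioned table `t` with columns key and value):

  // Partition spec, no column list -> AnalyzePartitionCommand (adding NOSCAN would skip row counting).
  spark.sql("ANALYZE TABLE t PARTITION (ds='2008-04-09') COMPUTE STATISTICS")

  // No partition spec, no column list -> AnalyzeTableCommand.
  spark.sql("ANALYZE TABLE t COMPUTE STATISTICS NOSCAN")

  // Column list -> AnalyzeColumnCommand; a PARTITION clause combined with FOR COLUMNS
  // is ignored with a warning.
  spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value")
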
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala

Lines changed: 149 additions & 0 deletions
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+
+/**
+ * Analyzes a given set of partitions to generate per-partition statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an `AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes are calculated. When `noscan`
+ * is `true`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+    tableIdent: TableIdentifier,
+    partitionSpec: Map[String, Option[String]],
+    noscan: Boolean = true) extends RunnableCommand {
+
+  private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
+    val normalizedPartitionSpec =
+      PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames,
+        table.identifier.quotedString, conf.resolver)
+
+    // Report an error if partition columns in partition specification do not form
+    // a prefix of the list of partition columns defined in the table schema
+    val isNotSpecified =
+      table.partitionColumnNames.map(normalizedPartitionSpec.getOrElse(_, None).isEmpty)
+    if (isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))) {
+      val tableId = table.identifier
+      val schemaColumns = table.partitionColumnNames.mkString(",")
+      val specColumns = normalizedPartitionSpec.keys.mkString(",")
+      throw new AnalysisException("The list of partition columns with values " +
+        s"in partition specification for table '${tableId.table}' " +
+        s"in database '${tableId.database.get}' is not a prefix of the list of " +
+        "partition columns defined in the table schema. " +
+        s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].")
+    }
+
+    val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
+    if (filteredSpec.isEmpty) {
+      None
+    } else {
+      Some(filteredSpec)
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
+    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+    }
+
+    val partitionValueSpec = getPartitionSpec(tableMeta)
+
+    val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+    if (partitions.isEmpty) {
+      if (partitionValueSpec.isDefined) {
+        throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get)
+      } else {
+        // the user requested to analyze all partitions for a table which has no partitions
+        // return normally, since there is nothing to do
+        return Seq.empty[Row]
+      }
+    }
+
+    // Compute statistics for individual partitions
+    val rowCounts: Map[TablePartitionSpec, BigInt] =
+      if (noscan) {
+        Map.empty
+      } else {
+        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+      }
+
+    // Update the metastore if newly computed statistics are different from those
+    // recorded in the metastore.
+    val newPartitions = partitions.flatMap { p =>
+      val newTotalSize = CommandUtils.calculateLocationSize(
+        sessionState, tableMeta.identifier, p.storage.locationUri)
+      val newRowCount = rowCounts.get(p.spec)
+      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
+      newStats.map(_ => p.copy(stats = newStats))
+    }
+
+    if (newPartitions.nonEmpty) {
+      sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def calculateRowCountsPerPartition(
+      sparkSession: SparkSession,
+      tableMeta: CatalogTable,
+      partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = {
+    val filter = if (partitionValueSpec.isDefined) {
+      val filters = partitionValueSpec.get.map {
+        case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value))
+      }
+      filters.reduce(And)
+    } else {
+      Literal.TrueLiteral
+    }
+
+    val tableDf = sparkSession.table(tableMeta.identifier)
+    val partitionColumns = tableMeta.partitionColumnNames.map(Column(_))
+
+    val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()
+
+    df.collect().map { r =>
+      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
+      val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
+      val count = BigInt(r.getLong(partitionColumns.size))
+      (spec, count)
+    }.toMap
+  }
+}
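
The prefix check in getPartitionSpec above is compact, so here is a standalone sketch of the same test. For a table partitioned by (ds, hr), a boolean per partition column marks whether its value is missing from the spec; an adjacent (true, false) pair means a column with a value follows one without, i.e. the specified values do not form a prefix:

  def specifiesNonPrefix(isNotSpecified: Seq[Boolean]): Boolean =
    isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))

  specifiesNonPrefix(Seq(false, false))  // PARTITION (ds='2008-04-09', hr=11) -> false, accepted
  specifiesNonPrefix(Seq(false, true))   // PARTITION (ds='2008-04-09')        -> false, accepted
  specifiesNonPrefix(Seq(true, false))   // PARTITION (hr=11)                  -> true, rejected
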

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 6 additions & 22 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 
 
 /**
@@ -37,31 +37,15 @@ case class AnalyzeTableCommand(
     if (tableMeta.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
+
+    // Compute stats for the whole table
     val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+    val newRowCount =
+      if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
 
-    val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
-    val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-    var newStats: Option[CatalogStatistics] = None
-    if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
-      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
-    }
-    // We only set rowCount when noscan is false, because otherwise:
-    // 1. when total size is not changed, we don't need to alter the table;
-    // 2. when total size is changed, `oldRowCount` becomes invalid.
-    // This is to make sure that we only record the right statistics.
-    if (!noscan) {
-      val newRowCount = sparkSession.table(tableIdentWithDB).count()
-      if (newRowCount >= 0 && newRowCount != oldRowCount) {
-        newStats = if (newStats.isDefined) {
-          newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
-        } else {
-          Some(CatalogStatistics(
-            sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
-        }
-      }
-    }
     // Update the metastore if the above statistics of the table are different from those
     // recorded in the metastore.
+    val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
     if (newStats.isDefined) {
       sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
       // Refresh the cached data source table in the catalog.

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

Lines changed: 26 additions & 1 deletion
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.internal.SessionState
 
 
@@ -112,4 +112,29 @@ object CommandUtils extends Logging {
     size
   }
 
+  def compareAndGetNewStats(
+      oldStats: Option[CatalogStatistics],
+      newTotalSize: BigInt,
+      newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
+    val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(-1L)
+    val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+    var newStats: Option[CatalogStatistics] = None
+    if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
+    }
+    // We only set rowCount when noscan is false, because otherwise:
+    // 1. when total size is not changed, we don't need to alter the table;
+    // 2. when total size is changed, `oldRowCount` becomes invalid.
+    // This is to make sure that we only record the right statistics.
+    if (newRowCount.isDefined) {
+      if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) {
+        newStats = if (newStats.isDefined) {
+          newStats.map(_.copy(rowCount = newRowCount))
+        } else {
+          Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount))
+        }
+      }
+    }
+    newStats
+  }
 }
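
A minimal sketch of how the shared compareAndGetNewStats helper behaves, assuming the CatalogStatistics constructor above (return values shown as comments):

  import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

  val oldStats = Some(CatalogStatistics(sizeInBytes = BigInt(1024), rowCount = Some(BigInt(10))))

  // Size unchanged and no fresh row count (NOSCAN): nothing to write back.
  CommandUtils.compareAndGetNewStats(oldStats, newTotalSize = BigInt(1024), newRowCount = None)
  // -> None

  // Size changed: record only the new size; the old row count would be stale.
  CommandUtils.compareAndGetNewStats(oldStats, newTotalSize = BigInt(2048), newRowCount = None)
  // -> Some(CatalogStatistics(sizeInBytes = 2048, rowCount = None))

  // Size unchanged but a fresh row count differs: keep the old size, update the count.
  CommandUtils.compareAndGetNewStats(oldStats, newTotalSize = BigInt(1024), newRowCount = Some(BigInt(12)))
  // -> Some(CatalogStatistics(sizeInBytes = 1024, rowCount = Some(12)))
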
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet
+  PARTITIONED BY (ds, hr);
+
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10)
+VALUES ('k1', 100), ('k2', 200), ('k3', 300);
+
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11)
+VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401);
+
+INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5)
+VALUES ('k1', 102), ('k2', 202);
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+
+-- Collect stats for a single partition
+ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+
+-- Collect stats for 2 partitions
+ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);
+
+-- Collect stats for all partitions
+ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);
+DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5);
+
+-- DROP TEST TABLES/VIEWS
+DROP TABLE t;
