
Commit 3a61e10

yhuai authored and marmbrus committed
[SPARK-10339] [SPARK-10334] [SPARK-10301] [SQL] Partitioned table scan can OOM driver and throw a better error message when users need to enable parquet schema merging
This fixes the problem that scanning a partitioned table puts the driver under high memory pressure and can take down the cluster. With this fix, we can also correctly show the query plan of a query consuming partitioned tables.

https://issues.apache.org/jira/browse/SPARK-10339
https://issues.apache.org/jira/browse/SPARK-10334

Finally, this PR squeezes in a "quick fix" for SPARK-10301. It is not a real fix; it just throws a better error message to let users know what to do.

Author: Yin Huai <[email protected]>

Closes #8515 from yhuai/partitionedTableScan.

(cherry picked from commit 097a7e3)
Signed-off-by: Michael Armbrust <[email protected]>
1 parent d178e1e commit 3a61e10
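
For context, the scenario this commit targets can be reproduced with a small partitioned Parquet table. The following is a minimal sketch mirroring the new regression test below; it assumes a Spark 1.5-era SQLContext named sqlContext is in scope, and the output path is a placeholder:

import sqlContext.implicits._

// Hypothetical output location for the demo table.
val path = "/tmp/spark-10334-demo"

// Write a tiny table partitioned by column b, then read it back with a filter
// and a projection.
sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
val df = sqlContext.read.parquet(path).filter('a === 0).select('b)

// Before this patch, planning repeated the Filter/Project work for every partition,
// which could put the driver under memory pressure for tables with many partitions;
// after it, the plan shows a single Filter and Project over one unioned scan.
df.explain()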

3 files changed: +65 −42 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 44 additions & 41 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
@@ -121,7 +122,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       projections: Seq[NamedExpression],
       filters: Seq[Expression],
       partitionColumns: StructType,
-      partitions: Array[Partition]) = {
+      partitions: Array[Partition]): SparkPlan = {
     val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
 
     // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
@@ -130,49 +131,51 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val confBroadcast =
       relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
 
-    // Builds RDD[Row]s for each selected partition.
-    val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
-      // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
-      // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
-      // some partition column(s).
-      val scan =
-        pruneFilterProject(
-          logicalRelation,
-          projections,
-          filters,
-          (columns: Seq[Attribute], filters) => {
-            val partitionColNames = partitionColumns.fieldNames
-
-            // Don't scan any partition columns to save I/O. Here we are being optimistic and
-            // assuming partition columns data stored in data files are always consistent with those
-            // partition values encoded in partition directory paths.
-            val needed = columns.filterNot(a => partitionColNames.contains(a.name))
-            val dataRows =
-              relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
-
-            // Merges data values with partition values.
-            mergeWithPartitionValues(
-              relation.schema,
-              columns.map(_.name).toArray,
-              partitionColNames,
-              partitionValues,
-              toCatalystRDD(logicalRelation, needed, dataRows))
-          })
-
-      scan.execute()
-    }
+    // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
+    // will union all partitions and attach partition values if needed.
+    val scanBuilder = {
+      (columns: Seq[Attribute], filters: Array[Filter]) => {
+        // Builds RDD[Row]s for each selected partition.
+        val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
+          val partitionColNames = partitionColumns.fieldNames
+
+          // Don't scan any partition columns to save I/O. Here we are being optimistic and
+          // assuming partition columns data stored in data files are always consistent with those
+          // partition values encoded in partition directory paths.
+          val needed = columns.filterNot(a => partitionColNames.contains(a.name))
+          val dataRows =
+            relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+
+          // Merges data values with partition values.
+          mergeWithPartitionValues(
+            relation.schema,
+            columns.map(_.name).toArray,
+            partitionColNames,
+            partitionValues,
+            toCatalystRDD(logicalRelation, needed, dataRows))
+        }
+
+        val unionedRows =
+          if (perPartitionRows.length == 0) {
+            relation.sqlContext.emptyResult
+          } else {
+            new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
+          }
 
-    val unionedRows =
-      if (perPartitionRows.length == 0) {
-        relation.sqlContext.emptyResult
-      } else {
-        new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
+        unionedRows
       }
+    }
+
+    // Create the scan operator. If needed, add Filter and/or Project on top of the scan.
+    // The added Filter/Project is on top of the unioned RDD. We do not want to create
+    // one Filter/Project for every partition.
+    val sparkPlan = pruneFilterProject(
+      logicalRelation,
+      projections,
+      filters,
+      scanBuilder)
 
-    execution.PhysicalRDD.createFromDataSource(
-      projections.map(_.toAttribute),
-      unionedRows,
-      logicalRelation.relation)
+    sparkPlan
   }
 
// TODO: refactor this thing. It is very complicated because it does projection internally.
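
The heart of the change above: per-partition RDDs are unioned once inside the scan builder, and pruneFilterProject then places a single Filter/Project on top of that union instead of one per partition. Below is a standalone RDD-level sketch of that shape, illustrative only and not the patched code; the data and names such as perPartitionRows are made up:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UnionScanSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-scan-sketch").setMaster("local[*]"))

    // Stand-ins for the RDD[Row]s built for each selected partition directory.
    val perPartitionRows: Seq[RDD[(String, Int)]] = Seq(
      sc.parallelize(Seq(("b=0", 1), ("b=0", 2))),
      sc.parallelize(Seq(("b=1", 3), ("b=1", 4))))

    // Union all per-partition RDDs first (or produce an empty RDD if there are none)...
    val unionedRows: RDD[(String, Int)] =
      if (perPartitionRows.isEmpty) sc.emptyRDD[(String, Int)]
      else sc.union(perPartitionRows)

    // ...then apply a single filter and projection on top of the union,
    // rather than one filter/projection per partition.
    val result = unionedRows.filter(_._2 > 1).map(_._1)

    println(result.collect().mkString(", "))
    sc.stop()
  }
}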

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 7 additions & 0 deletions
@@ -196,6 +196,13 @@ private[parquet] class CatalystRowConverter(
       }
     }
 
+    if (paddedParquetFields.length != catalystType.length) {
+      throw new UnsupportedOperationException(
+        "A Parquet file's schema has different number of fields with the table schema. " +
+          "Please enable schema merging by setting \"mergeSchema\" to true when load " +
+          "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
+    }
+
     paddedParquetFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
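
As the new error message suggests, users hitting this exception can enable Parquet schema merging in either of two ways. A brief sketch of both; the path is a placeholder and sqlContext is assumed to be an existing SQLContext:

// Per read: ask the Parquet data source to merge schemas across part-files.
val merged = sqlContext.read.option("mergeSchema", "true").parquet("/path/to/parquet/table")

// Or globally via SQLConf, affecting subsequent Parquet reads in this context.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
val merged2 = sqlContext.read.parquet("/path/to/parquet/table")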

sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala

Lines changed: 14 additions & 1 deletion
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 
@@ -136,4 +136,17 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       assert(fs.exists(commonSummaryPath))
     }
   }
+
+  test("SPARK-10334 Projections and filters should be kept in physical plan") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
+      val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
+      val physicalPlan = df.queryExecution.executedPlan
+
+      assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
+      assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
+    }
+  }
 }
