[SPARK-10339] [SPARK-10334] [SPARK-10301] [SQL] Partitioned table scan can OOM driver and throw a better error message when users need to enable parquet schema merging #8515

DataSourceStrategy.scala

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
@@ -121,7 +122,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       projections: Seq[NamedExpression],
       filters: Seq[Expression],
       partitionColumns: StructType,
-      partitions: Array[Partition]) = {
+      partitions: Array[Partition]): SparkPlan = {
     val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
 
     // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
@@ -130,49 +131,51 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val confBroadcast =
       relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
 
-    // Builds RDD[Row]s for each selected partition.
-    val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
-      // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
-      // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
-      // some partition column(s).
-      val scan =
-        pruneFilterProject(
-          logicalRelation,
-          projections,
-          filters,
-          (columns: Seq[Attribute], filters) => {
-            val partitionColNames = partitionColumns.fieldNames
-
-            // Don't scan any partition columns to save I/O. Here we are being optimistic and
-            // assuming partition columns data stored in data files are always consistent with those
-            // partition values encoded in partition directory paths.
-            val needed = columns.filterNot(a => partitionColNames.contains(a.name))
-            val dataRows =
-              relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
-
-            // Merges data values with partition values.
-            mergeWithPartitionValues(
-              relation.schema,
-              columns.map(_.name).toArray,
-              partitionColNames,
-              partitionValues,
-              toCatalystRDD(logicalRelation, needed, dataRows))
-          })
-
-      scan.execute()
-    }
-
-    val unionedRows =
-      if (perPartitionRows.length == 0) {
-        relation.sqlContext.emptyResult
-      } else {
-        new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
-      }
-
-    execution.PhysicalRDD.createFromDataSource(
-      projections.map(_.toAttribute),
-      unionedRows,
-      logicalRelation.relation)
+    // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
+    // will union all partitions and attach partition values if needed.
+    val scanBuilder = {
+      (columns: Seq[Attribute], filters: Array[Filter]) => {
+        // Builds RDD[Row]s for each selected partition.
+        val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
+          val partitionColNames = partitionColumns.fieldNames
+
+          // Don't scan any partition columns to save I/O. Here we are being optimistic and
+          // assuming partition columns data stored in data files are always consistent with those
+          // partition values encoded in partition directory paths.
+          val needed = columns.filterNot(a => partitionColNames.contains(a.name))
+          val dataRows =
+            relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+
+          // Merges data values with partition values.
+          mergeWithPartitionValues(
+            relation.schema,
+            columns.map(_.name).toArray,
+            partitionColNames,
+            partitionValues,
+            toCatalystRDD(logicalRelation, needed, dataRows))
+        }
+
+        val unionedRows =
+          if (perPartitionRows.length == 0) {
+            relation.sqlContext.emptyResult
+          } else {
+            new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
+          }
+
+        unionedRows
+      }
+    }
+
+    // Create the scan operator. If needed, add Filter and/or Project on top of the scan.
+    // The added Filter/Project is on top of the unioned RDD. We do not want to create
+    // one Filter/Project for every partition.
+    val sparkPlan = pruneFilterProject(
+      logicalRelation,
+      projections,
+      filters,
+      scanBuilder)
+
+    sparkPlan
   }
 
   // TODO: refactor this thing. It is very complicated because it does projection internally.
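The shape of this DataSourceStrategy change is easier to see outside the diff. The following is a minimal, hypothetical sketch (Plan, Scan, Union, and FilterProject are stand-in case classes invented for illustration, not Spark operators): before the patch, every partition got its own Filter/Project-wrapped scan and only the resulting rows were unioned, so the driver built one operator tree per partition; after the patch, the per-partition scans are unioned first and pruneFilterProject adds a single Filter/Project on top.

// Stand-in operator tree, for illustration only; these are not Spark classes.
sealed trait Plan
case class Scan(partitionDir: String) extends Plan
case class Union(children: Seq[Plan]) extends Plan
case class FilterProject(child: Plan) extends Plan

object PartitionedScanShape {
  // Old shape: one FilterProject per partition, so the number of operator
  // nodes built on the driver grows with the number of partitions. This is
  // the driver OOM risk named in the PR title.
  def before(partitionDirs: Seq[String]): Plan =
    Union(partitionDirs.map(dir => FilterProject(Scan(dir))))

  // New shape: a single FilterProject over the union of all partition scans,
  // matching the "We do not want to create one Filter/Project for every
  // partition" comment in the new code.
  def after(partitionDirs: Seq[String]): Plan =
    FilterProject(Union(partitionDirs.map(Scan(_))))

  def main(args: Array[String]): Unit = {
    val dirs = (1 to 3).map(i => s"b=$i")
    println(before(dirs))
    println(after(dirs))
  }
}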
CatalystRowConverter.scala

@@ -196,6 +196,13 @@ private[parquet] class CatalystRowConverter(
       }
     }
 
+    if (paddedParquetFields.length != catalystType.length) {
+      throw new UnsupportedOperationException(
+        "A Parquet file's schema has different number of fields with the table schema. " +
+          "Please enable schema merging by setting \"mergeSchema\" to true when load " +
+          "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
+    }
+
Review comment (Contributor): Just a note: this is a quick fix version of #8509.

     paddedParquetFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
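For users who hit the new UnsupportedOperationException, the two remedies its message names would look roughly like the sketch below, written against the Spark 1.5-era SQLContext API; `sqlContext` and `path` are placeholder arguments, not names from this PR.

import org.apache.spark.sql.{DataFrame, SQLContext}

object MergeSchemaExamples {
  // Remedy 1: ask for schema merging on this read only, via the "mergeSchema"
  // data source option mentioned in the error message.
  def readWithMergeOption(sqlContext: SQLContext, path: String): DataFrame =
    sqlContext.read.option("mergeSchema", "true").parquet(path)

  // Remedy 2: flip the SQLConf flag mentioned in the message, then read as usual.
  def readWithMergeConf(sqlContext: SQLContext, path: String): DataFrame = {
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.read.parquet(path)
  }
}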
ParquetHadoopFsRelationSuite.scala

@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 
@@ -136,4 +136,17 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       assert(fs.exists(commonSummaryPath))
     }
   }
+
+  test("SPARK-10334 Projections and filters should be kept in physical plan") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
+      val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
+      val physicalPlan = df.queryExecution.executedPlan
+
+      assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
+      assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
+    }
+  }
 }
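As a rough interactive counterpart to the SPARK-10334 test above, one could inspect the physical plan of a similar query in a spark-shell session; `sqlContext` and the dataset path below are placeholders, not part of this PR.

import sqlContext.implicits._

val df = sqlContext.read.parquet("/tmp/partitioned").filter('a === 0).select('b)

// Prints the parsed, analyzed, optimized, and physical plans. After this patch
// the physical plan contains a single Filter and a single Project above the
// unioned scan, which is what the test above asserts.
df.explain(true)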