Commit cbaf595

liancheng authored and yhuai committed
[SPARK-8014] [SQL] Avoid premature metadata discovery when writing a HadoopFsRelation with a save mode other than Append
The current code references the schema of the DataFrame to be written before checking the save mode. This triggers expensive metadata discovery prematurely. For save modes other than `Append`, this metadata discovery is useless, since we either ignore the result (for `Ignore` and `ErrorIfExists`) or delete existing files (for `Overwrite`) later. This PR fixes the issue by deferring metadata discovery until after the save mode check.

Author: Cheng Lian <[email protected]>

Closes #6583 from liancheng/spark-8014 and squashes the following commits:

1aafabd [Cheng Lian] Updates comments
088abaa [Cheng Lian] Avoids schema merging and partition discovery when data schema and partition schema are defined
8fbd93f [Cheng Lian] Fixes SPARK-8014

(cherry picked from commit 686a45f)
Signed-off-by: Yin Huai <[email protected]>
1 parent 815e056 commit cbaf595

5 files changed, +67 −32 lines

5 files changed

+67
-32
lines changed
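For context, here is a minimal, hypothetical sketch of the control flow the commit message describes (the names `save`, `outputExists`, and `discoverSchemaAndWrite` are invented for illustration): the save mode is consulted first, and anything that forces metadata discovery runs only on the code path that actually writes.

    import org.apache.spark.sql.SaveMode

    // Hypothetical sketch: consult the save mode before any expensive metadata discovery.
    def save(mode: SaveMode, outputExists: Boolean)(discoverSchemaAndWrite: () => Unit): Unit =
      mode match {
        case SaveMode.ErrorIfExists if outputExists =>
          sys.error("path already exists")   // fails fast; no schema or partition discovery
        case SaveMode.Ignore if outputExists =>
          ()                                 // silently skips; discovery would be wasted work
        case _ =>
          discoverSchemaAndWrite()           // Append/Overwrite: discovery happens only now
      }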

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 1 addition & 1 deletion

@@ -190,7 +190,7 @@ private[sql] class ParquetRelation2(
     }
   }
 
-  override def dataSchema: StructType = metadataCache.dataSchema
+  override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
 
   override private[sql] def refresh(): Unit = {
     super.refresh()
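This one-line change is what lets a user-supplied schema short-circuit discovery: `metadataCache.dataSchema` is only touched when `maybeDataSchema` is empty. A self-contained toy version of the pattern (only `maybeDataSchema` and `dataSchema` come from the real code; the rest is made up):

    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    class ToyRelation(maybeDataSchema: Option[StructType]) {
      // Stand-in for metadataCache.dataSchema; in ParquetRelation2 this is the
      // file listing / footer reading that SPARK-8014 avoids triggering early.
      private lazy val discoveredSchema: StructType = {
        println("expensive metadata discovery runs")
        StructType(StructField("a", IntegerType) :: Nil)
      }

      def dataSchema: StructType = maybeDataSchema.getOrElse(discoveredSchema)
    }

    // With a schema supplied up front, discovery never runs:
    new ToyRelation(Some(StructType(StructField("a", IntegerType) :: Nil))).dataSchema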

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 15 additions & 5 deletions

@@ -30,9 +30,10 @@ import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}

@@ -94,10 +95,19 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
     // We create a DataFrame by applying the schema of relation to the data to make sure
     // we are writing data based on the expected schema,
-    val df = sqlContext.createDataFrame(
-      DataFrame(sqlContext, query).queryExecution.toRdd,
-      relation.schema,
-      needsConversion = false)
+    val df = {
+      // For partitioned relation r, r.schema's column ordering can be different from the column
+      // ordering of data.logicalPlan (partition columns are all moved after data column). We
+      // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
+      // safely apply the schema of r.schema to the data.
+      val project = Project(
+        relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+
+      sqlContext.createDataFrame(
+        DataFrame(sqlContext, project).queryExecution.toRdd,
+        relation.schema,
+        needsConversion = false)
+    }
 
     val partitionColumns = relation.partitionColumns.fieldNames
     if (partitionColumns.isEmpty) {
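Conceptually, the `Project` built here just reorders the incoming columns by name to match `relation.schema`. A rough DataFrame-level analogue of the same idea (the helper name is invented):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.StructType

    // Reorder df's columns to match targetSchema's field order, mirroring what
    // Project(relation.schema.map(...), query) does at the logical plan level.
    def reorderToSchema(df: DataFrame, targetSchema: StructType): DataFrame =
      df.select(targetSchema.fieldNames.map(df.col): _*)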

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 5 additions & 11 deletions

@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand

@@ -322,19 +322,13 @@ private[sql] object ResolvedDataSource {
         Some(partitionColumnsSchema(data.schema, partitionColumns)),
         caseInsensitiveOptions)
 
-      // For partitioned relation r, r.schema's column ordering is different with the column
-      // ordering of data.logicalPlan. We need a Project to adjust the ordering.
-      // So, inside InsertIntoHadoopFsRelation, we can safely apply the schema of r.schema to
-      // the data.
-      val project =
-        Project(
-          r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
-          data.logicalPlan)
-
+      // For partitioned relation r, r.schema's column ordering can be different from the column
+      // ordering of data.logicalPlan (partition columns are all moved after data column). This
+      // will be adjusted within InsertIntoHadoopFsRelation.
      sqlContext.executePlan(
        InsertIntoHadoopFsRelation(
          r,
-          project,
+          data.logicalPlan,
          mode)).toRdd
      r
    case _ =>
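To make the reordering rule in the new comment concrete, a toy sketch with invented column names: a plan producing a, b, c partitioned by c and a yields a relation schema ordered b, c, a, because partition columns are all moved after the remaining data columns.

    val planOutput = Seq("a", "b", "c")      // column order of data.logicalPlan
    val partitionColumns = Seq("c", "a")     // declared partitioning

    // Partition columns are all moved after the data columns:
    val relationSchemaOrder = planOutput.filterNot(partitionColumns.contains) ++ partitionColumns
    assert(relationSchemaOrder == Seq("b", "c", "a"))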

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 1 addition & 1 deletion

@@ -503,7 +503,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    */
   override lazy val schema: StructType = {
     val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column =>
+    StructType(dataSchema ++ partitionColumns.filterNot { column =>
       dataSchemaColumnNames.contains(column.name.toLowerCase)
     })
   }
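The fixed line merges data and partition columns with case-insensitive de-duplication; a small standalone sketch of that merge (the example fields are made up):

    import org.apache.spark.sql.types._

    val dataSchema = StructType(Seq(StructField("b", StringType), StructField("C", IntegerType)))
    val partitionColumns = StructType(Seq(StructField("c", IntegerType), StructField("a", IntegerType)))

    // Same logic as HadoopFsRelation.schema: append partition columns after data
    // columns, skipping any already present in the data schema (case-insensitively).
    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
    val schema = StructType(dataSchema ++ partitionColumns.filterNot { column =>
      dataSchemaColumnNames.contains(column.name.toLowerCase)
    })
    assert(schema.fieldNames.toSeq == Seq("b", "C", "a"))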

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 45 additions & 14 deletions

@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.sources
 
+import java.io.File
+
+import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.scalatest.FunSuite
 

@@ -454,6 +457,20 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
+    val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
+
+    df.write
+      .format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .partitionBy("c", "a")
+      .saveAsTable("t")
+
+    withTable("t") {
+      checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
+    }
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {

@@ -535,20 +552,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 
-  test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
-    val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
-
-    df.write
-      .format("parquet")
-      .mode(SaveMode.Overwrite)
-      .partitionBy("c", "a")
-      .saveAsTable("t")
-
-    withTable("t") {
-      checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
-    }
-  }
-
   test("SPARK-7868: _temporary directories should be ignored") {
     withTempPath { dir =>
       val df = Seq("a", "b", "c").zipWithIndex.toDF()

@@ -564,4 +567,32 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
     }
   }
+
+  test("SPARK-8014: Avoid scanning output directory when SaveMode isn't SaveMode.Append") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df = Seq(1 -> "a").toDF()
+
+      // Creates an arbitrary file. If this directory gets scanned, ParquetRelation2 will throw
+      // since it's not a valid Parquet file.
+      val emptyFile = new File(path, "empty")
+      Files.createParentDirs(emptyFile)
+      Files.touch(emptyFile)
+
+      // This shouldn't throw anything.
+      df.write.format("parquet").mode(SaveMode.Ignore).save(path)
+
+      // This should only complain that the destination directory already exists, rather than file
+      // "empty" is not a Parquet file.
+      assert {
+        intercept[RuntimeException] {
+          df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
+        }.getMessage.contains("already exists")
+      }
+
+      // This shouldn't throw anything.
+      df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
+      checkAnswer(read.format("parquet").load(path), df)
+    }
+  }
 }
