
[SPARK-8014] [SQL] Avoid premature metadata discovery when writing a HadoopFsRelation with a save mode other than Append #6583

Status: Closed · wants to merge 3 commits
@@ -30,9 +30,10 @@ import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
@@ -94,10 +95,19 @@ private[sql] case class InsertIntoHadoopFsRelation(

     // We create a DataFrame by applying the schema of relation to the data to make sure
     // we are writing data based on the expected schema.
-    val df = sqlContext.createDataFrame(
-      DataFrame(sqlContext, query).queryExecution.toRdd,
-      relation.schema,
-      needsConversion = false)
+    val df = {
+      // For partitioned relation r, r.schema's column ordering is different from the column
+      // ordering of data.logicalPlan (partition columns are all moved after data columns). We
+      // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we
+      // can safely apply the schema of r.schema to the data.
+      val project = Project(
+        relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+
+      sqlContext.createDataFrame(
+        DataFrame(sqlContext, project).queryExecution.toRdd,
+        relation.schema,
+        needsConversion = false)
+    }

val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
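To make the reordering concrete: for a partitioned relation, the schema lists data columns first and partition columns last, while the incoming query keeps the user's original column order. Below is a minimal plain-Scala sketch (hypothetical column names, no Spark dependencies) of the index-based reordering that the `Project` of `UnresolvedAttribute`s accomplishes once the plan is analyzed:

```scala
object ReorderSketch extends App {
  // Column ordering of the incoming query output: a, b, c.
  val dataColumns = Seq("a", "b", "c")
  // Ordering reported by the partitioned relation's schema: the data column
  // "b" first, then the partition columns "c" and "a" (cf. partitionBy("c", "a")).
  val schemaColumns = Seq("b", "c", "a")

  // For each target column, find its position in the incoming row.
  val reorder: Seq[Int] = schemaColumns.map(c => dataColumns.indexOf(c))

  // A row laid out in dataColumns order ...
  val row: Seq[Any] = Seq(1, "val_1", 2)
  // ... projected into schemaColumns order.
  val projected: Seq[Any] = reorder.map(i => row(i))

  println(projected.mkString(", ")) // val_1, 2, 1
}
```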
13 changes: 2 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand
@@ -322,19 +322,10 @@ private[sql] object ResolvedDataSource {
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)

-    // For partitioned relation r, r.schema's column ordering is different with the column
-    // ordering of data.logicalPlan. We need a Project to adjust the ordering.
-    // So, inside InsertIntoHadoopFsRelation, we can safely apply the schema of r.schema to
-    // the data.
-    val project =
-      Project(
-        r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
-        data.logicalPlan)
-
     sqlContext.executePlan(
       InsertIntoHadoopFsRelation(
         r,
-        project,
+        data.logicalPlan,
         mode)).toRdd
r
case _ =>
Author comment: This `r.schema` is where metadata discovery is triggered. This PR fixes the issue by moving the projection into `InsertIntoHadoopFsRelation`.

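To illustrate the cost being avoided, here is a plain-Scala sketch, under the assumption (per the PR title) that computing a `HadoopFsRelation`'s schema runs metadata discovery over the output directory. Names and structure are hypothetical, not the actual relation code:

```scala
object DiscoveryDemo extends App {
  class LazyRelation(path: String) {
    // Stand-in for metadata discovery: computing the schema scans `path`,
    // which is expensive and can throw on files that are not valid Parquet.
    lazy val schema: Seq[String] = {
      println(s"scanning $path ...")
      Seq("b", "c", "a") // hypothetical discovered columns
    }
  }

  val r = new LazyRelation("/tmp/out")

  // Old flow: the Project was built from r.schema while resolving the data
  // source, forcing the scan before any SaveMode check could short-circuit.
  def planEagerly(): () => Unit = { val cols = r.schema; () => println(cols) }

  // New flow: r.schema is dereferenced only inside the write command, after
  // the SaveMode decision; nothing is scanned unless a write actually runs.
  def planLazily(): () => Unit = () => println(r.schema)

  val run = planLazily() // no "scanning" output yet
  run()                  // discovery happens here, at execution time
}
```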
@@ -17,6 +17,9 @@

package org.apache.spark.sql.sources

+import java.io.File
+
+import com.google.common.io.Files
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkException, SparkFunSuite}
@@ -453,6 +456,20 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")

df.write
.format(dataSourceName)
.mode(SaveMode.Overwrite)
.partitionBy("c", "a")
.saveAsTable("t")

withTable("t") {
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
}
}
Author comment: Moved this test case here so that it gets executed by all test suites that extend `HadoopFsRelationTest`.

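For context on why the move helps: `HadoopFsRelationTest` is abstract and parameterized by `dataSourceName`, so every test defined in it is re-registered and re-run by each concrete subclass against that subclass's data source. A simplified ScalaTest sketch of the pattern (suite and source names other than `dataSourceName` are hypothetical):

```scala
import org.scalatest.FunSuite

// Simplified sketch; the real HadoopFsRelationTest also mixes in QueryTest
// and SQLTestUtils and talks to a live SQLContext.
abstract class HadoopFsRelationTestSketch extends FunSuite {
  val dataSourceName: String // each concrete suite plugs in its data source

  test("tests in the base class run once per concrete subclass") {
    // The closure runs at test time, so it sees the subclass's value.
    println(s"running against $dataSourceName")
  }
}

class ParquetSuiteSketch extends HadoopFsRelationTestSketch {
  override val dataSourceName: String = "parquet"
}

class SimpleTextSuiteSketch extends HadoopFsRelationTestSketch {
  override val dataSourceName: String = "org.example.SimpleTextSource" // hypothetical
}
```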
}

class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -534,20 +551,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}

test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")

df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.partitionBy("c", "a")
.saveAsTable("t")

withTable("t") {
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
}
}

test("SPARK-7868: _temporary directories should be ignored") {
withTempPath { dir =>
val df = Seq("a", "b", "c").zipWithIndex.toDF()
@@ -563,4 +566,32 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
}
}

test("SPARK-8014: Avoid scanning output directory when SaveMode isn't SaveMode.Append") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val df = Seq(1 -> "a").toDF()

// Creates an arbitrary file. If this directory gets scanned, ParquetRelation2 will throw
// since it's not a valid Parquet file.
val emptyFile = new File(path, "empty")
Files.createParentDirs(emptyFile)
Files.touch(emptyFile)

// This shouldn't throw anything.
df.write.format("parquet").mode(SaveMode.Ignore).save(path)

// This should only complain that the destination directory already exists, rather than file
// "empty" is not a Parquet file.
assert {
intercept[RuntimeException] {
df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
}.getMessage.contains("already exists")
}

// This shouldn't throw anything.
df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
checkAnswer(read.format("parquet").load(path), df)
}
}
}
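Taken together, the behavior these assertions pin down is that the save mode is consulted before any directory scan. A condensed plain-Scala sketch of that control flow (helper names are hypothetical; the real logic lives in the data source resolution and write path):

```scala
import java.io.File

object SaveFlowSketch {
  sealed trait Mode
  case object Append        extends Mode
  case object Overwrite     extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore        extends Mode

  // Decide from the mode first; only Append ever needs the existing metadata.
  def save(path: String, mode: Mode)(write: () => Unit): Unit = {
    val exists = new File(path).exists()
    mode match {
      case ErrorIfExists if exists => sys.error(s"path $path already exists.")
      case Ignore if exists        => ()        // silently do nothing
      case Append                  => write()   // may also read existing metadata
      case _                       => write()   // Overwrite, or a fresh directory
    }
  }
}
```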