Skip to content

Commit 8fbd93f

Browse files
committed
Fixes SPARK-8014
1 parent 0f80990 commit 8fbd93f

File tree

3 files changed

+62
-30
lines changed

3 files changed

+62
-30
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ import org.apache.spark._
3030
import org.apache.spark.mapred.SparkHadoopMapRedUtil
3131
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
3232
import org.apache.spark.sql.catalyst.CatalystTypeConverters
33+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
3334
import org.apache.spark.sql.catalyst.expressions._
3435
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
35-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
36+
import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
3637
import org.apache.spark.sql.execution.RunnableCommand
3738
import org.apache.spark.sql.types.StructType
3839
import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
@@ -94,10 +95,19 @@ private[sql] case class InsertIntoHadoopFsRelation(
9495

9596
// We create a DataFrame by applying the schema of relation to the data to make sure
9697
// we are writing data based on the expected schema.
97-
val df = sqlContext.createDataFrame(
98-
DataFrame(sqlContext, query).queryExecution.toRdd,
99-
relation.schema,
100-
needsConversion = false)
98+
val df = {
99+
// For a partitioned relation, relation.schema's column ordering is different from the column
100+
// ordering of the query plan (partition columns are all moved after data columns). We
101+
// need a Project to adjust the ordering, so that we can safely apply relation.schema to
102+
// the data.
103+
val project = Project(
104+
relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
105+
106+
sqlContext.createDataFrame(
107+
DataFrame(sqlContext, project).queryExecution.toRdd,
108+
relation.schema,
109+
needsConversion = false)
110+
}
101111

102112
val partitionColumns = relation.partitionColumns.fieldNames
103113
if (partitionColumns.isEmpty) {

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
2525
import org.apache.spark.Logging
2626
import org.apache.spark.deploy.SparkHadoopUtil
2727
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
28-
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
28+
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
2929
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
3030
import org.apache.spark.sql.catalyst.plans.logical._
3131
import org.apache.spark.sql.execution.RunnableCommand
@@ -322,19 +322,10 @@ private[sql] object ResolvedDataSource {
322322
Some(partitionColumnsSchema(data.schema, partitionColumns)),
323323
caseInsensitiveOptions)
324324

325-
// For partitioned relation r, r.schema's column ordering is different with the column
326-
// ordering of data.logicalPlan. We need a Project to adjust the ordering.
327-
// So, inside InsertIntoHadoopFsRelation, we can safely apply the schema of r.schema to
328-
// the data.
329-
val project =
330-
Project(
331-
r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
332-
data.logicalPlan)
333-
334325
sqlContext.executePlan(
335326
InsertIntoHadoopFsRelation(
336327
r,
337-
project,
328+
data.logicalPlan,
338329
mode)).toRdd
339330
r
340331
case _ =>

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.sql.sources
1919

20+
import java.io.File
21+
22+
import com.google.common.io.Files
2023
import org.apache.hadoop.fs.Path
2124

2225
import org.apache.spark.{SparkException, SparkFunSuite}
@@ -453,6 +456,20 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
453456
}
454457
}
455458
}
459+
460+
test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
461+
val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
462+
463+
df.write
464+
.format(dataSourceName)
465+
.mode(SaveMode.Overwrite)
466+
.partitionBy("c", "a")
467+
.saveAsTable("t")
468+
469+
withTable("t") {
470+
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
471+
}
472+
}
456473
}
457474

458475
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -534,20 +551,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
534551
}
535552
}
536553

537-
test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
538-
val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
539-
540-
df.write
541-
.format("parquet")
542-
.mode(SaveMode.Overwrite)
543-
.partitionBy("c", "a")
544-
.saveAsTable("t")
545-
546-
withTable("t") {
547-
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
548-
}
549-
}
550-
551554
test("SPARK-7868: _temporary directories should be ignored") {
552555
withTempPath { dir =>
553556
val df = Seq("a", "b", "c").zipWithIndex.toDF()
@@ -563,4 +566,32 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
563566
checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
564567
}
565568
}
569+
570+
test("SPARK-8014: Avoid scanning output directory when SaveMode isn't SaveMode.Append") {
571+
withTempDir { dir =>
572+
val path = dir.getCanonicalPath
573+
val df = Seq(1 -> "a").toDF()
574+
575+
// Creates an arbitrary file. If this directory gets scanned, ParquetRelation2 will throw
576+
// since it's not a valid Parquet file.
577+
val emptyFile = new File(path, "empty")
578+
Files.createParentDirs(emptyFile)
579+
Files.touch(emptyFile)
580+
581+
// This shouldn't throw anything.
582+
df.write.format("parquet").mode(SaveMode.Ignore).save(path)
583+
584+
// This should only complain that the destination directory already exists, rather than that
585+
// file "empty" is not a Parquet file.
586+
assert {
587+
intercept[RuntimeException] {
588+
df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
589+
}.getMessage.contains("already exists")
590+
}
591+
592+
// This shouldn't throw anything.
593+
df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
594+
checkAnswer(read.format("parquet").load(path), df)
595+
}
596+
}
566597
}

0 commit comments

Comments
 (0)