Commit 3ce58cf

[SPARK-4553] [SPARK-5767] [SQL] Wires Parquet data source with the newly introduced write support for data source API
This PR migrates the Parquet data source to the new data source write support API. Users can now also overwrite and append to existing tables. Note that inserting into partitioned tables is not supported yet.

When the Parquet data source is enabled, insertion into Hive Metastore Parquet tables is also fulfilled by the Parquet data source. This is done by the newly introduced `HiveMetastoreCatalog.ParquetConversions` rule, which is a "proper" implementation of the original hacky `HiveStrategies.ParquetConversion`. The latter is still preserved, and can be removed together with the old Parquet support in the future.

TODO:

- [x] Update outdated comments in `newParquet.scala`.

Author: Cheng Lian <[email protected]>

Closes #4563 from liancheng/parquet-refining and squashes the following commits:

fa98d27 [Cheng Lian] Fixes test cases which should disable off Parquet data source
2476e82 [Cheng Lian] Fixes compilation error introduced during rebasing
a83d290 [Cheng Lian] Passes Hive Metastore partitioning information to ParquetRelation2
1 parent 199a9e8 commit 3ce58cf
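
For reference, the new write support means a DataFrame can be written to or appended to an existing Parquet table through the data source API. A minimal sketch using the three-argument save call that the test helpers in this commit also use (the path and DataFrame names are placeholders):

    import org.apache.spark.sql.SaveMode

    // Assumes an existing DataFrame `df`; the path is a placeholder.
    // Overwrite the Parquet table at the target path, then append more rows.
    df.save("/tmp/users.parquet", "org.apache.spark.sql.parquet", SaveMode.Overwrite)
    df.save("/tmp/users.parquet", "org.apache.spark.sql.parquet", SaveMode.Append)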

File tree

12 files changed: +1148 additions, -675 deletions

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion

@@ -446,7 +446,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       baseRelationToDataFrame(parquet.ParquetRelation2(path +: paths, Map.empty)(this))
     } else {
       DataFrame(this, parquet.ParquetRelation(
-        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+        (path +: paths).mkString(","), Some(sparkContext.hadoopConfiguration), this))
     }
 
   /**
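
This fix matters when the old (non-data-source) code path is taken: the comma-separated path list passed to ParquetRelation was built from the varargs tail `paths` only, dropping the required first argument `path`. A standalone sketch of the corrected path handling (not Spark code; names are illustrative):

    // How the fixed branch assembles the relation's path string.
    def parquetPathString(path: String, paths: Seq[String]): String =
      (path +: paths).mkString(",")

    // With a single path, the old version (paths.mkString(",")) returned "".
    assert(parquetPathString("data/a.parquet", Nil) == "data/a.parquet")
    assert(parquetPathString("a.parquet", Seq("b.parquet")) == "a.parquet,b.parquet")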

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 1 addition & 1 deletion

@@ -647,6 +647,6 @@ private[parquet] object FileSystemHelper {
         sys.error("ERROR: attempting to append to set of Parquet files and found file" +
           s"that does not match name pattern: $other")
       case _ => 0
-    }.reduceLeft((a, b) => if (a < b) b else a)
+    }.reduceOption(_ max _).getOrElse(0)
   }
 }
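
The rewritten line guards against an empty file list: `reduceLeft` throws on an empty collection, while `reduceOption(_ max _).getOrElse(0)` falls back to 0. A standalone illustration of the difference:

    val taskIds: Seq[Int] = Seq.empty

    // taskIds.reduceLeft((a, b) => if (a < b) b else a)
    // would throw UnsupportedOperationException on the empty sequence.

    // reduceOption returns None for the empty case, so a default can be supplied.
    val maxTaskId = taskIds.reduceOption(_ max _).getOrElse(0)
    assert(maxTaskId == 0)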

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala

Lines changed: 30 additions & 8 deletions

@@ -23,8 +23,8 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Try
 
-import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.util
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.apache.spark.util.Utils
 
 /**
@@ -37,7 +37,8 @@ import org.apache.spark.util.Utils
 trait ParquetTest {
   val sqlContext: SQLContext
 
-  import sqlContext._
+  import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
+  import sqlContext.{conf, sparkContext}
 
   protected def configuration = sparkContext.hadoopConfiguration
 
@@ -49,11 +50,11 @@
    */
   protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
     val (keys, values) = pairs.unzip
-    val currentValues = keys.map(key => Try(getConf(key)).toOption)
-    (keys, values).zipped.foreach(setConf)
+    val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
+    (keys, values).zipped.foreach(conf.setConf)
     try f finally {
       keys.zip(currentValues).foreach {
-        case (key, Some(value)) => setConf(key, value)
+        case (key, Some(value)) => conf.setConf(key, value)
         case (key, None) => conf.unsetConf(key)
       }
     }
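
For context, `withSQLConf` temporarily sets the given SQL configuration pairs, runs the block, and then restores (or unsets) the previous values. A hypothetical use in a test body, assuming the Parquet data source flag is named `spark.sql.parquet.useDataSourceApi`:

    // Run a block with the Parquet data source enabled, restoring the
    // flag's previous value afterwards.
    withSQLConf("spark.sql.parquet.useDataSourceApi" -> "true") {
      // test code that should exercise the new Parquet data source
    }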
@@ -88,7 +89,6 @@
   protected def withParquetFile[T <: Product: ClassTag: TypeTag]
       (data: Seq[T])
       (f: String => Unit): Unit = {
-    import sqlContext.implicits._
     withTempPath { file =>
       sparkContext.parallelize(data).toDF().saveAsParquetFile(file.getCanonicalPath)
       f(file.getCanonicalPath)
@@ -102,14 +102,14 @@
   protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
       (data: Seq[T])
       (f: DataFrame => Unit): Unit = {
-    withParquetFile(data)(path => f(parquetFile(path)))
+    withParquetFile(data)(path => f(sqlContext.parquetFile(path)))
   }
 
   /**
    * Drops temporary table `tableName` after calling `f`.
    */
   protected def withTempTable(tableName: String)(f: => Unit): Unit = {
-    try f finally dropTempTable(tableName)
+    try f finally sqlContext.dropTempTable(tableName)
   }
 
   /**
@@ -125,4 +125,26 @@
       withTempTable(tableName)(f)
     }
   }
+
+  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
+      data: Seq[T], path: File): Unit = {
+    data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+  }
+
+  protected def makePartitionDir(
+      basePath: File,
+      defaultPartitionName: String,
+      partitionCols: (String, Any)*): File = {
+    val partNames = partitionCols.map { case (k, v) =>
+      val valueString = if (v == null || v == "") defaultPartitionName else v.toString
+      s"$k=$valueString"
+    }
+
+    val partDir = partNames.foldLeft(basePath) { (parent, child) =>
+      new File(parent, child)
+    }
+
+    assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
+    partDir
+  }
 }
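
As an illustration, `makePartitionDir` builds nested Hive-style `key=value` directories under a base path, substituting the default partition name for null or empty values. A hypothetical call and the layout it would create (the default partition name shown is an assumption, not taken from this commit):

    // makePartitionDir(base, "__HIVE_DEFAULT_PARTITION__", "year" -> 2015, "month" -> null)
    // creates via mkdirs() and returns:
    //   <base>/year=2015/month=__HIVE_DEFAULT_PARTITION__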
