
Commit 21235dc

[SPARK-3131][SQL] Allow user to set parquet compression codec for writing ParquetFile in SQLContext
1 parent cd0720c commit 21235dc
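
For context, a minimal sketch of how a user would pick the codec from a SQLContext after this change (the SchemaRDD `people`, the `sc` SparkContext, and the output path are hypothetical, not part of this commit):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // `sc` is an existing SparkContext

// Choose the Parquet output codec before writing. Per this patch the accepted
// short names are "none", "uncompressed", "snappy", "gzip" (the default) and "lzo".
sqlContext.setConf("spark.sql.parquet.compression", "snappy")

// `people` is a hypothetical SchemaRDD; every column chunk of the written file
// should now be Snappy-compressed instead of using the gzip default.
people.saveAsParquetFile("/tmp/people.parquet")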

3 files changed: +107 lines, -5 lines

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 0 deletions
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
+  val PARQUET_COMPRESSION = "spark.sql.parquet.compression"

   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -77,6 +78,9 @@ trait SQLConf {
   /** When true tables cached using the in-memory columnar caching will be compressed. */
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean

+  /** The compression codec for writing to a Parquetfile */
+  private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "gzip")
+
   /** The number of rows that will be */
   private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 9 additions & 5 deletions
@@ -100,8 +100,13 @@ private[sql] object ParquetRelation {
   // The compression type
   type CompressionType = parquet.hadoop.metadata.CompressionCodecName

-  // The default compression
-  val defaultCompression = CompressionCodecName.GZIP
+  // The parquet compression short names
+  val shortParquetCompressionCodecNames = Map(
+    "NONE" -> CompressionCodecName.UNCOMPRESSED,
+    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
+    "SNAPPY" -> CompressionCodecName.SNAPPY,
+    "GZIP" -> CompressionCodecName.GZIP,
+    "LZO" -> CompressionCodecName.LZO)

   /**
    * Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
@@ -141,9 +146,8 @@ private[sql] object ParquetRelation {
       conf: Configuration,
       sqlContext: SQLContext): ParquetRelation = {
     val path = checkPath(pathString, allowExisting, conf)
-    if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
-      conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
-    }
+    conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
+      sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
     ParquetRelation.enableLogForwarding()
     ParquetTypesConverter.writeMetaData(attributes, path, conf)
     new ParquetRelation(path.toString, Some(conf), sqlContext) {
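
The lookup above is worth spelling out: the short name from SQLConf is upper-cased and resolved through the map, and any name outside the map silently falls back to UNCOMPRESSED rather than failing. A small sketch using the constants from this diff ("lz4" is just an illustrative unknown name):

import parquet.hadoop.metadata.CompressionCodecName

val shortParquetCompressionCodecNames = Map(
  "NONE" -> CompressionCodecName.UNCOMPRESSED,
  "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
  "SNAPPY" -> CompressionCodecName.SNAPPY,
  "GZIP" -> CompressionCodecName.GZIP,
  "LZO" -> CompressionCodecName.LZO)

// A known short name resolves to its codec.
shortParquetCompressionCodecNames.getOrElse("snappy".toUpperCase, CompressionCodecName.UNCOMPRESSED)  // SNAPPY

// An unknown name: no error, just the UNCOMPRESSED fallback.
shortParquetCompressionCodecNames.getOrElse("lz4".toUpperCase, CompressionCodecName.UNCOMPRESSED)     // UNCOMPRESSED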

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 94 additions & 0 deletions
@@ -186,6 +186,100 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
   }

+  test("Compression options for writing to a Parquetfile") {
+    val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+    import scala.collection.JavaConversions._
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+
+    // test default compression codec (gzip)
+    rdd.saveAsParquetFile(path)
+    var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "UNCOMPRESSED"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test uncompressed parquet file with property value "none"
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === "UNCOMPRESSED" :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test gzip compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // test snappy compression codec
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
+
+    rdd.saveAsParquetFile(path)
+    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+      .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+    parquetFile(path).registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    Utils.deleteRecursively(file)
+
+    // TODO: Lzo requires additional external setup steps so leave it out for now
+    // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
+
+    // Set it back.
+    TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
+  }
+
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
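
The footer check in the new test repeats verbatim for every codec; a hypothetical helper (not part of this commit) capturing that step, assuming the same imports and TestSQLContext as the suite above:

  // Reads the Parquet footer and returns the distinct codec names used across all column chunks.
  def readCompressionCodecs(path: String): Seq[String] = {
    import scala.collection.JavaConversions._
    ParquetTypesConverter
      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
      .getBlocks
      .flatMap(block => block.getColumns)
      .map(column => column.getCodec.name())
      .distinct
  }

  // e.g. assert(readCompressionCodecs(path) === "SNAPPY" :: Nil)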
