Skip to content

Commit 2bba89d

Browse files
authored
Dictionary decoding for int64 timestamps (apache#86)
1 parent 2ecad5d commit 2bba89d

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
244244

245245
case INT64:
246246
if (column.dataType() == DataTypes.LongType ||
247+
column.dataType() == DataTypes.TimestampType ||
247248
DecimalType.is64BitDecimalType(column.dataType())) {
248249
for (int i = rowId; i < rowId + num; ++i) {
249250
if (!column.isNullAt(i)) {

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,27 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
811811
}
812812
}
813813
}
814+
815+
test("Timestamp INT64 Dictionary encoding") {
  // Regression test: timestamps are written as INT64; a heavily repeated value
  // plus the tiny page sizes configured below should force Parquet to
  // dictionary-encode the column, exercising the vectorized reader's
  // dictionary-decoding path for TimestampType.
  val rows = (1 to 1000).map { idx =>
    // First ~half of the rows share one timestamp so the dictionary is used.
    val millis: Long = if (idx < 500) 10 else idx
    Row(new java.sql.Timestamp(millis))
  }
  val schema = StructType(Seq(StructField("time", TimestampType, false)))
  withSQLConf(ParquetOutputFormat.DICTIONARY_PAGE_SIZE -> "64",
    ParquetOutputFormat.PAGE_SIZE -> "128",
    SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
    withTempPath { dir =>
      val written = spark.createDataFrame(sparkContext.parallelize(rows), schema)
      // Single output file keeps the whole column in one dictionary-encoded chunk.
      written.coalesce(1).write.parquet(dir.getCanonicalPath)
      val readBack = spark.read.parquet(dir.getCanonicalPath)
      checkAnswer(readBack, written.collect().toSeq)
    }
  }
}
814835
}
815836

816837
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

0 commit comments

Comments
 (0)