Commit 6dfc4a7

viirya authored and rxin committed
[SPARK-13537][SQL] Fix readBytes in VectorizedPlainValuesReader
JIRA: https://issues.apache.org/jira/browse/SPARK-13537

## What changes were proposed in this pull request?

In readBytes of VectorizedPlainValuesReader, we use buffer[offset] to access bytes in buffer. This is incorrect because offset already has Platform.BYTE_ARRAY_OFFSET added to it at initialization, so it is an absolute memory offset rather than an index into the array. The fix is to read through Platform.getByte(buffer, offset) instead.

## How was this patch tested?

`ParquetHadoopFsRelationSuite` sometimes (depending on the randomly generated data) [fails](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52136/consoleFull) because of this bug; with the fix applied, the test passes. I also added a test to `ParquetHadoopFsRelationSuite` with data that fails without this patch.

The error exception:

[info] ParquetHadoopFsRelationSuite:
[info] - test all data types - StringType (440 milliseconds)
[info] - test all data types - BinaryType (434 milliseconds)
[info] - test all data types - BooleanType (406 milliseconds)
20:59:38.618 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 2597.0 (TID 67966)
java.lang.ArrayIndexOutOfBoundsException: 46
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBytes(VectorizedPlainValuesReader.java:88)

Author: Liang-Chi Hsieh <[email protected]>

Closes #11418 from viirya/fix-readbytes.
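The crux of the bug: once offset has been pre-biased with Platform.BYTE_ARRAY_OFFSET, it is an address meant for Unsafe-style absolute reads, not a valid array index. Below is a minimal standalone sketch of the distinction, assuming nothing beyond sun.misc.Unsafe (which org.apache.spark.unsafe.Platform wraps); the class name and buffer contents are invented for illustration:

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    public class OffsetDemo {
      public static void main(String[] args) throws Exception {
        // Obtain the Unsafe instance reflectively, as Platform does internally.
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        byte[] buffer = {10, 20, 30, 40};
        long base = unsafe.arrayBaseOffset(byte[].class); // e.g. 16 on HotSpot

        // VectorizedPlainValuesReader initializes its offset pre-biased like this:
        long offset = base + 2; // points at logical element 2

        // Correct: an absolute read, the analogue of Platform.getByte(buffer, offset).
        System.out.println(unsafe.getByte(buffer, offset)); // prints 30

        // Incorrect: treating the biased offset as an array index over-shoots,
        // producing exactly the ArrayIndexOutOfBoundsException quoted above.
        // System.out.println(buffer[(int) offset]); // AIOOBE: index base + 2
      }
    }

Platform.getByte forwards to Unsafe.getByte, so the one-line change in the diff below makes the read consistent with how offset was constructed.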
1 parent 9e01dcc · commit 6dfc4a7

2 files changed: +34 −1 lines

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ public final void readBytes(int total, ColumnVector c, int rowId) {
     for (int i = 0; i < total; i++) {
       // Bytes are stored as a 4-byte little endian int. Just read the first byte.
       // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
-      c.putByte(rowId + i, buffer[offset]);
+      c.putByte(rowId + i, Platform.getByte(buffer, offset));
       offset += 4;
     }
   }
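For context on the loop itself: as the comment notes, Parquet has no single-byte physical type, so each byte value in a PLAIN-encoded page is stored as a 4-byte little-endian int; little-endian order puts the low-order byte first, so the reader takes only the first byte of each group and advances offset by 4. A hypothetical illustration with invented page contents:

    // Two logical byte values, 46 and 127, each stored as a 4-byte little-endian int.
    byte[] page = {46, 0, 0, 0, 127, 0, 0, 0};
    for (int off = 0; off < page.length; off += 4) {
      System.out.println(page[off]); // prints 46, then 127
    }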

sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala

Lines changed: 33 additions & 0 deletions
@@ -175,4 +175,37 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test(s"SPARK-13537: Fix readBytes in VectorizedPlainValuesReader") {
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+
+      val schema = new StructType()
+        .add("index", IntegerType, nullable = false)
+        .add("col", ByteType, nullable = true)
+
+      val data = Seq(Row(1, -33.toByte), Row(2, 0.toByte), Row(3, -55.toByte), Row(4, 56.toByte),
+        Row(5, 127.toByte), Row(6, -44.toByte), Row(7, 23.toByte), Row(8, -95.toByte),
+        Row(9, 127.toByte), Row(10, 13.toByte))
+
+      val rdd = sqlContext.sparkContext.parallelize(data)
+      val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+      df.write
+        .mode("overwrite")
+        .format(dataSourceName)
+        .option("dataSchema", df.schema.json)
+        .save(path)
+
+      val loadedDF = sqlContext
+        .read
+        .format(dataSourceName)
+        .option("dataSchema", df.schema.json)
+        .schema(df.schema)
+        .load(path)
+        .orderBy("index")
+
+      checkAnswer(loadedDF, df)
+    }
+  }
 }
