diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 072a4bcc42ee4..3bd086da9d251 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -124,6 +124,13 @@ case class ParquetTableScan(
       conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      // Use a temporary variable to avoid the whole `ParquetTableScan` object being captured into
+      // the `mapPartitionsWithInputSplit` closure below.
+      val outputSize = output.size
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -141,19 +148,46 @@ case class ParquetTableScan(
           relation.partitioningAttributes
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
+        // Parquet will leave partitioning columns empty, so we fill them in here.
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(outputSize)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
-            row
           }
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 2e0c6c51c00e5..a7506c8c32af1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -22,9 +22,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 
-import parquet.hadoop.ParquetInputFormat
+import parquet.hadoop.{ParquetInputSplit, ParquetInputFormat}
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
@@ -40,7 +39,7 @@ import scala.collection.JavaConversions._
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
@@ -265,7 +264,10 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     // When the data does not include the key and the key is requested then we must fill it in
     // based on information from the input split.
     if (!dataIncludesKey && partitionKeyLocation != -1) {
-      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+      val primitiveRow =
+        requestedSchema.toAttributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      baseRDD.mapPartitionsWithInputSplit { case (split, iterator) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
           split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
@@ -273,15 +275,34 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
           .toString
           .split("/")
           .flatMap {
-            case partValue(key, value) => Some(key -> value)
-            case _ => None
-          }.toMap
-
-        val currentValue = partValues.values.head.toInt
-        iter.map { pair =>
-          val res = pair._2.asInstanceOf[SpecificMutableRow]
-          res.setInt(partitionKeyLocation, currentValue)
-          res
+            case partValue(key, value) => Some(key -> value)
+            case _ => None
+          }
+          .toMap
+
+        if (primitiveRow) {
+          iterator.map { pair =>
+            // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+            val mutableRow = pair._2.asInstanceOf[SpecificMutableRow]
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(requestedSchema.toAttributes.size)
+
+          iterator.map { pair =>
+            // We are using CatalystGroupConverter and it returns a GenericRow.
+            // Since GenericRow is not mutable, we just cast it to a Row.
+            val row = pair._2.asInstanceOf[Row]
+            var i = 0
+            while (i < row.size) {
+              mutableRow(i) = row(i)
+              i += 1
+            }
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
         }
       }
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 06fe144666a9c..d788b70481a87 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -31,7 +31,20 @@ import org.apache.spark.sql.hive.test.TestHive._
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class StructContainer(intStructField: Int, stringStructField: String)
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -69,6 +82,38 @@ class ParquetMetastoreSuite extends ParquetTest {
       location '${partitionedTableDirWithKey.getCanonicalPath}'
     """)
 
+    sql(s"""
+      create external table partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      create external table partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
     sql(s"""
       create external table normal_parquet
       (
@@ -90,10 +135,24 @@ class ParquetMetastoreSuite extends ParquetTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
   override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS partitioned_parquet")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_complextypes")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key_and_complextypes")
+    sql("DROP TABLE IF EXISTS normal_parquet")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -139,6 +198,22 @@ class ParquetSourceSuite extends ParquetTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql(s"""
+      CREATE TEMPORARY TABLE
partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql(s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 }
@@ -147,7 +222,10 @@
  */
 abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
   var partitionedTableDir: File = null
+  var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
+  var partitionedTableDirWithComplexTypes: File = null
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -161,6 +239,15 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    normalTableDir = File.createTempFile("parquettests", "sparksql")
+    normalTableDir.delete()
+    normalTableDir.mkdir()
+
+    sparkContext
+      .makeRDD(1 to 10)
+      .map(i => ParquetData(i, "part-1"))
+      .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
+
     partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
     partitionedTableDirWithKey.delete()
     partitionedTableDirWithKey.mkdir()
@@ -171,9 +258,46 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .map(i => ParquetDataWithKey(p, i, s"part-$p"))
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithComplexTypes(
+          i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  override protected def afterAll(): Unit = {
+    // Delete temporary files.
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.delete()
+  }
+
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
     test(s"ordering of the partitioning columns $table") {
       checkAnswer(
         sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -186,6 +310,8 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
       )
     }
 
+
+
     test(s"project the partitioning column $table") {
       checkAnswer(
         sql(s"SELECT p, count(*) FROM $table group by p"),
@@ -263,6 +389,29 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  Seq(
"partitioned_parquet_with_key_and_complextypes", + "partitioned_parquet_with_complextypes").foreach { table => + test(s"SPARK-5775 read structure from $table") { + checkAnswer( + sql(s""" + SELECT + p, + structField.intStructField, + structField.stringStructField + FROM $table + WHERE p = 1"""), + (1 to 10).map(i => Row(1, i, f"${i}_string"))) + } + + // Re-enable this after SPARK-5508 is fixed + ignore(s"SPARK-5775 read array from $table") { + checkAnswer( + sql(s"SELECT arrayField, p FROM $table WHERE p = 1"), + (1 to 10).map(i => Row(1 to i, 1))) + } + } + test("non-part select(*)") { checkAnswer( sql("SELECT COUNT(*) FROM normal_parquet"),