diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4e9f800ec579a..de62eaa5981b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -126,6 +126,9 @@ private[sql] case class ParquetTableScan(
         conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -143,37 +146,46 @@ private[sql] case class ParquetTableScan(
           relation.partitioningAttributes
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        val mutableRow = new GenericMutableRow(output.size)
-
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            iter.next() match {
-              case (_, row: SpecificMutableRow) =>
-                // Parquet will leave partitioning columns empty, so we fill them in here.
-                var i = 0
-                while (i < requestedPartitionOrdinals.size) {
-                  row(requestedPartitionOrdinals(i)._2) =
-                    partitionRowValues(requestedPartitionOrdinals(i)._1)
-                  i += 1
-                }
-                row
-
-              case (_, row: Row) =>
-                var i = 0
-                while (i < row.size) {
-                  mutableRow(i) = row(i)
-                  i += 1
-                }
-
-                i = 0
-                while (i < requestedPartitionOrdinals.size) {
-                  mutableRow(requestedPartitionOrdinals(i)._2) =
-                    partitionRowValues(requestedPartitionOrdinals(i)._1)
-                  i += 1
-                }
-
-                mutableRow
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(output.size)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
           }
         }
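Both files get the same fix, so it is worth spelling out the pattern before the second diff: instead of pattern matching on each row's concrete class, the scan now decides once per partition, from the requested schema, whether the converter will produce mutable rows (all fields primitive, hence CatalystPrimitiveRowConverter) or immutable ones (any nested field, hence CatalystGroupConverter), and builds the per-row loop accordingly. The standalone sketch below illustrates that dispatch; `ReadOnlyRow`, `WritableRow`, and `FieldType` are hypothetical stand-ins for GenericRow, SpecificMutableRow, and the Catalyst data types, not Spark classes.

```scala
sealed trait FieldType
case object PrimitiveField extends FieldType
case object NestedField extends FieldType

// ReadOnlyRow plays the role of an immutable GenericRow.
class ReadOnlyRow(protected val values: Array[Any]) {
  def apply(i: Int): Any = values(i)
  def size: Int = values.length
}

// WritableRow plays the role of a SpecificMutableRow.
class WritableRow(n: Int) extends ReadOnlyRow(new Array[Any](n)) {
  def update(i: Int, v: Any): Unit = values(i) = v
}

object DispatchSketch {
  def scan(
      schema: Seq[FieldType],
      rows: Iterator[ReadOnlyRow],
      partOrdinal: Int,
      partValue: Any): Iterator[ReadOnlyRow] = {
    // Decided once per partition, like `primitiveRow` in the patch.
    val primitiveRow = schema.forall(_ == PrimitiveField)
    if (primitiveRow) {
      rows.map { row =>
        // An all-primitive schema means the source handed us writable rows,
        // so the cast is safe and we fill the partition slot in place.
        val mutable = row.asInstanceOf[WritableRow]
        mutable(partOrdinal) = partValue
        mutable
      }
    } else {
      // Rows are read-only here; copy each one into a single reused writable row.
      val mutableRow = new WritableRow(schema.size)
      rows.map { row =>
        var i = 0
        while (i < row.size) { mutableRow(i) = row(i); i += 1 }
        mutableRow(partOrdinal) = partValue
        mutableRow
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // A nested schema forces the copy branch, as in the patched else-arms.
    val rows = Iterator(new ReadOnlyRow(Array(Seq(1, 2), null)))
    val out = scan(Seq(NestedField, PrimitiveField), rows, partOrdinal = 1, partValue = "2015-02-14")
    println(out.next()(1)) // prints 2015-02-14
  }
}
```

Hoisting the decision out of the iterator also removes the per-row match of the old code, and the copy branch reuses one mutable row so it allocates nothing per record.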
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1a90ae4637cea..33505f93adccc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -476,16 +476,21 @@ private[sql] case class ParquetRelation2(
 
       // When the data does not include the key and the key is requested then we must fill it in
       // based on information from the input split.
       if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
+        // This check is based on CatalystConverter.createRootConverter.
+        val primitiveRow =
+          requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
         baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
           val partValues = selectedPartitions.collectFirst {
             case p if split.getPath.getParent.toString == p.path => p.values
           }.get
 
           val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
-          val mutableRow = new GenericMutableRow(requestedSchema.size)
-          iterator.map {
-            case (_, row: SpecificMutableRow) =>
+          if (primitiveRow) {
+            iterator.map { pair =>
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = pair._2.asInstanceOf[SpecificMutableRow]
               var i = 0
               while (i < requiredPartOrdinal.size) {
                 // TODO Avoids boxing cost here!
@@ -494,8 +499,14 @@ private[sql] case class ParquetRelation2(
                 i += 1
               }
               row
-
-            case (_, row: Row) =>
+            }
+          } else {
+            // Create a mutable row since we need to fill in values from partition columns.
+            val mutableRow = new GenericMutableRow(requestedSchema.size)
+            iterator.map { pair =>
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = pair._2.asInstanceOf[Row]
               var i = 0
               while (i < row.size) {
                 // TODO Avoids boxing cost here!
@@ -511,6 +522,7 @@ private[sql] case class ParquetRelation2(
                 i += 1
               }
               mutableRow
+            }
           }
         }
       } else {
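The failure mode the `primitiveRow` guard avoids is a plain runtime cast error: an object built as an immutable row can never be cast to the mutable subclass, however it is typed statically, so assuming mutability breaks on the first row of any nested schema. A minimal standalone sketch, again with hypothetical `Base`/`Writable` stand-ins for GenericRow/SpecificMutableRow:

```scala
// Hypothetical stand-ins: Base ~ GenericRow, Writable ~ SpecificMutableRow.
class Base(val values: Array[Any])

class Writable(n: Int) extends Base(new Array[Any](n)) {
  def update(i: Int, v: Any): Unit = values(i) = v
}

object CastSketch {
  def main(args: Array[String]): Unit = {
    // What a group converter yields for a nested schema: a read-only row.
    val row: Base = new Base(Array(Seq(1, 2), null))

    // Casting to the mutable subclass fails on the very first row.
    try {
      row.asInstanceOf[Writable](0) = 42
    } catch {
      case e: ClassCastException => println(s"as expected: $e")
    }

    // The patched else-branch copies instead of casting, which always works.
    val mutable = new Writable(row.values.length)
    row.values.copyToArray(mutable.values)
    mutable(1) = "2015-02-14" // fill in the partition column slot
    println(mutable.values.mkString(", "))
  }
}
```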