
Commit fca16c5

liancheng authored and yhuai committed
[SPARK-10301] [SPARK-10428] [SQL] [BRANCH-1.5] Fixes schema merging for nested structs
We used to workaround SPARK-10301 with a quick fix in branch-1.5 (PR apache#8515), but it doesn't cover the case described in SPARK-10428. So this PR backports PR apache#8509, which had once been considered too big a change to be merged into branch-1.5 at the last minute, to fix both SPARK-10301 and SPARK-10428 for Spark 1.5. Also added more test cases for SPARK-10428.

This PR looks big, but the essential change is only ~200 LOC. All other changes are for testing. In particular, PR apache#8454 is also backported here because the `ParquetInteroperabilitySuite` introduced in PR apache#8515 depends on it. This should be safe since apache#8454 only touches testing code.

Author: Cheng Lian <[email protected]>

Closes apache#8583 from liancheng/spark-10301/for-1.5.
1 parent 63c72b9 commit fca16c5
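
For context, a minimal sketch of the kind of scenario this backport addresses (illustrative only; paths, field names, and the `sqlContext` binding are hypothetical, assuming a Spark 1.5 shell): two Parquet files whose nested struct `s` disagrees on its subfields are read back with schema merging enabled, so the globally merged schema contains subfields missing from each individual file.

    import org.apache.spark.sql.functions._

    // File A: nested struct `s` has subfields `a` and `b`.
    sqlContext.range(0, 3)
      .select(struct(col("id").as("a"), (col("id") * 2).as("b")).as("s"))
      .write.parquet("/tmp/spark-10301/part=a")

    // File B: nested struct `s` has subfields `b` and `c`, so each file is
    // missing one subfield of the globally merged schema.
    sqlContext.range(0, 3)
      .select(struct((col("id") * 10).as("b"), (col("id") * 100).as("c")).as("s"))
      .write.parquet("/tmp/spark-10301/part=b")

    // With schema merging enabled, the merged type of `s` is struct<a, b, c>.
    // Reading it back exercises the nested-struct schema clipping added here.
    val df = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/spark-10301")
    df.printSchema()
    df.show()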

File tree

7 files changed (+1233, -128 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 176 additions & 67 deletions
@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap}
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
   // Called after `init()` when initializing Parquet record reader.
@@ -44,7 +45,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     val parquetRequestedSchema = readContext.getRequestedSchema
 
     val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
+      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
         metadata
           // First tries to read requested schema, which may result from projections
           .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
@@ -81,83 +82,191 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     // `StructType` containing all requested columns.
     val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
 
-    // Below we construct a Parquet schema containing all requested columns. This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
-    // we have to fallback to the full file schema which contains all columns in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
-    //    example, in the case of schema merging, the globally merged schema may contain extra
-    //    columns gathered from other Parquet files. These columns will be simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
-    //    containing a single integer array field `f1` may have the following legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet file as they have
-    //    different physical structures.
     val parquetRequestedSchema =
       maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema for all requested
-          // columns. Note that it's possible that no columns are requested at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why `fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
+        val catalystRequestedSchema = StructType.fromString(schemaString)
+        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
       }
 
     val metadata =
       Map.empty[String, String] ++
         maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-    new ReadContext(parquetRequestedSchema, metadata)
+    new ReadContext(parquetRequestedSchema, metadata.asJava)
   }
 }
 
 private[parquet] object CatalystReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
 
   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist
+   * in `catalystSchema`, and adding those only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType
+          if !isPrimitiveCatalystType(t.keyType) ||
+             !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested key type or value type
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
+        // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging.
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to
+   * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type
+   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+    // Precondition of this method, should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // Unannotated repeated group should be interpreted as required list of required element, so
+    // list element type is just the group itself. Clip it.
+    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " +
+          parquetList)
+
+      // Precondition of this method, should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated group's name with
+      // "_tuple" appended then the repeated type is the element type and elements are required.
+      // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the
+      // only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. Either key type or
+   * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or
+   * a [[StructType]].
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+    // Precondition of this method, only handles maps with nested key types or value types.
+    assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(clipParquetType(parquetKeyType, keyType))
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+   *       [[MessageType]]. Because it's legal to construct an empty requested schema for column
+   *       pruning.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+    Types
      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
 }
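
A rough sketch of what the new `clipParquetSchema` above does (illustrative only, not part of the patch; it assumes code living in the `org.apache.spark.sql.execution.datasources.parquet` package, since the helper is package-private, and the schemas are made up):

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.types._

    // Physical schema of one Parquet file: struct `s` only has subfields `a` and `b`.
    val fileSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  optional group s {
        |    optional int32 a;
        |    optional int32 b;
        |  }
        |}""".stripMargin)

    // Globally merged Catalyst schema: `s` also has a subfield `c` this file lacks.
    val catalystSchema = new StructType()
      .add("s", new StructType()
        .add("a", IntegerType)
        .add("b", IntegerType)
        .add("c", IntegerType))

    // The clipped schema keeps the file's physical structure for `a` and `b` and
    // appends a converted Parquet type for the missing `c`, so the row converter
    // sees a one-to-one field mapping and the missing column is read as nulls.
    val clipped = CatalystReadSupport.clipParquetSchema(fileSchema, catalystSchema)
    println(clipped)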

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 12 additions & 51 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
- * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
- *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
- *       fields of the global schema. The key difference is that, in case of schema merging,
- *       [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
- *       the following [[catalystType]]:
- *       {{{
- *         new StructType()
- *           .add("f1", IntegerType, nullable = false)
- *           .add("f2", StringType, nullable = true)
- *           .add("f3", new StructType()
- *             .add("f31", DoubleType, nullable = false)
- *             .add("f32", IntegerType, nullable = true)
- *             .add("f33", StringType, nullable = true), nullable = false)
- *       }}}
- *       and the following [[parquetType]] (`f2` and `f32` are missing):
- *       {{{
- *         message root {
- *           required int32 f1;
- *           required group f3 {
- *             required double f31;
- *             optional binary f33 (utf8);
- *           }
- *         }
- *       }}}
- *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
@@ -148,6 +123,16 @@ private[parquet] class CatalystRowConverter(
     updater: ParentContainerUpdater)
   extends CatalystGroupConverter(updater) with Logging {
 
+  assert(
+    parquetType.getFieldCount == catalystType.length,
+    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+       |
+       |Parquet schema:
+       |$parquetType
+       |Catalyst schema:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
   logDebug(
     s"""Building row converter for the following schema:
        |
@@ -179,31 +164,7 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
-    // those missing fields and create converters for them, although values of these fields are
-    // always null.
-    val paddedParquetFields = {
-      val parquetFields = parquetType.getFields
-      val parquetFieldNames = parquetFields.map(_.getName).toSet
-      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
-
-      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
-      // creating the schema converter here, since values of missing fields are always null.
-      val toParquet = new CatalystSchemaConverter()
-
-      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
-        catalystType.indexWhere(_.name == f.getName)
-      }
-    }
-
-    if (paddedParquetFields.length != catalystType.length) {
-      throw new UnsupportedOperationException(
-        "A Parquet file's schema has different number of fields with the table schema. " +
-        "Please enable schema merging by setting \"mergeSchema\" to true when load " +
-        "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
-    }
-
-    paddedParquetFields.zip(catalystType).zipWithIndex.map {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala

Lines changed: 4 additions & 10 deletions
@@ -55,16 +55,10 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
  * to old style non-standard behaviors.
  */
 private[parquet] class CatalystSchemaConverter(
-    private val assumeBinaryIsString: Boolean,
-    private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
-
-  // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
-  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
-  def this() = this(
-    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+    assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
+) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
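
A brief note on the constructor change above (sketch only; the call sites mirror ones visible elsewhere in this diff): replacing the auxiliary no-arg constructor with default arguments keeps existing `new CatalystSchemaConverter()` call sites working while letting individual flags be overridden by name, which is what the new `clipParquetGroupFields` relies on.

    // Defaults taken from SQLConf, as the old no-arg constructor did.
    val defaultConverter = new CatalystSchemaConverter()

    // Override a single flag by name, as clipParquetGroupFields does.
    val specCompliantConverter = new CatalystSchemaConverter(followParquetFormatSpec = true)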
