From d092998e8328bdcc03678355b3597bdef82d1467 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 4 Mar 2025 16:08:26 -0800 Subject: [PATCH] Parquet: Support unknown and timestamp(9) in internal model and generics. --- .../iceberg/data/parquet/TestGenericData.java | 19 +++- .../data/parquet/BaseParquetReaders.java | 53 ++-------- .../data/parquet/BaseParquetWriter.java | 25 ++--- .../data/parquet/GenericParquetReaders.java | 99 ++++++++++++++++--- .../data/parquet/GenericParquetWriter.java | 70 +++++++++++-- .../iceberg/parquet/ParquetValueReaders.java | 1 + .../iceberg/parquet/TypeToMessageType.java | 41 +++++++- .../iceberg/parquet/TestInternalParquet.java | 10 ++ 8 files changed, 221 insertions(+), 97 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index e0a02df97bc0..197caf1f674b 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -46,6 +46,20 @@ import org.junit.jupiter.api.Test; public class TestGenericData extends DataTest { + @Override + protected boolean supportsDefaultValues() { + return true; + } + + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } @Override protected void writeAndValidate(Schema schema) throws IOException { @@ -107,11 +121,6 @@ private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List createReader( protected abstract ParquetValueReader createStructReader( List types, List> fieldReaders, Types.StructType structType); - protected ParquetValueReader fixedReader(ColumnDescriptor desc) { - return new GenericParquetReaders.FixedReader(desc); - } - - protected ParquetValueReader dateReader(ColumnDescriptor desc) { - return new GenericParquetReaders.DateReader(desc); - } + protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); - protected ParquetValueReader timeReader(ColumnDescriptor desc) { - LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time); - - LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit(); - switch (unit) { - case MICROS: - return new GenericParquetReaders.TimeReader(desc); - case MILLIS: - return new GenericParquetReaders.TimeMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for time: " + unit); - } - } + protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); - protected ParquetValueReader timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { - return new GenericParquetReaders.TimestampInt96Reader(desc); - } + protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); - LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - timestamp instanceof TimestampLogicalTypeAnnotation, - "Invalid timestamp logical type: " + timestamp); - - LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit(); - switch (unit) { - case MICROS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzReader(desc) - : new GenericParquetReaders.TimestampReader(desc); - case MILLIS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzMillisReader(desc) - : new GenericParquetReaders.TimestampMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); - } - } + protected abstract ParquetValueReader timestampReader( + ColumnDescriptor desc, boolean isAdjustedToUTC); protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -206,8 +166,7 @@ public Optional> visit(TimeLogicalTypeAnnotation timeLogic @Override public Optional> visit( TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of( - timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC())); + return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 92aab005579c..578cc979987a 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -51,25 +51,14 @@ protected ParquetValueWriter createWriter(Types.StructType struct, MessageTyp protected abstract ParquetValueWriters.StructWriter createStructWriter( List> writers); - protected ParquetValueWriter fixedWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.FixedWriter(desc); - } + protected abstract ParquetValueWriter fixedWriter(ColumnDescriptor desc); - protected ParquetValueWriter dateWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.DateWriter(desc); - } + protected abstract ParquetValueWriter dateWriter(ColumnDescriptor desc); - protected ParquetValueWriter timeWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.TimeWriter(desc); - } + protected abstract ParquetValueWriter timeWriter(ColumnDescriptor desc); - protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (isAdjustedToUTC) { - return new GenericParquetWriter.TimestamptzWriter(desc); - } else { - return new GenericParquetWriter.TimestampWriter(desc); - } - } + protected abstract ParquetValueWriter timestampWriter( + ColumnDescriptor desc, boolean isAdjustedToUTC); private class WriteBuilder extends TypeWithSchemaVisitor> { private final MessageType type; @@ -244,8 +233,8 @@ public Optional> visit( public Optional> visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { Preconditions.checkArgument( - LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", + !LogicalTypeAnnotation.TimeUnit.MILLIS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS and NANOS are supported", timestampType.getUnit()); return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC())); } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 8d22e2337cdf..e12f379b36bb 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -35,9 +35,12 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; public class GenericParquetReaders extends BaseParquetReaders { @@ -62,6 +65,66 @@ protected ParquetValueReader createStructReader( return ParquetValueReaders.recordReader(fieldReaders, structType); } + @Override + protected ParquetValueReader fixedReader(ColumnDescriptor desc) { + return new GenericParquetReaders.FixedReader(desc); + } + + @Override + protected ParquetValueReader dateReader(ColumnDescriptor desc) { + return new GenericParquetReaders.DateReader(desc); + } + + @Override + protected ParquetValueReader timeReader(ColumnDescriptor desc) { + LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation(); + Preconditions.checkArgument( + time instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation, + "Invalid time logical type: " + time); + + LogicalTypeAnnotation.TimeUnit unit = + ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) time).getUnit(); + switch (unit) { + case MICROS: + return new GenericParquetReaders.TimeReader(desc); + case MILLIS: + return new GenericParquetReaders.TimeMillisReader(desc); + default: + throw new UnsupportedOperationException("Unsupported unit for time: " + unit); + } + } + + @Override + protected ParquetValueReader timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) { + if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { + return new GenericParquetReaders.TimestampInt96Reader(desc); + } + + LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); + Preconditions.checkArgument( + timestamp instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation, + "Invalid timestamp logical type: " + timestamp); + + LogicalTypeAnnotation.TimeUnit unit = + ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) timestamp).getUnit(); + switch (unit) { + case NANOS: + return isAdjustedToUTC + ? new GenericParquetReaders.TimestamptzReader(desc, ChronoUnit.NANOS) + : new GenericParquetReaders.TimestampReader(desc, ChronoUnit.NANOS); + case MICROS: + return isAdjustedToUTC + ? new GenericParquetReaders.TimestamptzReader(desc, ChronoUnit.MICROS) + : new GenericParquetReaders.TimestampReader(desc, ChronoUnit.MICROS); + case MILLIS: + return isAdjustedToUTC + ? new GenericParquetReaders.TimestamptzMillisReader(desc) + : new GenericParquetReaders.TimestampMillisReader(desc); + default: + throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); + } + } + @Override protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return GenericDataUtil.internalToGeneric(type, value); @@ -70,7 +133,7 @@ protected Object convertConstant(org.apache.iceberg.types.Type type, Object valu private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - static class DateReader extends ParquetValueReaders.PrimitiveReader { + private static class DateReader extends ParquetValueReaders.PrimitiveReader { DateReader(ColumnDescriptor desc) { super(desc); } @@ -81,18 +144,22 @@ public LocalDate read(LocalDate reuse) { } } - static class TimestampReader extends ParquetValueReaders.PrimitiveReader { - TimestampReader(ColumnDescriptor desc) { + private static class TimestampReader extends ParquetValueReaders.PrimitiveReader { + private final ChronoUnit unit; + + TimestampReader(ColumnDescriptor desc, ChronoUnit unit) { super(desc); + this.unit = unit; } @Override public LocalDateTime read(LocalDateTime reuse) { - return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime(); + return EPOCH.plus(column.nextLong(), unit).toLocalDateTime(); } } - static class TimestampMillisReader extends ParquetValueReaders.PrimitiveReader { + private static class TimestampMillisReader + extends ParquetValueReaders.PrimitiveReader { TimestampMillisReader(ColumnDescriptor desc) { super(desc); } @@ -103,7 +170,8 @@ public LocalDateTime read(LocalDateTime reuse) { } } - static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader { + private static class TimestampInt96Reader + extends ParquetValueReaders.PrimitiveReader { private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { @@ -123,18 +191,23 @@ public OffsetDateTime read(OffsetDateTime reuse) { } } - static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader { - TimestamptzReader(ColumnDescriptor desc) { + private static class TimestamptzReader + extends ParquetValueReaders.PrimitiveReader { + private final ChronoUnit unit; + + TimestamptzReader(ColumnDescriptor desc, ChronoUnit unit) { super(desc); + this.unit = unit; } @Override public OffsetDateTime read(OffsetDateTime reuse) { - return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS); + return EPOCH.plus(column.nextLong(), unit); } } - static class TimestamptzMillisReader extends ParquetValueReaders.PrimitiveReader { + private static class TimestamptzMillisReader + extends ParquetValueReaders.PrimitiveReader { TimestamptzMillisReader(ColumnDescriptor desc) { super(desc); } @@ -145,7 +218,7 @@ public OffsetDateTime read(OffsetDateTime reuse) { } } - static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader { + private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader { TimeMillisReader(ColumnDescriptor desc) { super(desc); } @@ -156,7 +229,7 @@ public LocalTime read(LocalTime reuse) { } } - static class TimeReader extends ParquetValueReaders.PrimitiveReader { + private static class TimeReader extends ParquetValueReaders.PrimitiveReader { TimeReader(ColumnDescriptor desc) { super(desc); } @@ -167,7 +240,7 @@ public LocalTime read(LocalTime reuse) { } } - static class FixedReader extends ParquetValueReaders.PrimitiveReader { + private static class FixedReader extends ParquetValueReaders.PrimitiveReader { FixedReader(ColumnDescriptor desc) { super(desc); } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index b4727cc52bf6..559b8c7fb62c 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -33,6 +33,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; public class GenericParquetWriter extends BaseParquetWriter { @@ -49,10 +51,41 @@ protected StructWriter createStructWriter(List> wr return ParquetValueWriters.recordWriter(writers); } + @Override + protected ParquetValueWriter fixedWriter(ColumnDescriptor desc) { + return new GenericParquetWriter.FixedWriter(desc); + } + + @Override + protected ParquetValueWriter dateWriter(ColumnDescriptor desc) { + return new GenericParquetWriter.DateWriter(desc); + } + + @Override + protected ParquetValueWriter timeWriter(ColumnDescriptor desc) { + return new GenericParquetWriter.TimeWriter(desc); + } + + @Override + protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) { + LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); + Preconditions.checkArgument( + timestamp instanceof TimestampLogicalTypeAnnotation, + "Invalid timestamp logical type: " + timestamp); + + LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit(); + + if (isAdjustedToUTC) { + return new GenericParquetWriter.TimestamptzWriter(desc, fromParquet(unit)); + } else { + return new GenericParquetWriter.TimestampWriter(desc, fromParquet(unit)); + } + } + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - static class DateWriter extends ParquetValueWriters.PrimitiveWriter { + private static class DateWriter extends ParquetValueWriters.PrimitiveWriter { DateWriter(ColumnDescriptor desc) { super(desc); } @@ -63,7 +96,7 @@ public void write(int repetitionLevel, LocalDate value) { } } - static class TimeWriter extends ParquetValueWriters.PrimitiveWriter { + private static class TimeWriter extends ParquetValueWriters.PrimitiveWriter { TimeWriter(ColumnDescriptor desc) { super(desc); } @@ -74,30 +107,36 @@ public void write(int repetitionLevel, LocalTime value) { } } - static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter { - TimestampWriter(ColumnDescriptor desc) { + private static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter { + private final ChronoUnit unit; + + TimestampWriter(ColumnDescriptor desc, ChronoUnit unit) { super(desc); + this.unit = unit; } @Override public void write(int repetitionLevel, LocalDateTime value) { - column.writeLong( - repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC))); + column.writeLong(repetitionLevel, unit.between(EPOCH, value.atOffset(ZoneOffset.UTC))); } } - static class TimestamptzWriter extends ParquetValueWriters.PrimitiveWriter { - TimestamptzWriter(ColumnDescriptor desc) { + private static class TimestamptzWriter + extends ParquetValueWriters.PrimitiveWriter { + private final ChronoUnit unit; + + TimestamptzWriter(ColumnDescriptor desc, ChronoUnit unit) { super(desc); + this.unit = unit; } @Override public void write(int repetitionLevel, OffsetDateTime value) { - column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value)); + column.writeLong(repetitionLevel, unit.between(EPOCH, value)); } } - static class FixedWriter extends ParquetValueWriters.PrimitiveWriter { + private static class FixedWriter extends ParquetValueWriters.PrimitiveWriter { private final int length; FixedWriter(ColumnDescriptor desc) { @@ -115,4 +154,15 @@ public void write(int repetitionLevel, byte[] value) { column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value)); } } + + private static ChronoUnit fromParquet(LogicalTypeAnnotation.TimeUnit unit) { + switch (unit) { + case MICROS: + return ChronoUnit.MICROS; + case NANOS: + return ChronoUnit.NANOS; + default: + throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); + } + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index 0d26708bdf23..63aac8006e2d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -137,6 +137,7 @@ public static ParquetValueReader timestamps(ColumnDescriptor desc) { case MILLIS: return new TimestampMillisReader(desc); case MICROS: + case NANOS: return new UnboxedReader<>(desc); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java index 44e407f72a75..c2be9e6bcceb 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java @@ -32,6 +32,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type.NestedType; import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Type.TypeID; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types.DecimalType; import org.apache.iceberg.types.Types.FixedType; @@ -39,6 +40,7 @@ import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampNanoType; import org.apache.iceberg.types.Types.TimestampType; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -58,6 +60,10 @@ public class TypeToMessageType { LogicalTypeAnnotation.timestampType(false /* not adjusted to UTC */, TimeUnit.MICROS); private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS = LogicalTypeAnnotation.timestampType(true /* adjusted to UTC */, TimeUnit.MICROS); + private static final LogicalTypeAnnotation TIMESTAMP_NANOS = + LogicalTypeAnnotation.timestampType(false /* not adjusted to UTC */, TimeUnit.NANOS); + private static final LogicalTypeAnnotation TIMESTAMPTZ_NANOS = + LogicalTypeAnnotation.timestampType(true /* adjusted to UTC */, TimeUnit.NANOS); private static final String METADATA = "metadata"; private static final String VALUE = "value"; private static final String TYPED_VALUE = "typed_value"; @@ -76,7 +82,11 @@ public MessageType convert(Schema schema, String name) { Types.MessageTypeBuilder builder = Types.buildMessage(); for (NestedField field : schema.columns()) { - builder.addField(field(field)); + // unknown type is not written to data files + Type fieldType = field(field); + if (fieldType != null) { + builder.addField(field(field)); + } } return builder.named(AvroSchemaUtil.makeCompatibleName(name)); @@ -86,7 +96,11 @@ public GroupType struct(StructType struct, Type.Repetition repetition, int id, S Types.GroupBuilder builder = Types.buildGroup(repetition); for (NestedField field : struct.fields()) { - builder.addField(field(field)); + // unknown type is not written to data files + Type fieldType = field(field); + if (fieldType != null) { + builder.addField(field(field)); + } } return builder.id(id).named(AvroSchemaUtil.makeCompatibleName(name)); @@ -98,7 +112,10 @@ public Type field(NestedField field) { int id = field.fieldId(); String name = field.name(); - if (field.type().isPrimitiveType()) { + if (field.type().typeId() == TypeID.UNKNOWN) { + return null; + + } else if (field.type().isPrimitiveType()) { return primitive(field.type().asPrimitiveType(), repetition, id, name); } else if (field.type().isVariantType()) { @@ -119,8 +136,12 @@ public Type field(NestedField field) { public GroupType list(ListType list, Type.Repetition repetition, int id, String name) { NestedField elementField = list.fields().get(0); + Type elementType = field(elementField); + Preconditions.checkArgument( + elementType != null, "Cannot convert element Parquet: %s", elementField.type()); + return Types.list(repetition) - .element(field(elementField)) + .element(elementType) .id(id) .named(AvroSchemaUtil.makeCompatibleName(name)); } @@ -128,6 +149,12 @@ public GroupType list(ListType list, Type.Repetition repetition, int id, String public GroupType map(MapType map, Type.Repetition repetition, int id, String name) { NestedField keyField = map.fields().get(0); NestedField valueField = map.fields().get(1); + Type keyType = field(keyField); + Preconditions.checkArgument(keyType != null, "Cannot convert key Parquet: %s", keyField.type()); + Type valueType = field(valueField); + Preconditions.checkArgument( + valueType != null, "Cannot convert value Parquet: %s", valueField.type()); + return Types.map(repetition) .key(field(keyField)) .value(field(valueField)) @@ -198,6 +225,12 @@ public Type primitive( } else { return Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name); } + case TIMESTAMP_NANO: + if (((TimestampNanoType) primitive).shouldAdjustToUTC()) { + return Types.primitive(INT64, repetition).as(TIMESTAMPTZ_NANOS).id(id).named(name); + } else { + return Types.primitive(INT64, repetition).as(TIMESTAMP_NANOS).id(id).named(name); + } case STRING: return Types.primitive(BINARY, repetition).as(STRING).id(id).named(name); case BINARY: diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalParquet.java index b89e9f48a4ef..c22344be6b1e 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalParquet.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalParquet.java @@ -41,6 +41,16 @@ protected boolean supportsDefaultValues() { return true; } + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { List expected = RandomInternalData.generate(schema, 100, 1376L);