Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@
import org.junit.jupiter.api.Test;

public class TestGenericData extends DataTest {
/**
 * Always reports {@code true} for this suite.
 *
 * <p>NOTE(review): presumably switches on the default-value test cases inherited from
 * {@code DataTest}; the base class is not visible here, so confirm against it.
 */
@Override
protected boolean supportsDefaultValues() {
return true;
}

/**
 * Always reports {@code true} for this suite.
 *
 * <p>NOTE(review): presumably switches on the unknown-type test cases inherited from
 * {@code DataTest}; the base class is not visible here, so confirm against it.
 */
@Override
protected boolean supportsUnknown() {
return true;
}

/**
 * Always reports {@code true} for this suite.
 *
 * <p>NOTE(review): presumably switches on the nanosecond-timestamp test cases inherited from
 * {@code DataTest}; the base class is not visible here, so confirm against it.
 */
@Override
protected boolean supportsTimestampNanos() {
return true;
}

@Override
protected void writeAndValidate(Schema schema) throws IOException {
Expand Down Expand Up @@ -107,11 +121,6 @@ private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Re
}
}

@Override
protected boolean supportsDefaultValues() {
return true;
}

@Test
public void testTwoLevelList() throws IOException {
Schema schema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,54 +79,14 @@ protected ParquetValueReader<T> createReader(
/**
 * Creates the reader that assembles a struct value of type {@code T}; each concrete subclass
 * supplies its own implementation.
 *
 * @param types presumably the Parquet types of the struct's fields — TODO confirm against callers
 * @param fieldReaders readers for the struct's fields
 * @param structType the Iceberg struct type being read
 * @return a reader producing {@code T} values
 */
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new GenericParquetReaders.FixedReader(desc);
}

protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new GenericParquetReaders.DateReader(desc);
}
protected abstract ParquetValueReader<?> fixedReader(ColumnDescriptor desc);

protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) {
LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation();
Preconditions.checkArgument(
time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time);

LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit();
switch (unit) {
case MICROS:
return new GenericParquetReaders.TimeReader(desc);
case MILLIS:
return new GenericParquetReaders.TimeMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported unit for time: " + unit);
}
}
protected abstract ParquetValueReader<?> dateReader(ColumnDescriptor desc);

protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) {
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return new GenericParquetReaders.TimestampInt96Reader(desc);
}
protected abstract ParquetValueReader<?> timeReader(ColumnDescriptor desc);

LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation();
Preconditions.checkArgument(
timestamp instanceof TimestampLogicalTypeAnnotation,
"Invalid timestamp logical type: " + timestamp);

LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit();
switch (unit) {
case MICROS:
return isAdjustedToUTC
? new GenericParquetReaders.TimestamptzReader(desc)
: new GenericParquetReaders.TimestampReader(desc);
case MILLIS:
return isAdjustedToUTC
? new GenericParquetReaders.TimestamptzMillisReader(desc)
: new GenericParquetReaders.TimestampMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit);
}
}
protected abstract ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, boolean isAdjustedToUTC);

protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return value;
Expand Down Expand Up @@ -206,8 +166,7 @@ public Optional<ParquetValueReader<?>> visit(TimeLogicalTypeAnnotation timeLogic
@Override
public Optional<ParquetValueReader<?>> visit(
TimestampLogicalTypeAnnotation timestampLogicalType) {
return Optional.of(
timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC()));
return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,14 @@ protected ParquetValueWriter<T> createWriter(Types.StructType struct, MessageTyp
/**
 * Creates the struct writer for values of type {@code T}; each concrete subclass supplies its own
 * implementation.
 *
 * @param writers presumably one writer per struct field, in field order — TODO confirm against
 *     callers
 * @return a struct writer for {@code T} values
 */
protected abstract ParquetValueWriters.StructWriter<T> createStructWriter(
List<ParquetValueWriter<?>> writers);

protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {
return new GenericParquetWriter.FixedWriter(desc);
}
protected abstract ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc);

protected ParquetValueWriter<?> dateWriter(ColumnDescriptor desc) {
return new GenericParquetWriter.DateWriter(desc);
}
protected abstract ParquetValueWriter<?> dateWriter(ColumnDescriptor desc);

protected ParquetValueWriter<?> timeWriter(ColumnDescriptor desc) {
return new GenericParquetWriter.TimeWriter(desc);
}
protected abstract ParquetValueWriter<?> timeWriter(ColumnDescriptor desc);

protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) {
if (isAdjustedToUTC) {
return new GenericParquetWriter.TimestamptzWriter(desc);
} else {
return new GenericParquetWriter.TimestampWriter(desc);
}
}
protected abstract ParquetValueWriter<?> timestampWriter(
ColumnDescriptor desc, boolean isAdjustedToUTC);

private class WriteBuilder extends TypeWithSchemaVisitor<ParquetValueWriter<?>> {
private final MessageType type;
Expand Down Expand Up @@ -244,8 +233,8 @@ public Optional<ParquetValueWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
Preconditions.checkArgument(
LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS is supported",
!LogicalTypeAnnotation.TimeUnit.MILLIS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS and NANOS are supported",
timestampType.getUnit());
return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class GenericParquetReaders extends BaseParquetReaders<Record> {
Expand All @@ -62,6 +65,66 @@ protected ParquetValueReader<Record> createStructReader(
return ParquetValueReaders.recordReader(fieldReaders, structType);
}

/**
 * Supplies the reader used for fixed columns in the generic data model.
 *
 * @param desc descriptor of the column to read
 * @return a new {@code FixedReader} bound to {@code desc}
 */
@Override
protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new GenericParquetReaders.FixedReader(desc);
}

/**
 * Supplies the reader used for date columns in the generic data model.
 *
 * @param desc descriptor of the column to read
 * @return a new {@code DateReader} bound to {@code desc}
 */
@Override
protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new GenericParquetReaders.DateReader(desc);
}

/**
 * Picks the time reader that matches the unit of the column's Parquet time annotation.
 *
 * @param desc descriptor of the column to read
 * @return a {@code TimeReader} for MICROS columns or a {@code TimeMillisReader} for MILLIS columns
 * @throws IllegalArgumentException if the column's logical type is not a time annotation
 * @throws UnsupportedOperationException if the time unit is neither MICROS nor MILLIS
 */
@Override
protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) {
  LogicalTypeAnnotation annotation = desc.getPrimitiveType().getLogicalTypeAnnotation();
  boolean isTimeAnnotation = annotation instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
  Preconditions.checkArgument(isTimeAnnotation, "Invalid time logical type: " + annotation);

  LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType =
      (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) annotation;
  LogicalTypeAnnotation.TimeUnit timeUnit = timeType.getUnit();
  switch (timeUnit) {
    case MILLIS:
      return new GenericParquetReaders.TimeMillisReader(desc);
    case MICROS:
      return new GenericParquetReaders.TimeReader(desc);
    default:
      throw new UnsupportedOperationException("Unsupported unit for time: " + timeUnit);
  }
}

/**
 * Picks the timestamp reader for a column from its physical type, its logical-type unit, and
 * whether values are adjusted to UTC.
 *
 * <p>INT96 columns are handled first and always get a {@code TimestampInt96Reader}, regardless of
 * {@code isAdjustedToUTC}. Otherwise the column must carry a timestamp annotation: NANOS and
 * MICROS share the unit-parameterized readers, while MILLIS uses the dedicated millis readers.
 *
 * @param desc descriptor of the column to read
 * @param isAdjustedToUTC selects the offset-aware (timestamptz) reader when {@code true}
 * @return the reader matching the column's timestamp representation
 * @throws IllegalArgumentException if a non-INT96 column has no timestamp annotation
 * @throws UnsupportedOperationException if the timestamp unit is not NANOS, MICROS, or MILLIS
 */
@Override
protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) {
  PrimitiveType primitive = desc.getPrimitiveType();
  if (primitive.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
    return new GenericParquetReaders.TimestampInt96Reader(desc);
  }

  LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
  boolean isTimestampAnnotation =
      annotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
  Preconditions.checkArgument(
      isTimestampAnnotation, "Invalid timestamp logical type: " + annotation);

  LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType =
      (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) annotation;
  LogicalTypeAnnotation.TimeUnit timeUnit = timestampType.getUnit();
  switch (timeUnit) {
    case NANOS:
      if (isAdjustedToUTC) {
        return new GenericParquetReaders.TimestamptzReader(desc, ChronoUnit.NANOS);
      }
      return new GenericParquetReaders.TimestampReader(desc, ChronoUnit.NANOS);
    case MICROS:
      if (isAdjustedToUTC) {
        return new GenericParquetReaders.TimestamptzReader(desc, ChronoUnit.MICROS);
      }
      return new GenericParquetReaders.TimestampReader(desc, ChronoUnit.MICROS);
    case MILLIS:
      if (isAdjustedToUTC) {
        return new GenericParquetReaders.TimestamptzMillisReader(desc);
      }
      return new GenericParquetReaders.TimestampMillisReader(desc);
    default:
      throw new UnsupportedOperationException("Unsupported unit for timestamp: " + timeUnit);
  }
}

@Override
protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return GenericDataUtil.internalToGeneric(type, value);
Expand All @@ -70,7 +133,7 @@ protected Object convertConstant(org.apache.iceberg.types.Type type, Object valu
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
DateReader(ColumnDescriptor desc) {
super(desc);
}
Expand All @@ -81,18 +144,22 @@ public LocalDate read(LocalDate reuse) {
}
}

static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
TimestampReader(ColumnDescriptor desc) {
private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private final ChronoUnit unit;

TimestampReader(ColumnDescriptor desc, ChronoUnit unit) {
super(desc);
this.unit = unit;
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
return EPOCH.plus(column.nextLong(), unit).toLocalDateTime();
}
}

static class TimestampMillisReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}
Expand All @@ -103,7 +170,8 @@ public LocalDateTime read(LocalDateTime reuse) {
}
}

static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

TimestampInt96Reader(ColumnDescriptor desc) {
Expand All @@ -123,18 +191,23 @@ public OffsetDateTime read(OffsetDateTime reuse) {
}
}

static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
TimestamptzReader(ColumnDescriptor desc) {
private static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private final ChronoUnit unit;

TimestamptzReader(ColumnDescriptor desc, ChronoUnit unit) {
super(desc);
this.unit = unit;
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
return EPOCH.plus(column.nextLong(), unit);
}
}

static class TimestamptzMillisReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}
Expand All @@ -145,7 +218,7 @@ public OffsetDateTime read(OffsetDateTime reuse) {
}
}

static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}
Expand All @@ -156,7 +229,7 @@ public LocalTime read(LocalTime reuse) {
}
}

static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
TimeReader(ColumnDescriptor desc) {
super(desc);
}
Expand All @@ -167,7 +240,7 @@ public LocalTime read(LocalTime reuse) {
}
}

static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
FixedReader(ColumnDescriptor desc) {
super(desc);
}
Expand Down
Loading