-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Kafka Connect: Support VARIANT when record convert #15283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
033f47f
a53af92
73ea656
2b3fcb1
47f6fe5
e8f57f1
b57e0d8
0117174
be09349
75ec7ce
c6e558a
787663d
75dc8a2
d9c8721
e517884
8bb8740
aa2bcce
5be9fa4
179b07a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,23 +22,28 @@ | |
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.math.BigDecimal; | ||
| import java.math.BigInteger; | ||
| import java.math.RoundingMode; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.time.Instant; | ||
| import java.time.LocalDate; | ||
| import java.time.LocalDateTime; | ||
| import java.time.LocalTime; | ||
| import java.time.OffsetDateTime; | ||
| import java.time.ZoneOffset; | ||
| import java.time.ZonedDateTime; | ||
| import java.time.format.DateTimeFormatter; | ||
| import java.time.format.DateTimeFormatterBuilder; | ||
| import java.time.format.DateTimeParseException; | ||
| import java.time.temporal.Temporal; | ||
| import java.util.Base64; | ||
| import java.util.Collection; | ||
| import java.util.Date; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.FileFormat; | ||
|
|
@@ -53,6 +58,7 @@ | |
| import org.apache.iceberg.mapping.NameMappingParser; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.iceberg.types.Type; | ||
| import org.apache.iceberg.types.Type.PrimitiveType; | ||
| import org.apache.iceberg.types.Types.DecimalType; | ||
|
|
@@ -64,6 +70,13 @@ | |
| import org.apache.iceberg.util.ByteBuffers; | ||
| import org.apache.iceberg.util.DateTimeUtil; | ||
| import org.apache.iceberg.util.UUIDUtil; | ||
| import org.apache.iceberg.variants.ShreddedObject; | ||
| import org.apache.iceberg.variants.ValueArray; | ||
| import org.apache.iceberg.variants.Variant; | ||
| import org.apache.iceberg.variants.VariantMetadata; | ||
| import org.apache.iceberg.variants.VariantValue; | ||
| import org.apache.iceberg.variants.Variants; | ||
| import org.apache.kafka.connect.data.Field; | ||
| import org.apache.kafka.connect.data.Struct; | ||
| import org.apache.kafka.connect.errors.ConnectException; | ||
|
|
||
|
|
@@ -142,6 +155,8 @@ private Object convertValue( | |
| return convertTimeValue(value); | ||
| case TIMESTAMP: | ||
| return convertTimestampValue(value, (TimestampType) type); | ||
| case VARIANT: | ||
| return convertVariantValue(value); | ||
| } | ||
| throw new UnsupportedOperationException("Unsupported type: " + type.typeId()); | ||
| } | ||
|
|
@@ -464,6 +479,171 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) { | |
| return convertLocalDateTime(value); | ||
| } | ||
|
|
||
| protected Variant convertVariantValue(Object value) { | ||
seokyun-ha-toss marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (value instanceof ByteBuffer) { | ||
| return Variant.from((ByteBuffer) value); | ||
| } | ||
|
|
||
| Set<String> fieldNames = Sets.newHashSet(); | ||
| collectFieldNames(value, fieldNames); | ||
| List<String> allFieldNames = fieldNames.stream().sorted().collect(Collectors.toList()); | ||
| VariantMetadata metadata = Variants.metadata(allFieldNames); | ||
| VariantValue variantValue = objectToVariantValue(value, metadata); | ||
| return Variant.of(metadata, variantValue); | ||
| } | ||
|
|
||
| /** | ||
| * Collects all field names (map keys) from the entire object tree into the given set. Used to | ||
| * build a single VariantMetadata for the whole Variant (required for nested maps). | ||
| */ | ||
| private static void collectFieldNames(Object value, Set<String> names) { | ||
| if (value == null) { | ||
| return; | ||
| } | ||
| if (value instanceof Collection) { | ||
| for (Object element : (Collection<?>) value) { | ||
| collectFieldNames(element, names); | ||
| } | ||
| return; | ||
| } | ||
| if (value instanceof Map) { | ||
| Map<?, ?> map = (Map<?, ?>) value; | ||
| for (Map.Entry<?, ?> entry : map.entrySet()) { | ||
| Object key = entry.getKey(); | ||
| if (key != null && key instanceof String) { | ||
| names.add((String) key); | ||
| collectFieldNames(entry.getValue(), names); | ||
| } | ||
| } | ||
| return; | ||
| } | ||
| if (value instanceof Struct) { | ||
| Struct struct = (Struct) value; | ||
| for (Field field : struct.schema().fields()) { | ||
| names.add(field.name()); | ||
| collectFieldNames(struct.get(field), names); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Recursively converts a Java object to a VariantValue using the given shared metadata for all | ||
| * nested maps. Handles primitives, List (array), and Map (object); map keys become field names. | ||
| */ | ||
| private static VariantValue objectToVariantValue(Object value, VariantMetadata metadata) { | ||
| if (value == null) { | ||
| return Variants.ofNull(); | ||
| } | ||
| VariantValue primitive = primitiveToVariantValue(value); | ||
| if (primitive != null) { | ||
| return primitive; | ||
| } | ||
| if (value instanceof Collection) { | ||
| ValueArray array = Variants.array(); | ||
| for (Object element : (Collection<?>) value) { | ||
| array.add(objectToVariantValue(element, metadata)); | ||
| } | ||
| return array; | ||
| } | ||
| if (value instanceof Map) { | ||
| Map<?, ?> map = (Map<?, ?>) value; | ||
| ShreddedObject object = Variants.object(metadata); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a non-shredded version of Object, any performance differences? |
||
| map.forEach( | ||
| (key, val) -> { | ||
| if (key != null && key instanceof String) { | ||
| object.put((String) key, objectToVariantValue(val, metadata)); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems like we should throw here if the key is null or the value is an instance of string?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (one way would be checking the ShreddedObject size is the same? |
||
| }); | ||
| return object; | ||
| } | ||
| if (value instanceof Struct) { | ||
| Struct struct = (Struct) value; | ||
| ShreddedObject object = Variants.object(metadata); | ||
| for (Field field : struct.schema().fields()) { | ||
| object.put(field.name(), objectToVariantValue(struct.get(field), metadata)); | ||
| } | ||
| return object; | ||
| } | ||
| throw new IllegalArgumentException("Cannot convert to variant: " + value.getClass().getName()); | ||
| } | ||
|
|
||
| /** | ||
| * Converts a primitive or primitive-like value to VariantValue; returns null if not supported. | ||
| */ | ||
| private static VariantValue primitiveToVariantValue(Object value) { | ||
| if (value instanceof Boolean) { | ||
| return Variants.of((Boolean) value); | ||
| } | ||
| if (value instanceof Instant) { | ||
| return Variants.ofTimestamptz(DateTimeUtil.microsFromInstant((Instant) value)); | ||
| } | ||
| if (value instanceof OffsetDateTime) { | ||
| return Variants.ofTimestamptz(DateTimeUtil.microsFromTimestamptz((OffsetDateTime) value)); | ||
| } | ||
| if (value instanceof ZonedDateTime) { | ||
| return Variants.ofTimestamptz( | ||
| DateTimeUtil.microsFromTimestamptz(((ZonedDateTime) value).toOffsetDateTime())); | ||
| } | ||
| if (value instanceof LocalDateTime) { | ||
| return Variants.ofTimestampntz(DateTimeUtil.microsFromTimestamp((LocalDateTime) value)); | ||
| } | ||
| if (value instanceof LocalDate) { | ||
| return Variants.ofDate(DateTimeUtil.daysFromDate((LocalDate) value)); | ||
| } | ||
| if (value instanceof LocalTime) { | ||
| return Variants.ofTime(DateTimeUtil.microsFromTime((LocalTime) value)); | ||
| } | ||
| if (value instanceof Date) { | ||
| int days = (int) (((Date) value).getTime() / 1000 / 60 / 60 / 24); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The java.util.Date class is poorly named, i think we should be mapping this to Timestamp TZ so we don't lose precision. https://docs.oracle.com/javase/8/docs/api/java/util/Date.html for details. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That’s a good point. Do we need to add a check to determine the exact logical type?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wow, really appreciate it @brandonstanleyappfolio , Thanks! I'll take a look!! Sadly, I don't have much time to work on in these days.... I'll resume this work as soon as possible. Thanks for guys! 🙏 |
||
| return Variants.ofDate(days); | ||
| } | ||
| if (value instanceof Number) { | ||
| return numberToVariantValue((Number) value); | ||
| } | ||
| if (value instanceof String) { | ||
| return Variants.of((String) value); | ||
| } | ||
| if (value instanceof ByteBuffer) { | ||
| return Variants.of((ByteBuffer) value); | ||
| } | ||
| if (value instanceof byte[]) { | ||
| return Variants.of(ByteBuffer.wrap((byte[]) value)); | ||
| } | ||
| if (value instanceof UUID) { | ||
| return Variants.ofUUID((UUID) value); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| private static VariantValue numberToVariantValue(Number value) { | ||
| if (value instanceof BigDecimal) { | ||
| return Variants.of((BigDecimal) value); | ||
| } | ||
| if (value instanceof BigInteger) { | ||
| return Variants.of(new BigDecimal((BigInteger) value)); | ||
| } | ||
| if (value instanceof Integer) { | ||
| return Variants.of((Integer) value); | ||
| } | ||
| if (value instanceof Long) { | ||
| return Variants.of((Long) value); | ||
| } | ||
| if (value instanceof Float) { | ||
| return Variants.of((Float) value); | ||
| } | ||
| if (value instanceof Double) { | ||
| return Variants.of((Double) value); | ||
| } | ||
| if (value instanceof Byte) { | ||
| return Variants.of((Byte) value); | ||
| } | ||
| if (value instanceof Short) { | ||
| return Variants.of((Short) value); | ||
| } | ||
seokyun-ha-toss marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also consider Dates?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I interpreted from the pr description that these are derived from JSON which doesn't have dates? But this is a good clarifying question. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes sense! I can provide some context around my use case if that helps. I am loading Confluent Avro-serialized data that contains timestamp fields. When those fields are deserialized by the connector, they are converted to java.util.Date, which this PR does not currently handle. I added the following code to I also see that @seokyun-ha-toss added support for dates here! |
||
| throw new IllegalArgumentException( | ||
| "Cannot convert Number to variant (unknown type): " + value.getClass().getName()); | ||
| } | ||
|
|
||
| @SuppressWarnings("JavaUtilDate") | ||
| private OffsetDateTime convertOffsetDateTime(Object value) { | ||
| if (value instanceof Number) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.