Commit fabd708

feat(binder): add batch field count statistics to RecordBinder and AvroRecordView (#2795)
1 parent 1a32a0d commit fabd708

2 files changed: 307 additions & 6 deletions


core/src/main/java/kafka/automq/table/binder/RecordBinder.java

Lines changed: 102 additions & 6 deletions
@@ -18,15 +18,21 @@
  */
 package kafka.automq.table.binder;
 
+
+import kafka.automq.table.transformer.FieldMetric;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * A factory that creates lazy-evaluation Record views of Avro GenericRecords.
@@ -42,6 +48,9 @@ public class RecordBinder {
     // Pre-computed RecordBinders for nested STRUCT fields
     private final Map<String, RecordBinder> nestedStructBinders;
 
+    // Field count statistics for this batch
+    private final AtomicLong batchFieldCount;
+
 
     public RecordBinder(GenericRecord avroRecord) {
         this(AvroSchemaUtil.toIceberg(avroRecord.getSchema()), avroRecord.getSchema());
@@ -52,8 +61,13 @@ public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema)
     }
 
     public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema, TypeAdapter<Schema> typeAdapter) {
+        this(icebergSchema, avroSchema, typeAdapter, new AtomicLong(0));
+    }
+
+    public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema, TypeAdapter<Schema> typeAdapter, AtomicLong batchFieldCount) {
         this.icebergSchema = icebergSchema;
         this.typeAdapter = typeAdapter;
+        this.batchFieldCount = batchFieldCount;
 
         // Pre-compute field name to position mapping
         this.fieldNameToPosition = new HashMap<>();
@@ -83,7 +97,22 @@ public Record bind(GenericRecord avroRecord) {
             return null;
         }
         return new AvroRecordView(avroRecord, icebergSchema, typeAdapter,
-            fieldNameToPosition, fieldMappings, nestedStructBinders);
+            fieldNameToPosition, fieldMappings, nestedStructBinders, this);
+    }
+
+    /**
+     * Gets the accumulated field count for this batch and resets it to zero.
+     * Should be called after each flush to collect field statistics.
+     */
+    public long getAndResetFieldCount() {
+        return batchFieldCount.getAndSet(0);
+    }
+
+    /**
+     * Adds field count to the batch total. Called by AvroRecordView instances.
+     */
+    void addFieldCount(long count) {
+        batchFieldCount.addAndGet(count);
     }
 
     private void initializeFieldMappings(Schema avroSchema) {
@@ -138,7 +167,8 @@ private Map<String, RecordBinder> precomputeNestedStructBinders(TypeAdapter<Sche
                 RecordBinder nestedBinder = new RecordBinder(
                     mapping.nestedSchema(),
                     mapping.avroSchema(),
-                    typeAdapter
+                    typeAdapter,
+                    batchFieldCount
                 );
                 binders.put(structId, nestedBinder);
             }
@@ -155,19 +185,22 @@ private static class AvroRecordView implements Record {
         private final Map<String, Integer> fieldNameToPosition;
         private final FieldMapping[] fieldMappings;
         private final Map<String, RecordBinder> nestedStructBinders;
+        private final RecordBinder parentBinder;
 
         AvroRecordView(GenericRecord avroRecord,
                        org.apache.iceberg.Schema icebergSchema,
                        TypeAdapter<Schema> typeAdapter,
                        Map<String, Integer> fieldNameToPosition,
                        FieldMapping[] fieldMappings,
-                       Map<String, RecordBinder> nestedStructBinders) {
+                       Map<String, RecordBinder> nestedStructBinders,
+                       RecordBinder parentBinder) {
             this.avroRecord = avroRecord;
             this.icebergSchema = icebergSchema;
            this.typeAdapter = typeAdapter;
             this.fieldNameToPosition = fieldNameToPosition;
             this.fieldMappings = fieldMappings;
             this.nestedStructBinders = nestedStructBinders;
+            this.parentBinder = parentBinder;
         }
 
         @Override
@@ -189,18 +222,81 @@ public Object get(int pos) {
                 return null;
             }
 
-            // Handle STRUCT type
+            // Handle STRUCT type - delegate to nested binder
            if (mapping.typeId() == Type.TypeID.STRUCT) {
                 String structId = mapping.nestedSchemaId();
                 RecordBinder nestedBinder = nestedStructBinders.get(structId);
                 if (nestedBinder == null) {
                     throw new IllegalStateException("Nested binder not found for struct: " + structId);
                 }
+                parentBinder.addFieldCount(1);
                 return nestedBinder.bind((GenericRecord) avroValue);
             }
 
-            // Delegate conversion of all other types to the adapter
-            return typeAdapter.convert(avroValue, mapping.avroSchema(), mapping.icebergType());
+            // Convert non-STRUCT types
+            Object result = typeAdapter.convert(avroValue, mapping.avroSchema(), mapping.icebergType());
+
+            // Calculate and accumulate field count
+            long fieldCount = calculateFieldCount(result, mapping.icebergType());
+            parentBinder.addFieldCount(fieldCount);
+
+            return result;
+        }
+
+        /**
+         * Calculates the field count for a converted value based on its size.
+         * Large fields are counted multiple times based on the size threshold.
+         */
+        private long calculateFieldCount(Object value, Type icebergType) {
+            if (value == null) {
+                return 0;
+            }
+
+            switch (icebergType.typeId()) {
+                case STRING:
+                    return FieldMetric.count((String) value);
+                case BINARY:
+                    return FieldMetric.count((ByteBuffer) value);
+                case FIXED:
+                    return FieldMetric.count((byte[]) value);
+                case LIST:
+                    return calculateListFieldCount((List<?>) value, ((Types.ListType) icebergType).elementType());
+                case MAP:
+                    return calculateMapFieldCount((Map<?, ?>) value, (Types.MapType) icebergType);
+                default:
+                    return 1; // Struct or Primitive types count as 1 field
+            }
+        }
+
+        /**
+         * Calculates field count for List values by summing element costs.
+         */
+        private long calculateListFieldCount(List<?> list, Type elementType) {
+            if (list == null) {
+                return 0;
+            }
+
+            long total = 1;
+            for (Object element : list) {
+                total += calculateFieldCount(element, elementType);
+            }
+            return total;
+        }
+
+        /**
+         * Calculates field count for Map values by summing key and value costs.
+         */
+        private long calculateMapFieldCount(Map<?, ?> map, Types.MapType mapType) {
+            if (map == null || map.isEmpty()) {
+                return 0;
+            }
+
+            long total = 1;
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                total += calculateFieldCount(entry.getKey(), mapType.keyType());
+                total += calculateFieldCount(entry.getValue(), mapType.valueType());
+            }
+            return total;
         }
 
         @Override
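
Usage note: per the Javadoc on getAndResetFieldCount(), the accumulated counter is meant to be drained by the caller once per flush rather than reset internally. A minimal sketch of that wiring is shown below; the onFlush() hook and the LongConsumer metrics sink are illustrative assumptions, not part of this commit.

// Illustrative only: drain the batch counter after each flush and hand it to a metrics sink.
// RecordBinder.getAndResetFieldCount() comes from this commit; everything else here is hypothetical.
import kafka.automq.table.binder.RecordBinder;

import java.util.function.LongConsumer;

final class BatchFieldStatsHook {
    private final RecordBinder binder;
    private final LongConsumer metricsSink; // e.g. a sensor, gauge, or log line - assumed, not in the diff

    BatchFieldStatsHook(RecordBinder binder, LongConsumer metricsSink) {
        this.binder = binder;
        this.metricsSink = metricsSink;
    }

    // Call once after each flush: reads the accumulated field count and resets it to zero,
    // so the next batch starts counting from scratch.
    void onFlush() {
        metricsSink.accept(binder.getAndResetFieldCount());
    }
}

Because every nested RecordBinder created in precomputeNestedStructBinders shares the same AtomicLong, draining the top-level binder also covers fields counted while accessing nested structs.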

core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java

Lines changed: 205 additions & 0 deletions
@@ -1042,4 +1042,209 @@ public void testBindWithNestedOptionalRecord() {
         Record boundRecordWithNull = recordBinder.bind(envelopeRecordWithNull);
         assertNull(boundRecordWithNull.getField("before"));
     }
+
+    // Test method for field count statistics
+    @Test
+    public void testFieldCountStatistics() {
+        // Test different field types and their count calculations
+        String avroSchemaStr = "{\n" +
+            "  \"type\": \"record\",\n" +
+            "  \"name\": \"TestRecord\",\n" +
+            "  \"fields\": [\n" +
+            "    {\"name\": \"smallString\", \"type\": \"string\"},\n" +
+            "    {\"name\": \"largeString\", \"type\": \"string\"},\n" +
+            "    {\"name\": \"intField\", \"type\": \"int\"},\n" +
+            "    {\"name\": \"binaryField\", \"type\": \"bytes\"}\n" +
+            "  ]\n" +
+            "}";
+
+        Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+        RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema);
+
+        // Create test record with different field sizes
+        GenericRecord avroRecord = new GenericData.Record(avroSchema);
+        avroRecord.put("smallString", "small"); // 5 chars = 1 field: (5+23)/24 = 1
+        avroRecord.put("largeString", "a".repeat(50)); // 50 chars = 3 fields: (50+23)/24 = 3
+        avroRecord.put("intField", 42); // primitive = 1 field
+        avroRecord.put("binaryField", ByteBuffer.wrap("test".repeat(10).getBytes())); // 40 bytes = 2 fields: (40+31)/32 = 2
+
+        // Bind record - this should trigger field counting
+        Record icebergRecord = recordBinder.bind(avroRecord);
+
+        // Access all fields to trigger counting
+        assertEquals("small", icebergRecord.getField("smallString"));
+        assertEquals("a".repeat(50), icebergRecord.getField("largeString"));
+        assertEquals(42, icebergRecord.getField("intField"));
+        assertEquals("test".repeat(10), new String(((ByteBuffer) icebergRecord.getField("binaryField")).array()));
+
+        // Check field count: 1 + 3 + 1 + 2 = 7 fields total
+        long fieldCount = recordBinder.getAndResetFieldCount();
+        assertEquals(7, fieldCount);
+
+        // Second call should return 0 (reset)
+        assertEquals(0, recordBinder.getAndResetFieldCount());
+
+        testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord);
+        assertEquals(7, recordBinder.getAndResetFieldCount());
+    }
+
+    @Test
+    public void testFieldCountWithComplexTypes() {
+        // Test field counting for LIST and MAP types
+        String avroSchemaStr = "{\n" +
+            "  \"type\": \"record\",\n" +
+            "  \"name\": \"ComplexRecord\",\n" +
+            "  \"fields\": [\n" +
+            "    {\"name\": \"stringList\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},\n" +
+            "    {\"name\": \"stringMap\", \"type\": {\"type\": \"map\", \"values\": \"string\"}}\n" +
+            "  ]\n" +
+            "}";
+
+        Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+        RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema);
+
+        GenericRecord avroRecord = new GenericData.Record(avroSchema);
+        // List with 3 small strings: 1 (list itself) + 3 * 1 = 4 fields
+        avroRecord.put("stringList", Arrays.asList("a", "b", "c"));
+
+        // Map with 2 entries: 1 (map itself) + 2 * (1 key + 1 value) = 5 fields
+        Map<String, String> map = new HashMap<>();
+        map.put("key1", "val1");
+        map.put("key2", "val2");
+        avroRecord.put("stringMap", map);
+
+        Record icebergRecord = recordBinder.bind(avroRecord);
+
+        // Access fields to trigger counting
+        assertEquals(Arrays.asList("a", "b", "c"), icebergRecord.getField("stringList"));
+        assertEquals(map, icebergRecord.getField("stringMap"));
+
+        // Total: 4 (list) + 5 (map) = 9 fields
+        long fieldCount = recordBinder.getAndResetFieldCount();
+        assertEquals(9, fieldCount);
+
+        testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord);
+        assertEquals(9, recordBinder.getAndResetFieldCount());
+    }
+
+    @Test
+    public void testFieldCountWithNestedStructure() {
+        // Test field counting for nested records
+        String avroSchemaStr = "{\n" +
+            "  \"type\": \"record\",\n" +
+            "  \"name\": \"NestedRecord\",\n" +
+            "  \"fields\": [\n" +
+            "    {\"name\": \"simpleField\", \"type\": \"string\"},\n" +
+            "    {\n" +
+            "      \"name\": \"nestedField\",\n" +
+            "      \"type\": {\n" +
+            "        \"type\": \"record\",\n" +
+            "        \"name\": \"Nested\",\n" +
+            "        \"fields\": [\n" +
+            "          {\"name\": \"nestedString\", \"type\": \"string\"},\n" +
+            "          {\"name\": \"nestedInt\", \"type\": \"int\"}\n" +
+            "        ]\n" +
+            "      }\n" +
+            "    }\n" +
+            "  ]\n" +
+            "}";
+
+        Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+        RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema);
+
+        // Create nested record
+        GenericRecord nestedRecord = new GenericData.Record(avroSchema.getField("nestedField").schema());
+        nestedRecord.put("nestedString", "nested"); // 1 field
+        nestedRecord.put("nestedInt", 123); // 1 field
+
+        GenericRecord mainRecord = new GenericData.Record(avroSchema);
+        mainRecord.put("simpleField", "simple"); // 1 field
+        mainRecord.put("nestedField", nestedRecord); // STRUCT fields are counted when accessed
+
+        Record icebergRecord = recordBinder.bind(mainRecord);
+
+        // Access all fields including nested ones
+        assertEquals("simple", icebergRecord.getField("simpleField"));
+        Record nested = (Record) icebergRecord.getField("nestedField");
+        assertEquals("nested", nested.getField("nestedString"));
+        assertEquals(123, nested.getField("nestedInt"));
+
+        // Total: 1 (simple) + 1 (struct) + 1 (nested string) + 1 (nested int) = 4 fields
+        // Note: accessing the STRUCT field adds 1; its leaf fields are counted as they are accessed on the nested view
+        long fieldCount = recordBinder.getAndResetFieldCount();
+        assertEquals(4, fieldCount);
+
+        testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord);
+        assertEquals(4, recordBinder.getAndResetFieldCount());
+    }
+
+    @Test
+    public void testFieldCountBatchAccumulation() {
+        // Test that field counts accumulate across multiple record bindings
+        String avroSchemaStr = "{\n" +
+            "  \"type\": \"record\",\n" +
+            "  \"name\": \"SimpleRecord\",\n" +
+            "  \"fields\": [\n" +
+            "    {\"name\": \"stringField\", \"type\": \"string\"},\n" +
+            "    {\"name\": \"intField\", \"type\": \"int\"}\n" +
+            "  ]\n" +
+            "}";
+
+        Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+        RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema);
+
+        // Process multiple records
+        for (int i = 0; i < 3; i++) {
+            GenericRecord avroRecord = new GenericData.Record(avroSchema);
+            avroRecord.put("stringField", "test" + i); // 1 field each
+            avroRecord.put("intField", i); // 1 field each
+
+            Record icebergRecord = recordBinder.bind(avroRecord);
+            // Access fields to trigger counting
+            icebergRecord.getField("stringField");
+            icebergRecord.getField("intField");
+        }
+
+        // Total: 3 records * 2 fields each = 6 fields
+        long totalFieldCount = recordBinder.getAndResetFieldCount();
+        assertEquals(6, totalFieldCount);
+    }
+
+    @Test
+    public void testFieldCountWithNullValues() {
+        // Test that null values don't contribute to field count
+        String avroSchemaStr = "{\n" +
+            "  \"type\": \"record\",\n" +
+            "  \"name\": \"NullableRecord\",\n" +
+            "  \"fields\": [\n" +
+            "    {\"name\": \"nonNullField\", \"type\": \"string\"},\n" +
+            "    {\"name\": \"nullField\", \"type\": [\"null\", \"string\"], \"default\": null}\n" +
+            "  ]\n" +
+            "}";
+
+        Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+        RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema);
+
+        GenericRecord avroRecord = new GenericData.Record(avroSchema);
+        avroRecord.put("nonNullField", "value"); // 1 field
+        avroRecord.put("nullField", null); // 0 fields
+
+        Record icebergRecord = recordBinder.bind(avroRecord);
+
+        // Access both fields
+        assertEquals("value", icebergRecord.getField("nonNullField"));
+        assertNull(icebergRecord.getField("nullField"));
+
+        // Only the non-null field should count
+        long fieldCount = recordBinder.getAndResetFieldCount();
+        assertEquals(1, fieldCount);
+
+        testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord);
+        assertEquals(1, recordBinder.getAndResetFieldCount());
+    }
 }
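
The expected totals in these tests imply a size-weighted count: the inline comments work out to roughly one unit per 24 string characters and one per 32 binary bytes, with lists and maps adding one unit for the container plus the per-element counts (hence the 4 and 9 in testFieldCountWithComplexTypes). FieldMetric.count itself is not part of this diff, so the following is only a sketch of that heuristic as inferred from the test comments, not the actual kafka.automq.table.transformer.FieldMetric implementation.

import java.nio.ByteBuffer;

// Sketch only: the weighting implied by the "(5+23)/24" and "(40+31)/32" comments in the tests above.
// The real FieldMetric may use different thresholds or rounding.
final class FieldMetricSketch {
    static long count(String value) {
        return value == null ? 0 : (value.length() + 23) / 24;    // ~ceil(chars / 24)
    }

    static long count(ByteBuffer value) {
        return value == null ? 0 : (value.remaining() + 31) / 32; // ~ceil(bytes / 32)
    }

    public static void main(String[] args) {
        // "small" -> 1, fifty 'a' characters -> 3, a 40-byte buffer -> 2; together with the
        // int field (1) this matches the total of 7 asserted in testFieldCountStatistics.
        System.out.println(count("small"));                 // 1
        System.out.println(count("a".repeat(50)));          // 3
        System.out.println(count(ByteBuffer.allocate(40))); // 2
    }
}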
