Skip to content

Commit 93bd640

Browse files
committed
handle enum during vstream copy
Enable support for enums during vstream copy phase. There are two reasons that the connector does not handle `enum` for PSDB branches. 1. The upstream debezium-connector-vitess simply does not support `enum` during the VStream copy phase. It tries to cast the row value to an integer, but the value is a string. It seems support for `enum` landed in 2021 debezium#20, and support for snapshots (VStream Copy) landed in 2022 debezium#112, without taking the former into account. This is easily fixed by finding finding the index of the string value in the list of values obtained from `column_type` during the schema discovery phase at the beginning of the VStream. 2. However, this isn't working on some PSDB branches which don't have the fix vitessio/vitess#13045 for this bug vitessio/vitess#12981. Fixable by backporting the bugfix or upgrading those branches. Signed-off-by: Max Englander <max@planetscale.com>
1 parent fd9a261 commit 93bd640

3 files changed

Lines changed: 50 additions & 0 deletions

File tree

src/main/java/io/debezium/connector/vitess/VitessType.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,32 @@ public int getJdbcId() {
4141
return jdbcId;
4242
}
4343

44+
public Integer getEnumOrdinal(String value) {
45+
int index = enumValues.indexOf(value);
46+
if (index == -1) {
47+
return Integer.valueOf(value);
48+
}
49+
return Integer.valueOf(index + 1);
50+
}
51+
4452
public List<String> getEnumValues() {
4553
return enumValues;
4654
}
4755

56+
public Long getSetNumeral(String value) {
57+
String[] members = value.split(",");
58+
Long result = 0L;
59+
for (String member : members) {
60+
long index = enumValues.indexOf(member);
61+
if (index == -1) {
62+
index = Long.valueOf(member);
63+
}
64+
Double power = Math.pow(2, index);
65+
result = result + Long.valueOf(power.longValue());
66+
}
67+
return result;
68+
}
69+
4870
public boolean isEnum() {
4971
return !enumValues.isEmpty();
5072
}

src/main/java/io/debezium/connector/vitess/connection/ReplicationMessageColumnValueResolver.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,14 @@ public static Object resolveValue(
2222
case Types.SMALLINT:
2323
return value.asShort();
2424
case Types.INTEGER:
25+
if (vitessType.isEnum()) {
26+
return vitessType.getEnumOrdinal(value.asString());
27+
}
2528
return value.asInteger();
2629
case Types.BIGINT:
30+
if (vitessType.isEnum()) {
31+
return vitessType.getSetNumeral(value.asString());
32+
}
2733
return value.asLong();
2834
case Types.BLOB:
2935
case Types.BINARY:

src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,28 @@ public void testInitialOnlySnapshot() throws Exception {
972972
stopConnector();
973973
}
974974

975+
@Test
976+
public void shouldHandleEnumAndSetDuringTableCopy() throws Exception {
977+
TestHelper.executeDDL("vitess_create_tables.ddl");
978+
979+
TestHelper.execute(INSERT_ENUM_TYPE_STMT, TEST_UNSHARDED_KEYSPACE);
980+
TestHelper.execute(INSERT_SET_TYPE_STMT, TEST_UNSHARDED_KEYSPACE);
981+
982+
final String enumTableName = "enum_table";
983+
final String setTableName = "set_table";
984+
985+
String tableInclude = TEST_UNSHARDED_KEYSPACE + "." + enumTableName + "," + TEST_UNSHARDED_KEYSPACE + "." + setTableName;
986+
startConnector(Function.identity(), false, false, 1,
987+
-1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD);
988+
989+
int expectedRecordsCount = 2;
990+
consumer = testConsumer(expectedRecordsCount);
991+
992+
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
993+
994+
stopConnector();
995+
}
996+
975997
private void testOffsetStorage(boolean offsetStoragePerTask) throws Exception {
976998
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_UNSHARDED_KEYSPACE);
977999

0 commit comments

Comments
 (0)