Overview of the Issue
In VReplication workflows and the VStream API there is a copy phase (initial snapshot copy) and a running/replicating phase. You can read more about the lifecycle here.
For the copy phase, a rowstreamer (a specific type of vstreamer) streams the rows returned by a query that takes a consistent snapshot of the table at a logical point in time (a GTID set that matches the consistent snapshot of the table). For the running/replicating phase, a standard vstreamer streams filtered binary log events from the source tablet to the target tablet(s).
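To make the two phases concrete from a consumer's point of view, here is a minimal Go sketch that walks a simplified sequence of event types, modeled on the sample results later in this issue, and labels each ROW event with the phase it arrived in. It assumes a single-shard stream in which COPY_COMPLETED marks the end of the copy phase. Phase and phaseOfRows are hypothetical names; a real consumer would inspect binlogdatapb.VEvent values rather than plain strings.

```go
package main

import "fmt"

// Phase names the two stages of a VStream / VReplication stream.
type Phase string

const (
	CopyPhase    Phase = "copy"
	RunningPhase Phase = "running"
)

// phaseOfRows returns, for every ROW event in events, the phase that was in
// effect when it arrived. Events are plain type names here (a hypothetical
// simplification); COPY_COMPLETED switches from the copy phase to the
// running/replicating phase.
func phaseOfRows(events []string) []Phase {
	phase := CopyPhase
	var phases []Phase
	for _, ev := range events {
		switch ev {
		case "COPY_COMPLETED":
			phase = RunningPhase
		case "ROW":
			phases = append(phases, phase)
		}
	}
	return phases
}

func main() {
	// Event order simplified from the sample results in this issue.
	events := []string{
		"BEGIN", "FIELD", "VGTID",
		"ROW", "ROW", "ROW", "ROW", "ROW", "VGTID", "COMMIT",
		"BEGIN", "VGTID", "COMMIT",
		"COPY_COMPLETED", "COPY_COMPLETED",
		"BEGIN", "FIELD", "ROW", "VGTID", "COMMIT",
	}
	fmt.Println(phaseOfRows(events)) // [copy copy copy copy copy running]
}
```

A consumer that has to know which phase it is in just to interpret a column value is exactly the burden this issue proposes to remove.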
Because the rowstreamer sends the results of a query, for ENUM and SET columns it sends the string value that it gets back from MySQL. Because the vstreamer sends filtered binary log events, for ENUM and SET columns it sends the integer-based value, as that is what is stored in the binary log events.
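The integers come from MySQL's storage format: an ENUM value is stored as its 1-based index into the list of permitted values (0 being the special error value, which reads back as the empty string), and a SET value is stored as a bitmask in which bit i corresponds to the i-th member. A minimal Go sketch of the integer-to-string mapping, assuming the ordered member list is already known; enumIndexToString and setBitsToString are hypothetical helpers, not Vitess functions:

```go
package main

import (
	"fmt"
	"strings"
)

// enumIndexToString maps the 1-based ENUM index carried in binlog row events
// to the member name. Index 0 is MySQL's special error value, which reads
// back as the empty string.
func enumIndexToString(elems []string, idx uint64) (string, error) {
	if idx == 0 {
		return "", nil
	}
	if idx > uint64(len(elems)) {
		return "", fmt.Errorf("enum index %d out of range (%d members)", idx, len(elems))
	}
	return elems[idx-1], nil
}

// setBitsToString maps a SET bitmask to MySQL's textual form: the members
// whose bits are set, in definition order, joined by commas.
func setBitsToString(elems []string, bits uint64) (string, error) {
	if bits>>uint(len(elems)) != 0 {
		return "", fmt.Errorf("set value %d has bits beyond its %d members", bits, len(elems))
	}
	var members []string
	for i, e := range elems {
		if bits&(1<<uint(i)) != 0 {
			members = append(members, e)
		}
	}
	return strings.Join(members, ","), nil
}

func main() {
	size, _ := enumIndexToString([]string{"small", "medium", "large"}, 2)
	fmt.Println(size) // medium

	flags, _ := setBitsToString([]string{"a", "b", "c"}, 5) // 5 = 0b101
	fmt.Println(flags) // a,c
}
```

Because the index depends on the order of members in the column definition, a consumer that maps indexes with a stale definition (for example after a schema change reorders the ENUM) silently produces the wrong string.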
This discrepancy has been (partially) addressed in the past on the consumer side. For VReplication workflows, the consumers are vcopier (for the rowstreamer) and vplayer (for the vstreamer), and the vplayer started doing the integer-to-string mapping in #15349. That change was made because the mismatch caused problems for Vitess OnlineDDL when a schema change reordered the elements of an ENUM. VStream consumers such as the Debezium Vitess connector and the PlanetScale Airbyte connector have implemented this mapping themselves when processing the incoming VEvents (debezium connector PR, airbyte connector PR).
Rather than pushing this work onto each current and future consumer, we should unify the behavior in the VReplication vstreamer and its subtypes, such as the rowstreamer.
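Wherever the mapping ends up living, it needs the ordered member list for the column. The FIELD events in the sample results already carry it in column_type (for example enum('small','medium','large')), so a consumer or the streamer itself could derive it from there. A minimal Go sketch, assuming the lowercase enum( / set( prefix seen in the sample output and that a single quote inside a member is written as two single quotes; parseEnumSetElements is a hypothetical helper:

```go
package main

import (
	"fmt"
	"strings"
)

// parseEnumSetElements extracts the ordered members from a column type such
// as "enum('small','medium','large')" or "set('a','b')". It assumes the
// lowercase prefix seen in the sample FIELD events and that a single quote
// inside a member is written as two single quotes.
func parseEnumSetElements(columnType string) ([]string, error) {
	if !strings.HasSuffix(columnType, ")") {
		return nil, fmt.Errorf("not an ENUM or SET column type: %q", columnType)
	}
	var body string
	switch {
	case strings.HasPrefix(columnType, "enum("):
		body = columnType[len("enum(") : len(columnType)-1]
	case strings.HasPrefix(columnType, "set("):
		body = columnType[len("set(") : len(columnType)-1]
	default:
		return nil, fmt.Errorf("not an ENUM or SET column type: %q", columnType)
	}

	var elems []string
	var cur strings.Builder
	inQuote := false
	for i := 0; i < len(body); i++ {
		c := body[i]
		switch {
		case inQuote && c == '\'' && i+1 < len(body) && body[i+1] == '\'':
			cur.WriteByte('\'') // doubled quote: a literal ' inside the member
			i++
		case c == '\'':
			inQuote = !inQuote
			if !inQuote { // closing quote ends the current member
				elems = append(elems, cur.String())
				cur.Reset()
			}
		case inQuote:
			cur.WriteByte(c)
		case c == ',' || c == ' ':
			// separators between quoted members
		default:
			return nil, fmt.Errorf("unexpected %q at offset %d in %q", c, i, columnType)
		}
	}
	if inQuote {
		return nil, fmt.Errorf("unterminated quote in %q", columnType)
	}
	return elems, nil
}

func main() {
	elems, err := parseEnumSetElements("enum('small','medium','large')")
	if err != nil {
		panic(err)
	}
	fmt.Println(elems) // [small medium large]
}
```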
Reproduction Steps
Set up the test environment:
git checkout main && make build
pushd examples/local
./101_initial_cluster.sh
mysql < ../common/insert_commerce_data.sql
vtctldclient ApplySchema --ddl-strategy=direct --sql "alter table customer add size enum ('small', 'medium', 'large') default 'medium'" commerce
Modify the vstream_client example to stream everything from the customer table in the unsharded commerce keyspace:
diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go
index 98d2129f89..939178159f 100644
--- a/examples/local/vstream_client.go
+++ b/examples/local/vstream_client.go
@@ -38,7 +38,7 @@ import (
 */
 func main() {
 	ctx := context.Background()
-	streamCustomer := true
+	streamCustomer := false
 	var vgtid *binlogdatapb.VGtid
 	if streamCustomer {
 		vgtid = &binlogdatapb.VGtid{
In another terminal, start a vtgate VStream of the customer table. It will start with the copy phase and then continue in the running/replicating phase:
cd ${VTROOT}/examples/local
go run vstream_client.go
Generate some more data now that the vstream is past the copy phase and in the running/replicating phase:
mysql commerce
# ... perform inserts, updates, deletes against the customer table
Clean up:
Sample results:
[type:BEGIN keyspace:"commerce" shard:"0" type:FIELD field_event:{table_name:"commerce.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_commerce" org_name:"customer_id" column_length:20 charset:63 flags:49667 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_commerce" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} fields:{name:"size" type:ENUM table:"customer" org_table:"customer" database:"vt_commerce" org_name:"size" column_length:24 charset:255 flags:256 column_type:"enum('small','medium','large')"} keyspace:"commerce" shard:"0"} keyspace:"commerce" shard:"0"]
[type:VGTID vgtid:{shard_gtids:{keyspace:"commerce" shard:"0" gtid:"MySQL56/f9370c8c-9484-11ee-a06b-d03490376450:1-62"}} keyspace:"commerce" shard:"0"]
[type:ROW row_event:{table_name:"commerce.customer" row_changes:{after:{lengths:1 lengths:16 lengths:6 values:"1alice@domain.commedium"}} keyspace:"commerce" shard:"0"} keyspace:"commerce" shard:"0" type:ROW row_event:{table_name:"commerce.customer" row_changes:{after:{lengths:1 lengths:14 lengths:6 values:"2bob@domain.commedium"}} keyspace:"commerce" shard:"0"} keyspace:"commerce" shard:"0" type:ROW row_event:{table_name:"commerce.customer" row_changes:{after:{lengths:1 lengths:18 lengths:6 values:"3charlie@domain.commedium"}} keyspace:"commerce" shard:"0"} keyspace:"commerce" shard:"0" type:ROW row_event:{table_name:"commerce.customer" row_changes:{after:{lengths:1 lengths:14 lengths:6 values:"4dan@domain.commedium"}} keyspace:"commerce" shard:"0"} keyspace:"commerce" shard:"0" type:ROW row_event:{table_name:"commerce.customer" row_changes:{after:{lengths:1 lengths:14 lengths:6 values:"5eve@domain.commedium"}} keyspace:"commerce" shard:"0"} keyspace:"commerce" shard:"0" type:VGTID vgtid:{shard_gtids:{keyspace:"commerce" shard:"0" gtid:"MySQL56/f9370c8c-9484-11ee-a06b-d03490376450:1-62" table_p_ks:{table_name:"customer" lastpk:{fields:{name:"customer_id" type:INT64 charset:63 flags:49667} rows:{lengths:1 values:"5"}}}}} keyspace:"commerce" shard:"0" type:COMMIT keyspace:"commerce" shard:"0"]
[type:BEGIN keyspace:"commerce" shard:"0" type:VGTID vgtid:{shard_gtids:{keyspace:"commerce" shard:"0" gtid:"MySQL56/f9370c8c-9484-11ee-a06b-d03490376450:1-62"}} keyspace:"commerce" shard:"0" type:COMMIT keyspace:"commerce" shard:"0"]
[type:COPY_COMPLETED keyspace:"commerce" shard:"0" type:COPY_COMPLETED]
[type:BEGIN timestamp:1701901111 current_time:1701901111210764000 keyspace:"commerce" shard:"0" type:FIELD timestamp:1701901111 field_event:{table_name:"commerce.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_commerce" org_name:"customer_id" column_length:20 charset:63 flags:49667 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_commerce" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} fields:{name:"size" type:ENUM table:"customer" org_table:"customer" database:"vt_commerce" org_name:"size" column_length:24 charset:255 flags:256 column_type:"enum('small','medium','large')"} keyspace:"commerce" shard:"0"} current_time:1701901111213501000 keyspace:"commerce" shard:"0" type:ROW timestamp:1701901111 row_event:{table_name:"commerce.customer" row_changes:{after:{lengths:1 lengths:21 lengths:1 values:"6mlord@planetscale.com1"}} keyspace:"commerce" shard:"0" flags:1} current_time:1701901111213523000 keyspace:"commerce" shard:"0" type:VGTID vgtid:{shard_gtids:{keyspace:"commerce" shard:"0" gtid:"MySQL56/f9370c8c-9484-11ee-a06b-d03490376450:1-63"}} keyspace:"commerce" shard:"0" type:COMMIT timestamp:1701901111 current_time:1701901111213534000 keyspace:"commerce" shard:"0"]
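The row_changes above use a compact encoding: values is the concatenation of all column values and lengths gives each column's byte length, with -1 marking NULL (this is how Vitess's query.Row is encoded). Decoding the first copy-phase row and the running-phase row shows the inconsistency directly: the size column arrives as medium in the copy phase and as the ENUM index 1 in the running phase. A minimal Go sketch; decodeRow is a hypothetical helper, and database/sql.NullString is used only as a convenient NULL-aware string:

```go
package main

import (
	"database/sql"
	"fmt"
)

// decodeRow splits a row encoded as in the sample output: values holds every
// column value back to back and lengths[i] is the byte length of column i,
// with -1 meaning SQL NULL (a NULL column consumes no bytes).
func decodeRow(lengths []int64, values []byte) ([]sql.NullString, error) {
	cols := make([]sql.NullString, 0, len(lengths))
	var off int64
	for i, l := range lengths {
		if l < 0 {
			cols = append(cols, sql.NullString{}) // Valid == false marks NULL
			continue
		}
		if off+l > int64(len(values)) {
			return nil, fmt.Errorf("column %d: length %d overruns %d value bytes at offset %d", i, l, len(values), off)
		}
		cols = append(cols, sql.NullString{String: string(values[off : off+l]), Valid: true})
		off += l
	}
	if off != int64(len(values)) {
		return nil, fmt.Errorf("%d unused bytes after the last column", int64(len(values))-off)
	}
	return cols, nil
}

func main() {
	// Values copied from the sample results above.
	copyRow, _ := decodeRow([]int64{1, 16, 6}, []byte("1alice@domain.commedium"))
	runRow, _ := decodeRow([]int64{1, 21, 1}, []byte("6mlord@planetscale.com1"))

	fmt.Println(copyRow[2].String) // medium (copy phase: ENUM as a string)
	fmt.Println(runRow[2].String)  // 1 (running phase: ENUM as an index)
}
```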
Binary Version
❯ vtgate --version
vtgate version Version: 20.0.0-SNAPSHOT (Git revision da1301b1cec3ba7c12b148eda94d7fbaa063e6ef branch 'main') built on Thu Apr 18 10:44:19 EDT 2024 by matt@pslord.local using go1.22.2 darwin/arm64
Operating System and Environment details
Log Fragments
No response