From d4efb2d3ce129c4c80a4ded5984f7c7677408a49 Mon Sep 17 00:00:00 2001
From: Tim Sawicki
Date: Tue, 1 Jul 2025 00:58:13 -0400
Subject: [PATCH] add updated mysql_cdc

---
 python/sources/mysql_cdc/README.md           | 162 +++++++++++++++++++
 python/sources/mysql_cdc/dockerfile          |  35 ++++
 python/sources/mysql_cdc/helper_functions.py | 101 ++++++++++++
 python/sources/mysql_cdc/icon.png            | Bin 0 -> 2367 bytes
 python/sources/mysql_cdc/library.json        | 115 +++++++++++++
 python/sources/mysql_cdc/main.py             |  36 +++++
 python/sources/mysql_cdc/requirements.txt    |   3 +
 7 files changed, 452 insertions(+)
 create mode 100644 python/sources/mysql_cdc/README.md
 create mode 100644 python/sources/mysql_cdc/dockerfile
 create mode 100644 python/sources/mysql_cdc/helper_functions.py
 create mode 100644 python/sources/mysql_cdc/icon.png
 create mode 100644 python/sources/mysql_cdc/library.json
 create mode 100644 python/sources/mysql_cdc/main.py
 create mode 100644 python/sources/mysql_cdc/requirements.txt

diff --git a/python/sources/mysql_cdc/README.md b/python/sources/mysql_cdc/README.md
new file mode 100644
index 00000000..9debe7f3
--- /dev/null
+++ b/python/sources/mysql_cdc/README.md
@@ -0,0 +1,162 @@
+# MySQL CDC
+
+This connector demonstrates how to capture changes to a MySQL database table (CDC) via MySQL binary log replication and publish the change events to a Kafka topic. It is built on the **Quix Streams StatefulSource** to provide exactly-once processing and automatic recovery after restarts.
+
+## Key Features
+
+- **Quix Streams StatefulSource**: Built on Quix Streams' stateful source framework
+- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking
+- **Exactly-Once Processing**: No data loss during application restarts or failures
+- **Initial Snapshot**: Optionally capture existing data before starting CDC
+- **Automatic Recovery**: Seamlessly resume processing after interruptions
+- **Change Buffering**: Batches changes for efficient Kafka publishing
+
+## How to run
+
+1. Set up your MySQL database with binary logging enabled
+2. Configure environment variables for the MySQL connection
+3. Install dependencies: `pip install -r requirements.txt`
+4. Run: `python main.py`
+
+## Environment variables
+
+### Required MySQL Connection
+
+- **output**: Name of the output topic to write into.
+- **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server.
+- **MYSQL_PORT**: The port number to use for communication with the server (default: 3306).
+- **MYSQL_DATABASE**: The name of the database for CDC.
+- **MYSQL_USER**: The username that should be used to interact with the database.
+- **MYSQL_PASSWORD**: The password for the user configured above.
+- **MYSQL_TABLE**: The name of the table for CDC.
+
+### Optional Configuration
+
+- **MYSQL_SNAPSHOT_HOST**: MySQL host for the initial snapshot (defaults to MYSQL_HOST if not set). Use this to perform the initial snapshot against a different MySQL instance (e.g., a read replica).
+- **MYSQL_INITIAL_SNAPSHOT**: Set to "true" to perform an initial snapshot of existing data (default: false).
+- **MYSQL_SNAPSHOT_BATCH_SIZE**: Number of rows to process in each snapshot batch (default: 1000).
+- **MYSQL_FORCE_SNAPSHOT**: Set to "true" to force a snapshot even if one has already completed (default: false). A complete example configuration is shown below.
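+
+For reference, a complete configuration might look like this (the host, credentials, and topic name below are placeholder values, not defaults):
+
+```env
+output=mysql-cdc-events
+MYSQL_HOST=mysql.example.com
+MYSQL_PORT=3306
+MYSQL_DATABASE=my_database
+MYSQL_USER=cdc_user
+MYSQL_PASSWORD=secure_password
+MYSQL_TABLE=my_table
+MYSQL_INITIAL_SNAPSHOT=true
+MYSQL_SNAPSHOT_BATCH_SIZE=1000
+```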
+
+## Quix Streams StatefulSource
+
+The connector uses Quix Streams' `StatefulSource` class, which provides:
+
+- **Automatic State Persistence**: Binlog positions and snapshot status are automatically saved to the state store
+- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication
+- **Fault Tolerance**: Automatic recovery from failures with consistent state
+- **Production-Ready**: Built on Quix Streams' proven architecture
+
+### State Management
+
+- **Binlog Position**: Automatically tracked as `binlog_position_{schema}_{table}`
+- **Snapshot Completion**: Tracked as `snapshot_completed_{schema}_{table}`
+- **Transactional Processing**: State changes are committed atomically with message production
+
+Example state data:
+```json
+{
+  "log_file": "mysql-bin.000123",
+  "log_pos": 45678,
+  "timestamp": 1704067200.0
+}
+```
+
+## Initial Snapshot
+
+Enable the initial snapshot to capture existing table data before starting CDC:
+
+```env
+MYSQL_INITIAL_SNAPSHOT=true
+MYSQL_SNAPSHOT_BATCH_SIZE=1000
+MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional: use a read replica
+```
+
+The initial snapshot:
+- Processes data in configurable batches to avoid memory issues
+- Sends snapshot records with `"kind": "snapshot_insert"` to distinguish them from live inserts
+- Marks completion in the StatefulSource state store to avoid re-processing on restart
+- Can be forced to re-run with `MYSQL_FORCE_SNAPSHOT=true`
+
+## Requirements / Prerequisites
+
+- A MySQL database with binary logging enabled.
+- Set `log-bin=mysql-bin` and `binlog-format=ROW` in the MySQL configuration.
+- A MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges.
+- For the initial snapshot: `SELECT` privilege on the target table.
+
+### MySQL Configuration Example
+```ini
+[mysqld]
+server-id = 1
+log_bin = /var/log/mysql/mysql-bin.log
+binlog_expire_logs_seconds = 864000
+max_binlog_size = 100M
+binlog-format = ROW
+binlog_row_metadata = FULL
+binlog_row_image = FULL
+```
+
+### MySQL User Permissions
+```sql
+-- Create the replication user
+CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password';
+
+-- Grant replication privileges
+GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
+
+-- Grant SELECT for the initial snapshot
+GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%';
+
+FLUSH PRIVILEGES;
+```
+
+## Change Event Format
+
+### INSERT/Snapshot Insert
+```json
+{
+  "kind": "insert", // or "snapshot_insert" for the initial snapshot
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["id", "name"],
+  "columnvalues": [123, "John Doe"],
+  "oldkeys": {}
+}
+```
+
+### UPDATE
+```json
+{
+  "kind": "update",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["id", "name"],
+  "columnvalues": [123, "Jane Doe"],
+  "oldkeys": {
+    "keynames": ["id", "name"],
+    "keyvalues": [123, "John Doe"]
+  }
+}
+```
+
+### DELETE
+```json
+{
+  "kind": "delete",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": [],
+  "columnvalues": [],
+  "oldkeys": {
+    "keynames": ["id", "name"],
+    "keyvalues": [123, "Jane Doe"]
+  }
+}
+```
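+
+As a minimal sketch of how a downstream consumer might use these events, the function below folds insert, update, and delete records into an in-memory copy of the table. It assumes the key column is named `id` and that events arrive in binlog order; the function name is illustrative, not part of the connector:
+
+```python
+def apply_change_event(view: dict, event: dict) -> None:
+    """Fold one CDC event into `view`, a mapping of key value -> row dict."""
+    kind = event["kind"]
+    if kind in ("insert", "snapshot_insert", "update"):
+        row = dict(zip(event["columnnames"], event["columnvalues"]))
+        if kind == "update":
+            # Drop the old row first in case the key column itself changed
+            old = dict(zip(event["oldkeys"]["keynames"], event["oldkeys"]["keyvalues"]))
+            view.pop(old.get("id"), None)
+        view[row["id"]] = row
+    elif kind == "delete":
+        # DELETE events carry only the old key values
+        old = dict(zip(event["oldkeys"]["keynames"], event["oldkeys"]["keyvalues"]))
+        view.pop(old.get("id"), None)
+```
+
+Replaying an initial snapshot followed by the binlog stream through such a function reproduces the table's current state.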
+
+## Contribute
+
+Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you, and you'll receive $200 in Quix credit.
+
+## Open source
+
+This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.
+
+Please star us and mention us on social to show your appreciation.
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/dockerfile b/python/sources/mysql_cdc/dockerfile
new file mode 100644
index 00000000..94050c86
--- /dev/null
+++ b/python/sources/mysql_cdc/dockerfile
@@ -0,0 +1,35 @@
+FROM python:3.12.5-slim-bookworm
+
+# Set environment variables for non-interactive setup and unbuffered output
+ENV DEBIAN_FRONTEND=noninteractive \
+    PYTHONUNBUFFERED=1 \
+    PYTHONIOENCODING=UTF-8 \
+    PYTHONPATH="/app"
+
+# TODO: remove this RUN block when done doing "@ git+" install in requirements.txt
+# This should be done BEFORE merging PR
+RUN apt-get update && \
+    apt-get install -y git && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
+
+# Build argument for setting the main app path
+ARG MAINAPPPATH=.
+
+# Set working directory inside the container
+WORKDIR /app
+
+# Copy requirements to leverage Docker cache
+COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"
+
+# Install dependencies without caching
+RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"
+
+# Copy entire application into container
+COPY . .
+
+# Set working directory to main app path
+WORKDIR "/app/${MAINAPPPATH}"
+
+# Define the container's startup command
+ENTRYPOINT ["python3", "main.py"]
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/helper_functions.py b/python/sources/mysql_cdc/helper_functions.py
new file mode 100644
index 00000000..779146c6
--- /dev/null
+++ b/python/sources/mysql_cdc/helper_functions.py
@@ -0,0 +1,101 @@
+from datetime import timedelta
+import os
+import json
+
+
+def load_config():
+    driver = os.environ["driver"]
+    server = os.environ["server"]
+    user_id = os.environ["userid"]
+    password = os.environ["password"]
+    database = os.environ["database"]
+    table_name = os.environ["table_name"]
+    last_modified_column = os.environ["last_modified_column"]
+    time_delta_config = os.environ["time_delta"]
+
+    try:
+        # bool() is True for any non-empty string, so parse the flag text explicitly
+        use_utc_for_offset = os.environ["offset_is_utc"].strip().lower() in ("true", "1", "yes")
+    except KeyError as e:
+        raise Exception("Use UTC For Offset must be True or False") from e
+
+    drop_cols = os.getenv("columns_to_drop")
+    rename_cols = None
+    passed_rename_cols = os.getenv("columns_to_rename")
+
+    try:
+        poll_interval = int(os.environ["poll_interval_seconds"])
+    except Exception as e:
+        raise Exception("Poll Interval must be an integer") from e
+
+    # Enforce a minimum polling interval of one second
+    if poll_interval < 1:
+        poll_interval = 1
+
+    try:
+        if passed_rename_cols:
+            rename_cols = json.loads(passed_rename_cols)
+    except Exception as e:
+        raise Exception("Invalid JSON supplied for column renames") from e
+
+    return {
+        "driver": driver,
+        "server": server,
+        "user_id": user_id,
+        "password": password,
+        "database": database,
+        "table_name": table_name,
+        "last_modified_column": last_modified_column,
+        "time_delta": make_time_delta_from_config(time_delta_config),
+        "drop_cols": drop_cols,
+        "rename_cols": rename_cols,
+        "use_utc": use_utc_for_offset,
+        "poll_interval": poll_interval
+    }
+
+
+def make_time_delta_from_config(time_delta_config) -> timedelta:
+    time_delta_values = time_delta_config.split(",")
+
+    if len(time_delta_values) != 5:
+        raise Exception(
+            "time_delta_config must contain 5 values, one for each of seconds, minutes, hours, days and weeks")
+
+    try:
+        seconds = int(time_delta_values[0])
+        minutes = int(time_delta_values[1])
+        hours = int(time_delta_values[2])
+        days = int(time_delta_values[3])
+        weeks = int(time_delta_values[4])
+        return timedelta(seconds=seconds, minutes=minutes, hours=hours, days=days, weeks=weeks)
+    except ValueError as ve:
+        # int() raises ValueError (not TypeError) on non-numeric strings
+        raise Exception("Unable to cast one of the supplied values to int") from ve
+    except Exception as e:
+        raise Exception("Something went wrong configuring the time delta") from e
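+
+# Illustrative usage note (not part of the original module): the config string
+# is ordered seconds,minutes,hours,days,weeks, so for example
+#   make_time_delta_from_config("30,0,1,0,0") == timedelta(hours=1, seconds=30)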
+
+
+def check_table_exists(conn, table) -> bool:
+    # pyodbc: Cursor.tables() yields metadata for tables matching the given name
+    if not conn.cursor().tables(table).fetchone():
+        print("Table does not exist")
+        return False
+    return True
+
+
+def check_column_exists(conn, table, column_name) -> bool:
+    # pyodbc: Cursor.columns() yields column metadata for the given table
+    for c in conn.cursor().columns(table=table):
+        if column_name == c.column_name:
+            return True
+    print("Key column [{}] not found in table [{}]".format(column_name, table))
+    return False
+
+
+def drop_columns(conn, cols_to_drop, table_data, table_name):
+    for col in cols_to_drop:
+        if check_column_exists(conn, table_name, col):
+            # drop(col, 1) used a positional axis argument that pandas 2.x removed
+            table_data = table_data.drop(columns=[col])
+    return table_data
+
+
+def rename_columns(conn, cols_to_rename, table_data, table_name):
+    for col in cols_to_rename:
+        if check_column_exists(conn, table_name, col):
+            table_data = table_data.rename(columns={col: cols_to_rename[col]})
+    return table_data
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/icon.png b/python/sources/mysql_cdc/icon.png
new file mode 100644
index 0000000000000000000000000000000000000000..725e0be48f0292fa996055f48b72f5ba0be05833
GIT binary patch
literal 2367
[2367-byte PNG icon; base85 patch data omitted]