Split MySQL CDC into separate template and quix connector #626

Open · wants to merge 1 commit into base: main
162 changes: 162 additions & 0 deletions python/sources/mysql_cdc/README.md
@@ -0,0 +1,162 @@
# MySQL CDC

This connector demonstrates how to capture changes to a MySQL database table (change data capture, or CDC) via MySQL binary log replication and publish the change events to a Kafka topic. It's built on the **Quix Streams StatefulSource** framework to provide exactly-once processing and automatic recovery after restarts.

## Key Features

- **Quix Streams StatefulSource**: Built on Quix Streams' robust stateful source framework
- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking
- **Exactly-Once Processing**: No data loss during application restarts or failures
- **Initial Snapshot**: Optionally capture existing data before starting CDC
- **Automatic Recovery**: Seamlessly resume processing after interruptions
- **Change Buffering**: Batches changes for efficient Kafka publishing

## How to run

1. Set up your MySQL database with binary logging enabled
2. Configure environment variables for MySQL connection
3. Install dependencies: `pip install -r requirements.txt`
4. Run: `python main.py`

## Environment variables

### Required MySQL Connection
- **output**: Name of the output topic to write into.
- **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server.
- **MYSQL_PORT**: The port number to use for communication with the server (default: 3306).
- **MYSQL_DATABASE**: The name of the database for CDC.
- **MYSQL_USER**: The username that should be used to interact with the database.
- **MYSQL_PASSWORD**: The password for the user configured above.
- **MYSQL_TABLE**: The name of the table for CDC.

### Optional Configuration
- **MYSQL_SNAPSHOT_HOST**: MySQL host for the initial snapshot (defaults to MYSQL_HOST if not set). Use this to take the initial snapshot from a different MySQL instance (e.g., a read replica).
- **MYSQL_INITIAL_SNAPSHOT**: Set to "true" to perform initial snapshot of existing data (default: false).
- **MYSQL_SNAPSHOT_BATCH_SIZE**: Number of rows to process in each snapshot batch (default: 1000).
- **MYSQL_FORCE_SNAPSHOT**: Set to "true" to force snapshot even if already completed (default: false).
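
A minimal sketch of how these optional variables might be parsed, including the `MYSQL_SNAPSHOT_HOST` fallback to `MYSQL_HOST` (the function itself is illustrative, not the connector's actual code):

```python
import os


def load_optional_settings() -> dict:
    """Illustrative parsing of the optional environment variables."""
    # Fall back to the primary host when no snapshot host is configured
    snapshot_host = os.getenv("MYSQL_SNAPSHOT_HOST") or os.environ["MYSQL_HOST"]

    # Boolean flags arrive as strings, so compare against "true" explicitly
    initial_snapshot = os.getenv("MYSQL_INITIAL_SNAPSHOT", "false").lower() == "true"
    force_snapshot = os.getenv("MYSQL_FORCE_SNAPSHOT", "false").lower() == "true"

    batch_size = int(os.getenv("MYSQL_SNAPSHOT_BATCH_SIZE", "1000"))

    return {
        "snapshot_host": snapshot_host,
        "initial_snapshot": initial_snapshot,
        "force_snapshot": force_snapshot,
        "batch_size": batch_size,
    }
```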

## Quix Streams StatefulSource

The connector uses Quix Streams' `StatefulSource` class which provides:

- **Automatic State Persistence**: Binlog positions and snapshot status are automatically saved to the state store
- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication
- **Fault Tolerance**: Automatic recovery from failures with consistent state
- **Production-Ready**: Built on Quix Streams' proven architecture

### State Management
- **Binlog Position**: Automatically tracked as `binlog_position_{schema}_{table}`
- **Snapshot Completion**: Tracked as `snapshot_completed_{schema}_{table}`
- **Transactional Processing**: State changes are committed atomically with message production

Example state data:
```json
{
  "log_file": "mysql-bin.000123",
  "log_pos": 45678,
  "timestamp": 1704067200.0
}
```
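
Below is a condensed, illustrative `StatefulSource` subclass showing how such a position can be read from and written back to the state store. The binlog-reading logic is stubbed out, and names like `MySQLBinlogSource` and the broker address are assumptions, not this connector's exact code:

```python
import time

from quixstreams import Application
from quixstreams.sources.base import StatefulSource


class MySQLBinlogSource(StatefulSource):
    """Illustrative source persisting a binlog position in the state store."""

    def run(self):
        # Restore the last committed position; None means start fresh
        position = self.state.get("binlog_position_mydb_mytable")

        while self.running:
            # Placeholder: a real implementation reads binlog events from
            # `position` (e.g. via python-mysql-replication)
            event = {"kind": "insert", "schema": "mydb", "table": "mytable",
                     "columnnames": ["id"], "columnvalues": [1], "oldkeys": {}}
            position = {"log_file": "mysql-bin.000123", "log_pos": 45678,
                        "timestamp": time.time()}

            msg = self.serialize(key="mydb.mytable", value=event)
            self.produce(key=msg.key, value=msg.value)

            # Persist the new position; flush() commits the state change
            # atomically with the messages produced above
            self.state.set("binlog_position_mydb_mytable", position)
            self.flush()
            time.sleep(1)


if __name__ == "__main__":
    app = Application(broker_address="localhost:9092")  # adjust as needed
    app.add_source(MySQLBinlogSource(name="mysql-cdc"),
                   topic=app.topic("mysql-cdc-events"))
    app.run()
```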

## Initial Snapshot

Enable initial snapshot to capture existing table data before starting CDC:

```env
MYSQL_INITIAL_SNAPSHOT=true
MYSQL_SNAPSHOT_BATCH_SIZE=1000
MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional: use read replica
```

The initial snapshot:
- Processes data in configurable batches to avoid memory issues
- Sends snapshot records with `"kind": "snapshot_insert"` to distinguish from real inserts
- Marks completion in the StatefulSource state store to avoid re-processing on restart
- Can be forced to re-run with `MYSQL_FORCE_SNAPSHOT=true` (see the batching sketch below)
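
A sketch of the batching approach, fetching `MYSQL_SNAPSHOT_BATCH_SIZE` rows at a time so large tables never sit in memory at once (the `pymysql` usage and function shape are illustrative assumptions, not this connector's exact code):

```python
import pymysql


def snapshot_table(connection, schema, table, batch_size=1000):
    """Yield existing rows as snapshot_insert events, one batch at a time."""
    offset = 0
    with connection.cursor(pymysql.cursors.DictCursor) as cursor:
        while True:
            # NOTE: production code should ORDER BY a stable key
            # (or use keyset pagination) for deterministic paging
            cursor.execute(
                f"SELECT * FROM `{schema}`.`{table}` LIMIT %s OFFSET %s",
                (batch_size, offset),
            )
            rows = cursor.fetchall()
            if not rows:
                break
            for row in rows:
                # Mirror the connector's event shape, marked as a snapshot row
                yield {
                    "kind": "snapshot_insert",
                    "schema": schema,
                    "table": table,
                    "columnnames": list(row.keys()),
                    "columnvalues": list(row.values()),
                    "oldkeys": {},
                }
            offset += batch_size
```

In the connector, each batch can then be serialized, produced, and flushed before the next fetch, keeping memory usage bounded even for very large tables.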

## Requirements / Prerequisites

- A MySQL Database with binary logging enabled.
- Set `log-bin=mysql-bin` and `binlog-format=ROW` in MySQL configuration.
- MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges.
- For initial snapshot: `SELECT` privilege on the target table.

### MySQL Configuration Example
```ini
[mysqld]
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_expire_logs_seconds = 864000
max_binlog_size = 100M
binlog-format = ROW
binlog_row_metadata = FULL
binlog_row_image = FULL
```
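
Before starting the connector, you can sanity-check these server settings from Python. A minimal sketch using `pymysql` (host and credentials are placeholders):

```python
import pymysql

REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "binlog_row_metadata": "FULL",
}

conn = pymysql.connect(host="localhost", user="cdc_user",
                       password="secure_password")
with conn.cursor() as cur:
    for variable, expected in REQUIRED.items():
        cur.execute("SHOW VARIABLES LIKE %s", (variable,))
        row = cur.fetchone()
        actual = row[1] if row else None
        status = "OK" if actual == expected else f"expected {expected}"
        print(f"{variable} = {actual} ({status})")
conn.close()
```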

### MySQL User Permissions
```sql
-- Create replication user
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password';

-- Grant replication privileges
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';

-- Grant select for initial snapshot
GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%';

FLUSH PRIVILEGES;
```
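
To confirm the privileges took effect, connect as the new user and list its grants (again a `pymysql` sketch with the placeholder credentials from the SQL above):

```python
import pymysql

conn = pymysql.connect(host="localhost", user="cdc_user",
                       password="secure_password")
with conn.cursor() as cur:
    cur.execute("SHOW GRANTS")
    for (grant,) in cur.fetchall():
        print(grant)  # expect REPLICATION SLAVE, REPLICATION CLIENT and SELECT
conn.close()
```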

## Change Event Format

### INSERT/Snapshot Insert
```json
{
  "kind": "insert", // or "snapshot_insert" for initial snapshot
  "schema": "database_name",
  "table": "table_name",
  "columnnames": ["id", "name"],
  "columnvalues": [123, "John Doe"],
  "oldkeys": {}
}
```

### UPDATE
```json
{
  "kind": "update",
  "schema": "database_name",
  "table": "table_name",
  "columnnames": ["id", "name"],
  "columnvalues": [123, "Jane Doe"],
  "oldkeys": {
    "keynames": ["id", "name"],
    "keyvalues": [123, "John Doe"]
  }
}
```

### DELETE
```json
{
  "kind": "delete",
  "schema": "database_name",
  "table": "table_name",
  "columnnames": [],
  "columnvalues": [],
  "oldkeys": {
    "keynames": ["id", "name"],
    "keyvalues": [123, "Jane Doe"]
  }
}
```
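
Downstream consumers can route on the `kind` field to reconstruct table state. A sketch of such a consumer with Quix Streams (the topic name, broker address, and handler bodies are illustrative placeholders):

```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",  # adjust for your broker
    consumer_group="cdc-consumer",
    auto_offset_reset="earliest",
)

events = app.dataframe(app.topic("mysql-cdc-events", value_deserializer="json"))


def apply_change(event: dict):
    # Route each change event by its "kind" field
    if event["kind"] in ("insert", "snapshot_insert"):
        row = dict(zip(event["columnnames"], event["columnvalues"]))
        print("upsert", row)
    elif event["kind"] == "update":
        old = dict(zip(event["oldkeys"]["keynames"], event["oldkeys"]["keyvalues"]))
        new = dict(zip(event["columnnames"], event["columnvalues"]))
        print("update", old, "->", new)
    elif event["kind"] == "delete":
        old = dict(zip(event["oldkeys"]["keynames"], event["oldkeys"]["keyvalues"]))
        print("delete", old)


events.update(apply_change)

if __name__ == "__main__":
    app.run()
```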

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
35 changes: 35 additions & 0 deletions python/sources/mysql_cdc/dockerfile
@@ -0,0 +1,35 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
PYTHONPATH="/app"

# TODO: remove this RUN block once requirements.txt no longer installs via "@ git+"
# This should be done BEFORE merging the PR
RUN apt-get update && \
apt-get install -y git && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
101 changes: 101 additions & 0 deletions python/sources/mysql_cdc/helper_functions.py
@@ -0,0 +1,101 @@
from datetime import timedelta
from typing import Any
import os
import json


def load_config():
    driver = os.environ["driver"]
    server = os.environ["server"]
    user_id = os.environ["userid"]
    password = os.environ["password"]
    database = os.environ["database"]
    table_name = os.environ["table_name"]
    last_modified_column = os.environ["last_modified_column"]
    time_delta_config = os.environ["time_delta"]

    # bool() on a non-empty string is always True (even for "False"),
    # so parse the flag explicitly instead
    offset_is_utc = os.environ.get("offset_is_utc", "").strip().lower()
    if offset_is_utc not in ("true", "false"):
        raise Exception("Use UTC For Offset must be True or False")
    use_utc_for_offset = offset_is_utc == "true"

    drop_cols = os.getenv("columns_to_drop")
    rename_cols = None
    passed_rename_cols = os.getenv("columns_to_rename")

    try:
        poll_interval = int(os.environ["poll_interval_seconds"])
    except (KeyError, ValueError) as e:
        raise Exception("Poll Interval must be an integer", e)

    # Never poll more frequently than once per second
    if poll_interval < 1:
        poll_interval = 1

    try:
        if passed_rename_cols:
            rename_cols = json.loads(passed_rename_cols)
    except Exception as e:
        raise Exception("Invalid JSON supplied for column renames", e)

    return {
        "driver": driver,
        "server": server,
        "user_id": user_id,
        "password": password,
        "database": database,
        "table_name": table_name,
        "last_modified_column": last_modified_column,
        "time_delta": make_time_delta_from_config(time_delta_config),
        "drop_cols": drop_cols,
        "rename_cols": rename_cols,
        "use_utc": use_utc_for_offset,
        "poll_interval": poll_interval
    }


def make_time_delta_from_config(time_delta_config) -> timedelta:
    time_delta_values = time_delta_config.split(",")

    if len(time_delta_values) != 5:
        raise Exception(
            "time_delta_config must contain 5 values, one for each of seconds, minutes, hours, days and weeks")

    try:
        seconds = int(time_delta_values[0])
        minutes = int(time_delta_values[1])
        hours = int(time_delta_values[2])
        days = int(time_delta_values[3])
        weeks = int(time_delta_values[4])
        return timedelta(seconds=seconds, minutes=minutes, hours=hours, days=days, weeks=weeks)
    except ValueError as e:
        # int() raises ValueError, not TypeError, on non-numeric input
        raise Exception("Unable to cast one of the supplied values to int", e)
    except Exception as e:
        raise Exception("Something went wrong configuring the time delta", e)


def check_table_exists(conn, table) -> bool:
    if not conn.cursor().tables(table).fetchone():
        print("Table does not exist")
        return False
    return True


def check_column_exists(conn, table, column_name) -> bool:
    for c in conn.cursor().columns(table=table):
        if column_name == c.column_name:
            return True
    print("Key column [{}] not found in table [{}]".format(column_name, table))
    return False


def drop_columns(conn, cols_to_drop, table_data, table_name) -> Any:
    # Drop each configured column that actually exists on the table;
    # drop(columns=...) replaces the positional axis argument removed in pandas 2.0
    for col in cols_to_drop:
        if check_column_exists(conn, table_name, col):
            table_data = table_data.drop(columns=[col])
    return table_data


def rename_columns(conn, cols_to_rename, table_data, table_name) -> Any:
    # cols_to_rename maps each existing column name to its new name
    for col in cols_to_rename:
        if check_column_exists(conn, table_name, col):
            table_data = table_data.rename(columns={col: cols_to_rename[col]})
    return table_data
Binary file added python/sources/mysql_cdc/icon.png