diff --git a/.changeset/wet-berries-enjoy.md b/.changeset/wet-berries-enjoy.md new file mode 100644 index 00000000..5d07184b --- /dev/null +++ b/.changeset/wet-berries-enjoy.md @@ -0,0 +1,21 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mysql': minor +'@powersync/service-sync-rules': minor +--- + +MySQL: +- Added schema change handling + - Except for some edge cases, the following schema changes are now handled automatically: + - Creation, renaming, dropping and truncation of tables. + - Creation and dropping of unique indexes and primary keys. + - Adding, modifying, dropping and renaming of table columns. + - If a schema change cannot handled automatically, a warning with details will be logged. + - Mismatches in table schema from the Zongji binlog listener are now handled more gracefully. +- Replication of wildcard tables is now supported. +- Improved logging for binlog event processing. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 6a0cfcf0..bd316624 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -164,9 +164,9 @@ export class MongoSyncBucketStorage async resolveTable(options: storage.ResolveTableOptions): Promise { const { group_id, connection_id, connection_tag, entity_descriptor } = options; - const { schema, name: table, objectId, replicationColumns } = entity_descriptor; + const { schema, name, objectId, replicaIdColumns } = entity_descriptor; - const columns = replicationColumns.map((column) => ({ + const normalizedReplicaIdColumns = replicaIdColumns.map((column) => ({ name: column.name, type: column.type, type_oid: column.typeId @@ -178,8 +178,8 @@ export class MongoSyncBucketStorage group_id: group_id, connection_id: connection_id, schema_name: schema, - table_name: table, - replica_id_columns2: columns + table_name: name, + replica_id_columns2: normalizedReplicaIdColumns }; if (objectId != null) { filter.relation_id = objectId; @@ -192,24 +192,24 @@ export class MongoSyncBucketStorage connection_id: connection_id, relation_id: objectId, schema_name: schema, - table_name: table, + table_name: name, replica_id_columns: null, - replica_id_columns2: columns, + replica_id_columns2: normalizedReplicaIdColumns, snapshot_done: false, snapshot_status: undefined }; await col.insertOne(doc, { session }); } - const sourceTable = new storage.SourceTable( - doc._id, - connection_tag, - objectId, - schema, - table, - replicationColumns, - doc.snapshot_done ?? true - ); + const sourceTable = new storage.SourceTable({ + id: doc._id, + connectionTag: connection_tag, + objectId: objectId, + schema: schema, + name: name, + replicaIdColumns: replicaIdColumns, + snapshotComplete: doc.snapshot_done ?? 
true + }); sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable); sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable); sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable); @@ -224,7 +224,7 @@ export class MongoSyncBucketStorage let dropTables: storage.SourceTable[] = []; // Detect tables that are either renamed, or have different replica_id_columns - let truncateFilter = [{ schema_name: schema, table_name: table }] as any[]; + let truncateFilter = [{ schema_name: schema, table_name: name }] as any[]; if (objectId != null) { // Only detect renames if the source uses relation ids. truncateFilter.push({ relation_id: objectId }); @@ -242,15 +242,16 @@ export class MongoSyncBucketStorage .toArray(); dropTables = truncate.map( (doc) => - new storage.SourceTable( - doc._id, - connection_tag, - doc.relation_id, - doc.schema_name, - doc.table_name, - doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [], - doc.snapshot_done ?? true - ) + new storage.SourceTable({ + id: doc._id, + connectionTag: connection_tag, + objectId: doc.relation_id, + schema: doc.schema_name, + name: doc.table_name, + replicaIdColumns: + doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [], + snapshotComplete: doc.snapshot_done ?? true + }) ); result = { @@ -577,7 +578,6 @@ export class MongoSyncBucketStorage `${this.slot_name} Cleared batch of data in ${lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...` ); await timers.setTimeout(lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5); - continue; } else { throw e; } diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts index f7c523b9..9a093b2e 100644 --- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts +++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts @@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules'; import * as service_types from '@powersync/service-types'; import { MongoManager } from '../replication/MongoManager.js'; -import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js'; +import { constructAfterRecord, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js'; import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js'; import * as types from '../types/types.js'; import { escapeRegExp } from '../utils.js'; @@ -137,15 +137,15 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { if (tablePattern.isWildcard) { patternResult.tables = []; for (let collection of collections) { - const sourceTable = new SourceTable( - 0, - this.connectionTag, - collection.name, - schema, - collection.name, - [], - true - ); + const sourceTable = new SourceTable({ + id: 0, + connectionTag: this.connectionTag, + objectId: collection.name, + schema: schema, + name: collection.name, + replicaIdColumns: [], + snapshotComplete: true + }); let errors: service_types.ReplicationError[] = []; if (collection.type == 'view') { errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` }); @@ -164,15 +164,15 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { }); } } else { - const sourceTable = new SourceTable( - 0, - this.connectionTag, - tablePattern.name, - schema, - tablePattern.name, - [], - true - ); + const sourceTable = new SourceTable({ + id: 0, + connectionTag: 
this.connectionTag, + objectId: tablePattern.name, + schema: schema, + name: tablePattern.name, + replicaIdColumns: [], + snapshotComplete: true + }); const syncData = sqlSyncRules.tableSyncsData(sourceTable); const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable); diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 21d83d27..0a516af8 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -215,7 +215,7 @@ export class ChangeStream { async estimatedCountNumber(table: storage.SourceTable): Promise { const db = this.client.db(table.schema); - return await db.collection(table.table).estimatedDocumentCount(); + return await db.collection(table.name).estimatedDocumentCount(); } /** @@ -449,7 +449,7 @@ export class ChangeStream { const totalEstimatedCount = await this.estimatedCountNumber(table); let at = table.snapshotStatus?.replicatedCount ?? 0; const db = this.client.db(table.schema); - const collection = db.collection(table.table); + const collection = db.collection(table.name); await using query = new ChunkedSnapshotQuery({ collection, key: table.snapshotStatus?.lastKey, diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts index afaa09cf..4ac11efb 100644 --- a/modules/module-mongodb/src/replication/MongoRelation.ts +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -13,7 +13,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S schema: source.db, // Not relevant for MongoDB - we use db + coll name as the identifier objectId: undefined, - replicationColumns: [{ name: '_id' }] + replicaIdColumns: [{ name: '_id' }] } satisfies storage.SourceEntityDescriptor; } @@ -22,7 +22,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S */ export function getCacheIdentifier(source: storage.SourceEntityDescriptor | storage.SourceTable): string { if (source instanceof storage.SourceTable) { - return `${source.schema}.${source.table}`; + return `${source.schema}.${source.name}`; } return `${source.schema}.${source.name}`; } diff --git a/modules/module-mysql/dev/docker/mysql/init-scripts/my.cnf b/modules/module-mysql/dev/docker/mysql/init-scripts/my.cnf index 99f01c70..ea21db79 100644 --- a/modules/module-mysql/dev/docker/mysql/init-scripts/my.cnf +++ b/modules/module-mysql/dev/docker/mysql/init-scripts/my.cnf @@ -4,6 +4,4 @@ enforce-gtid-consistency = ON # Row format required for ZongJi binlog_format = row log_bin=mysql-bin -server-id=1 -binlog-do-db=mydatabase -replicate-do-table=mydatabase.lists \ No newline at end of file +server-id=1 \ No newline at end of file diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index e15f116b..d1090da8 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -33,9 +33,10 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", - "@powersync/mysql-zongji": "0.2.0", + "@powersync/mysql-zongji": "^0.4.0", "async": "^3.2.4", "mysql2": "^3.11.0", + "node-sql-parser": "^5.3.9", "semver": "^7.5.4", "ts-codec": "^1.3.0", "uri-js": "^4.4.1", diff --git a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts index a76751ef..19f1cde6 100644 --- 
a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts +++ b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts @@ -208,7 +208,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI { idColumnsResult = await common.getReplicationIdentityColumns({ connection: connection, schema, - table_name: tableName + tableName: tableName }); } catch (ex) { idColumnsError = { level: 'fatal', message: ex.message }; @@ -217,7 +217,15 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI { } const idColumns = idColumnsResult?.columns ?? []; - const sourceTable = new storage.SourceTable(0, this.config.tag, tableName, schema, tableName, idColumns, true); + const sourceTable = new storage.SourceTable({ + id: 0, + connectionTag: this.config.tag, + objectId: tableName, + schema: schema, + name: tableName, + replicaIdColumns: idColumns, + snapshotComplete: true + }); const syncData = syncRules.tableSyncsData(sourceTable); const syncParameters = syncRules.tableSyncsParameters(sourceTable); @@ -232,7 +240,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI { let selectError: service_types.ReplicationError | null = null; try { await this.retriedQuery({ - query: `SELECT * FROM ${sourceTable.table} LIMIT 1` + query: `SELECT * FROM ${sourceTable.name} LIMIT 1` }); } catch (e) { selectError = { level: 'fatal', message: e.message }; diff --git a/modules/module-mysql/src/common/ReplicatedGTID.ts b/modules/module-mysql/src/common/ReplicatedGTID.ts index d51d43a7..dc7713e9 100644 --- a/modules/module-mysql/src/common/ReplicatedGTID.ts +++ b/modules/module-mysql/src/common/ReplicatedGTID.ts @@ -92,10 +92,15 @@ export class ReplicatedGTID { * @returns A comparable string in the format * `padded_end_transaction|raw_gtid|binlog_filename|binlog_position` */ - get comparable() { + get comparable(): string { const { raw, position } = this; const [, transactionRanges] = this.raw.split(':'); + // This means no transactions have been executed on the database yet + if (!transactionRanges) { + return ReplicatedGTID.ZERO.comparable; + } + let maxTransactionId = 0; for (const range of transactionRanges.split(',')) { diff --git a/modules/module-mysql/src/common/common-index.ts b/modules/module-mysql/src/common/common-index.ts index 6da00571..63112da4 100644 --- a/modules/module-mysql/src/common/common-index.ts +++ b/modules/module-mysql/src/common/common-index.ts @@ -1,6 +1,5 @@ export * from './check-source-configuration.js'; -export * from './get-replication-columns.js'; -export * from './get-tables-from-pattern.js'; +export * from './schema-utils.js'; export * from './mysql-to-sqlite.js'; export * from './read-executed-gtid.js'; export * from './ReplicatedGTID.js'; diff --git a/modules/module-mysql/src/common/get-tables-from-pattern.ts b/modules/module-mysql/src/common/get-tables-from-pattern.ts deleted file mode 100644 index 166bf93a..00000000 --- a/modules/module-mysql/src/common/get-tables-from-pattern.ts +++ /dev/null @@ -1,44 +0,0 @@ -import * as sync_rules from '@powersync/service-sync-rules'; -import mysql from 'mysql2/promise'; - -export type GetDebugTablesInfoOptions = { - connection: mysql.Connection; - tablePattern: sync_rules.TablePattern; -}; - -export async function getTablesFromPattern(options: GetDebugTablesInfoOptions): Promise> { - const { connection, tablePattern } = options; - const schema = tablePattern.schema; - - if (tablePattern.isWildcard) { - const [results] = await connection.query( - `SELECT - TABLE_NAME AS table_name - FROM - INFORMATION_SCHEMA.TABLES - WHERE - TABLE_SCHEMA = ? 
- AND TABLE_NAME LIKE ?`, - [schema, tablePattern.tablePattern] - ); - - return new Set( - results - .filter((result) => result.table_name.startsWith(tablePattern.tablePrefix)) - .map((result) => result.table_name) - ); - } else { - const [[match]] = await connection.query( - `SELECT - TABLE_NAME AS table_name - FROM - INFORMATION_SCHEMA.TABLES - WHERE - TABLE_SCHEMA = ? - AND TABLE_NAME = ?`, - [tablePattern.schema, tablePattern.tablePattern] - ); - // Only return the first result - return new Set([match.table_name]); - } -} diff --git a/modules/module-mysql/src/common/mysql-to-sqlite.ts b/modules/module-mysql/src/common/mysql-to-sqlite.ts index aa5aa949..46a4b058 100644 --- a/modules/module-mysql/src/common/mysql-to-sqlite.ts +++ b/modules/module-mysql/src/common/mysql-to-sqlite.ts @@ -183,6 +183,9 @@ export function toSQLiteRow(row: Record, columns: Map { + const { connection, schema, tableName } = options; + + const [allColumns] = await mysql_utils.retriedQuery({ + connection: connection, + query: ` + SELECT + s.COLUMN_NAME AS name, + c.DATA_TYPE as type + FROM + INFORMATION_SCHEMA.COLUMNS s + JOIN + INFORMATION_SCHEMA.COLUMNS c + ON + s.TABLE_SCHEMA = c.TABLE_SCHEMA + AND s.TABLE_NAME = c.TABLE_NAME + AND s.COLUMN_NAME = c.COLUMN_NAME + WHERE + s.TABLE_SCHEMA = ? + AND s.TABLE_NAME = ? + ORDER BY + s.ORDINAL_POSITION; + `, + params: [schema, tableName] + }); + + return allColumns.map((row) => { + return { + name: row.name, + type: row.type + }; + }); +} + +export interface GetReplicationIdentityColumnsOptions { + connection: mysqlPromise.Connection; + schema: string; + tableName: string; +} -export type ReplicationIdentityColumnsResult = { - columns: storage.ColumnDescriptor[]; +export interface ReplicationIdentityColumnsResult { + columns: ColumnDescriptor[]; // TODO maybe export an enum from the core package identity: string; -}; +} export async function getReplicationIdentityColumns( - options: GetReplicationColumnsOptions + options: GetReplicationIdentityColumnsOptions ): Promise { - const { connection, schema, table_name } = options; + const { connection, schema, tableName } = options; const [primaryKeyColumns] = await mysql_utils.retriedQuery({ connection: connection, query: ` @@ -39,7 +80,7 @@ export async function getReplicationIdentityColumns( ORDER BY s.SEQ_IN_INDEX; `, - params: [schema, table_name] + params: [schema, tableName] }); if (primaryKeyColumns.length) { @@ -52,8 +93,7 @@ export async function getReplicationIdentityColumns( }; } - // TODO: test code with tables with unique keys, compound key etc. - // No primary key, find the first valid unique key + // No primary key, check if any of the columns have a unique constraint we can use const [uniqueKeyColumns] = await mysql_utils.retriedQuery({ connection: connection, query: ` @@ -78,7 +118,7 @@ export async function getReplicationIdentityColumns( AND s.NON_UNIQUE = 0 ORDER BY s.SEQ_IN_INDEX; `, - params: [schema, table_name] + params: [schema, tableName] }); if (uniqueKeyColumns.length > 0) { @@ -91,34 +131,53 @@ export async function getReplicationIdentityColumns( }; } - const [allColumns] = await mysql_utils.retriedQuery({ + const allColumns = await getColumns({ connection: connection, - query: ` - SELECT - s.COLUMN_NAME AS name, - c.DATA_TYPE as type - FROM - INFORMATION_SCHEMA.COLUMNS s - JOIN - INFORMATION_SCHEMA.COLUMNS c - ON - s.TABLE_SCHEMA = c.TABLE_SCHEMA - AND s.TABLE_NAME = c.TABLE_NAME - AND s.COLUMN_NAME = c.COLUMN_NAME - WHERE - s.TABLE_SCHEMA = ? - AND s.TABLE_NAME = ? 
- ORDER BY - s.ORDINAL_POSITION; - `, - params: [schema, table_name] + schema: schema, + tableName: tableName }); return { - columns: allColumns.map((row) => ({ - name: row.name, - type: row.type - })), + columns: allColumns, identity: 'full' }; } + +export async function getTablesFromPattern( + connection: mysqlPromise.Connection, + tablePattern: TablePattern +): Promise { + const schema = tablePattern.schema; + + if (tablePattern.isWildcard) { + const [results] = await mysql_utils.retriedQuery({ + connection: connection, + query: ` + SELECT TABLE_NAME + FROM information_schema.tables + WHERE TABLE_SCHEMA = ? + AND TABLE_NAME LIKE ? + AND table_type = 'BASE TABLE' + `, + params: [schema, tablePattern.tablePattern] + }); + + return results + .map((row) => row.TABLE_NAME) + .filter((tableName: string) => tableName.startsWith(tablePattern.tablePrefix)); + } else { + const [results] = await mysql_utils.retriedQuery({ + connection: connection, + query: ` + SELECT TABLE_NAME + FROM information_schema.tables + WHERE TABLE_SCHEMA = ? + AND TABLE_NAME = ? + AND table_type = 'BASE TABLE' + `, + params: [schema, tablePattern.tablePattern] + }); + + return results.map((row) => row.TABLE_NAME); + } +} diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 3a4b76d7..e651143a 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -11,6 +11,7 @@ import { framework, getUuidReplicaIdentityBson, MetricsEngine, + SourceTable, storage } from '@powersync/service-core'; import mysql from 'mysql2'; @@ -18,10 +19,10 @@ import mysqlPromise from 'mysql2/promise'; import { TableMapEntry } from '@powersync/mysql-zongji'; import * as common from '../common/common-index.js'; -import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js'; +import { createRandomServerId, qualifiedMySQLTable } from '../utils/mysql-utils.js'; import { MySQLConnectionManager } from './MySQLConnectionManager.js'; import { ReplicationMetric } from '@powersync/service-types'; -import { BinLogEventHandler, BinLogListener, Row } from './zongji/BinLogListener.js'; +import { BinLogEventHandler, BinLogListener, Row, SchemaChange, SchemaChangeType } from './zongji/BinLogListener.js'; export interface BinLogStreamOptions { connections: MySQLConnectionManager; @@ -83,7 +84,7 @@ export class BinLogStream { * Keep track of whether we have done a commit or keepalive yet. * We can only compute replication lag if isStartingReplication == false, or oldestUncommittedChange is present. */ - private isStartingReplication = true; + isStartingReplication = true; constructor(private options: BinLogStreamOptions) { this.logger = options.logger ?? defaultLogger; @@ -158,10 +159,10 @@ export class BinLogStream { const promiseConnection = (connection as mysql.Connection).promise(); try { await promiseConnection.query(`SET time_zone = '+00:00'`); - await promiseConnection.query('BEGIN'); + await promiseConnection.query('START TRANSACTION'); try { gtid = await common.readExecutedGtid(promiseConnection); - await this.snapshotTable(connection.connection, batch, result.table); + await this.snapshotTable(connection as mysql.Connection, batch, result.table); await promiseConnection.query('COMMIT'); } catch (e) { await this.tryRollback(promiseConnection); @@ -185,62 +186,24 @@ export class BinLogStream { return []; } - let tableRows: any[]; - const prefix = tablePattern.isWildcard ? 
tablePattern.tablePrefix : undefined; - if (tablePattern.isWildcard) { - const result = await this.connections.query( - `SELECT TABLE_NAME -FROM information_schema.tables -WHERE TABLE_SCHEMA = ? AND TABLE_NAME LIKE ?; -`, - [tablePattern.schema, tablePattern.tablePattern] - ); - tableRows = result[0]; - } else { - const result = await this.connections.query( - `SELECT TABLE_NAME -FROM information_schema.tables -WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?; -`, - [tablePattern.schema, tablePattern.tablePattern] - ); - tableRows = result[0]; - } - let tables: storage.SourceTable[] = []; - - for (let row of tableRows) { - const name = row['TABLE_NAME'] as string; - if (prefix && !name.startsWith(prefix)) { - continue; - } - - const result = await this.connections.query( - `SELECT 1 -FROM information_schema.tables -WHERE table_schema = ? AND table_name = ? -AND table_type = 'BASE TABLE';`, - [tablePattern.schema, tablePattern.name] - ); - if (result[0].length == 0) { - this.logger.info(`Skipping ${tablePattern.schema}.${name} - no table exists/is not a base table`); - continue; - } + const connection = await this.connections.getConnection(); + const matchedTables: string[] = await common.getTablesFromPattern(connection, tablePattern); + connection.release(); - const connection = await this.connections.getConnection(); - const replicationColumns = await common.getReplicationIdentityColumns({ - connection: connection, - schema: tablePattern.schema, - table_name: tablePattern.name - }); - connection.release(); + let tables: storage.SourceTable[] = []; + for (const matchedTable of matchedTables) { + const replicaIdColumns = await this.getReplicaIdColumns(matchedTable, tablePattern.schema); const table = await this.handleRelation( batch, { - name, + name: matchedTable, schema: tablePattern.schema, - objectId: getMysqlRelId(tablePattern), - replicationColumns: replicationColumns.columns + objectId: getMysqlRelId({ + schema: tablePattern.schema, + name: matchedTable + }), + replicaIdColumns: replicaIdColumns }, false ); @@ -337,11 +300,11 @@ AND table_type = 'BASE TABLE';`, batch: storage.BucketStorageBatch, table: storage.SourceTable ) { - this.logger.info(`Replicating ${table.qualifiedName}`); + this.logger.info(`Replicating ${qualifiedMySQLTable(table)}`); // TODO count rows and log progress at certain batch sizes // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query - const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${escapeMysqlTableName(table)}`); + const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`); const stream = query.stream(); let columns: Map | undefined = undefined; @@ -448,11 +411,9 @@ AND table_type = 'BASE TABLE';`, { zeroLSN: common.ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true }, async (batch) => { const binlogEventHandler = this.createBinlogEventHandler(batch); - // Only listen for changes to tables in the sync rules - const includedTables = [...this.tableCache.values()].map((table) => table.table); const binlogListener = new BinLogListener({ logger: this.logger, - includedTables: includedTables, + sourceTables: this.syncRules.getSourceTables(), startPosition: binLogPositionState, connectionManager: this.connections, serverId: serverId, @@ -461,15 +422,15 @@ AND table_type = 'BASE TABLE';`, this.abortSignal.addEventListener( 'abort', - () => { + async () => { this.logger.info('Abort signal received, stopping replication...'); - 
binlogListener.stop(); + await binlogListener.stop(); }, { once: true } ); - // Only returns when the replication is stopped or interrupted by an error await binlogListener.start(); + await binlogListener.replicateUntilStopped(); } ); } @@ -515,10 +476,91 @@ AND table_type = 'BASE TABLE';`, }, onRotate: async () => { this.isStartingReplication = false; + }, + onSchemaChange: async (change: SchemaChange) => { + await this.handleSchemaChange(batch, change); } }; } + private async handleSchemaChange(batch: storage.BucketStorageBatch, change: SchemaChange): Promise { + if (change.type === SchemaChangeType.RENAME_TABLE) { + const fromTableId = getMysqlRelId({ + schema: change.schema, + name: change.table + }); + + const fromTable = this.tableCache.get(fromTableId); + // Old table needs to be cleaned up + if (fromTable) { + await batch.drop([fromTable]); + this.tableCache.delete(fromTableId); + } + // The new table matched a table in the sync rules + if (change.newTable) { + await this.handleCreateOrUpdateTable(batch, change.newTable!, change.schema); + } + } else { + const tableId = getMysqlRelId({ + schema: change.schema, + name: change.table + }); + + const table = this.getTable(tableId); + + switch (change.type) { + case SchemaChangeType.ALTER_TABLE_COLUMN: + case SchemaChangeType.REPLICATION_IDENTITY: + // For these changes, we need to update the table if the replication identity columns have changed. + await this.handleCreateOrUpdateTable(batch, change.table, change.schema); + break; + case SchemaChangeType.TRUNCATE_TABLE: + await batch.truncate([table]); + break; + case SchemaChangeType.DROP_TABLE: + await batch.drop([table]); + this.tableCache.delete(tableId); + break; + default: + // No action needed for other schema changes + break; + } + } + } + + private async getReplicaIdColumns(tableName: string, schema: string) { + const connection = await this.connections.getConnection(); + const replicaIdColumns = await common.getReplicationIdentityColumns({ + connection, + schema, + tableName + }); + connection.release(); + + return replicaIdColumns.columns; + } + + private async handleCreateOrUpdateTable( + batch: storage.BucketStorageBatch, + tableName: string, + schema: string + ): Promise { + const replicaIdColumns = await this.getReplicaIdColumns(tableName, schema); + return await this.handleRelation( + batch, + { + name: tableName, + schema: schema, + objectId: getMysqlRelId({ + schema: schema, + name: tableName + }), + replicaIdColumns: replicaIdColumns + }, + true + ); + } + private async writeChanges( batch: storage.BucketStorageBatch, msg: { @@ -529,17 +571,23 @@ AND table_type = 'BASE TABLE';`, } ): Promise { const columns = common.toColumnDescriptors(msg.tableEntry); + const tableId = getMysqlRelId({ + schema: msg.tableEntry.parentSchema, + name: msg.tableEntry.tableName + }); + + let table = this.tableCache.get(tableId); + if (table == null) { + // This write event is for a new table that matches a table in the sync rules + // We need to create the table in the storage and cache it. 
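[Editor note] The listener is now driven by the sync rules' `TablePattern` definitions rather than a flat list of table names, which is what enables wildcard table replication. As a rough illustration of the pattern properties the new code relies on (`isWildcard`, `tablePrefix`, `tablePattern`) — assuming the `TablePattern(schema, pattern)` constructor and a trailing `%` wildcard, which is how the patch uses it:

```ts
import { TablePattern } from '@powersync/service-sync-rules';

// Exact table: only mydb.lists is replicated.
const exact = new TablePattern('mydb', 'lists');
console.log(exact.isWildcard); // false

// Wildcard table: any table in mydb whose name starts with "list_".
// getTablesFromPattern() uses tablePattern in a LIKE query and then
// filters on tablePrefix, as in the new schema-utils.ts above.
const wildcard = new TablePattern('mydb', 'list_%');
console.log(wildcard.isWildcard); // true
console.log(wildcard.tablePrefix); // 'list_'
```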
+ table = await this.handleCreateOrUpdateTable(batch, msg.tableEntry.tableName, msg.tableEntry.parentSchema); + } for (const [index, row] of msg.rows.entries()) { await this.writeChange(batch, { type: msg.type, database: msg.tableEntry.parentSchema, - sourceTable: this.getTable( - getMysqlRelId({ - schema: msg.tableEntry.parentSchema, - name: msg.tableEntry.tableName - }) - ), + sourceTable: table!, table: msg.tableEntry.tableName, columns: columns, row: row, diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts index 47432b2a..4de30f53 100644 --- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts +++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts @@ -1,16 +1,62 @@ import * as common from '../../common/common-index.js'; import async from 'async'; -import { BinLogEvent, StartOptions, TableMapEntry, ZongJi } from '@powersync/mysql-zongji'; +import { BinLogEvent, BinLogQueryEvent, StartOptions, TableMapEntry, ZongJi } from '@powersync/mysql-zongji'; import * as zongji_utils from './zongji-utils.js'; import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework'; import { MySQLConnectionManager } from '../MySQLConnectionManager.js'; +import timers from 'timers/promises'; +import pkg, { + AST, + BaseFrom, + DropIndexStatement, + Parser as ParserType, + RenameStatement, + TruncateStatement +} from 'node-sql-parser'; +import { + isAlterTable, + isColumnExpression, + isConstraintExpression, + isCreateUniqueIndex, + isDropIndex, + isDropTable, + isRenameExpression, + isRenameTable, + isTruncate, + matchedSchemaChangeQuery +} from '../../utils/parser-utils.js'; +import { TablePattern } from '@powersync/service-sync-rules'; -// Maximum time the processing queue can be paused before resuming automatically -// MySQL server will automatically terminate replication connections after 60 seconds of inactivity, so this guards against that. -const MAX_QUEUE_PAUSE_TIME_MS = 45_000; +const { Parser } = pkg; export type Row = Record; +/** + * Schema changes that can be detected by inspecting query events. + * Note that create table statements are not included here, since new tables are automatically detected when row events + * are received for them. + */ +export enum SchemaChangeType { + RENAME_TABLE = 'Rename Table', + DROP_TABLE = 'Drop Table', + TRUNCATE_TABLE = 'Truncate Table', + ALTER_TABLE_COLUMN = 'Alter Table Column', + REPLICATION_IDENTITY = 'Alter Replication Identity' +} + +export interface SchemaChange { + type: SchemaChangeType; + /** + * The table that the schema change applies to. 
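[Editor note] For reference, a sketch of the `SchemaChange` values the listener is expected to emit for a couple of DDL statements, mirroring the cases covered in the BinLogListener tests further down. The schema/table names are placeholders and the import path is assumed:

```ts
import { SchemaChange, SchemaChangeType } from './BinLogListener.js';

// RENAME TABLE lists TO lists_archived;
// newTable is only set when the target name is matched by the database filter.
const rename: SchemaChange = {
  type: SchemaChangeType.RENAME_TABLE,
  schema: 'mydb',
  table: 'lists',
  newTable: 'lists_archived'
};

// TRUNCATE TABLE lists;
const truncate: SchemaChange = {
  type: SchemaChangeType.TRUNCATE_TABLE,
  schema: 'mydb',
  table: 'lists'
};
```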
+ */ + table: string; + schema: string; + /** + * Populated for table renames if the newTable was matched by the DatabaseFilter + */ + newTable?: string; +} + export interface BinLogEventHandler { onTransactionStart: (options: { timestamp: Date }) => Promise; onRotate: () => Promise; @@ -18,12 +64,13 @@ export interface BinLogEventHandler { onUpdate: (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => Promise; onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise; onCommit: (lsn: string) => Promise; + onSchemaChange: (change: SchemaChange) => Promise; } export interface BinLogListenerOptions { connectionManager: MySQLConnectionManager; eventHandler: BinLogEventHandler; - includedTables: string[]; + sourceTables: TablePattern[]; serverId: number; startPosition: common.BinLogPosition; logger?: Logger; @@ -34,18 +81,24 @@ export interface BinLogListenerOptions { * events on the provided BinLogEventHandler. */ export class BinLogListener { + private sqlParser: ParserType; private connectionManager: MySQLConnectionManager; private eventHandler: BinLogEventHandler; private binLogPosition: common.BinLogPosition; private currentGTID: common.ReplicatedGTID | null; private logger: Logger; + private listenerError: Error | null; + private databaseFilter: { [schema: string]: (table: string) => boolean }; zongji: ZongJi; processingQueue: async.QueueObject; + + isStopped: boolean = false; + isStopping: boolean = false; /** * The combined size in bytes of all the binlog events currently in the processing queue. */ - queueMemoryUsage: number; + queueMemoryUsage: number = 0; constructor(public options: BinLogListenerOptions) { this.logger = options.logger ?? defaultLogger; @@ -53,10 +106,11 @@ export class BinLogListener { this.eventHandler = options.eventHandler; this.binLogPosition = options.startPosition; this.currentGTID = null; - - this.processingQueue = async.queue(this.createQueueWorker(), 1); - this.queueMemoryUsage = 0; + this.sqlParser = new Parser(); + this.processingQueue = this.createProcessingQueue(); this.zongji = this.createZongjiListener(); + this.listenerError = null; + this.databaseFilter = this.createDatabaseFilter(options.sourceTables); } /** @@ -67,126 +121,155 @@ export class BinLogListener { return this.connectionManager.options.binlog_queue_memory_limit * 1024 * 1024; } - public async start(): Promise { + public async start(isRestart: boolean = false): Promise { if (this.isStopped) { return; } - this.logger.info(`Starting replication. Created replica client with serverId:${this.options.serverId}`); + + this.logger.info( + `${isRestart ? 'Restarting' : 'Starting'} BinLog Listener with replica client id:${this.options.serverId}...` + ); + + // Set a heartbeat interval for the Zongji replication connection + // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown + // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. 
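[Editor note] Putting the new options and handler interface together, a minimal wiring sketch — the connection manager, start position, table pattern and server id are placeholders, and the no-op handler stands in for `BinLogStream`'s real event handler:

```ts
import { BinLogEventHandler, BinLogListener } from './BinLogListener.js';
import { MySQLConnectionManager } from '../MySQLConnectionManager.js';
import { TablePattern } from '@powersync/service-sync-rules';

declare const connectionManager: MySQLConnectionManager; // assumed to exist already

const eventHandler: BinLogEventHandler = {
  onTransactionStart: async () => {},
  onRotate: async () => {},
  onWrite: async () => {},
  onUpdate: async () => {},
  onDelete: async () => {},
  onCommit: async () => {},
  onSchemaChange: async (change) => console.log(`Schema change: ${change.type} on ${change.schema}.${change.table}`)
};

const listener = new BinLogListener({
  connectionManager,
  eventHandler,
  sourceTables: [new TablePattern('mydb', 'lists')],
  startPosition: { filename: 'mysql-bin.000001', offset: 4 },
  serverId: 12345
});

await listener.start();
// Returns once stop() is called, or rethrows a listener error.
await listener.replicateUntilStopped();
```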
+ // The heartbeat needs to be set before starting the listener, since the replication connection is locked once replicating + await new Promise((resolve, reject) => { + this.zongji.connection.query( + // In nanoseconds, 10^9 = 1s + 'set @master_heartbeat_period=28*1000000000', + (error: any, results: any, _fields: any) => { + if (error) { + reject(error); + } else { + this.logger.info('Successfully set up replication connection heartbeat.'); + resolve(results); + } + } + ); + }); + + // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. + // The timeout here must be greater than the master_heartbeat_period. + const socket = this.zongji.connection._socket!; + socket.setTimeout(60_000, () => { + this.logger.info('Destroying socket due to replication connection timeout.'); + socket.destroy(new Error('Replication connection timeout.')); + }); this.zongji.start({ // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive // tablemap events always need to be included for the other row events to work - includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], - includeSchema: { [this.connectionManager.databaseName]: this.options.includedTables }, + includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog', 'query'], + includeSchema: this.databaseFilter, filename: this.binLogPosition.filename, position: this.binLogPosition.offset, serverId: this.options.serverId } satisfies StartOptions); - return new Promise((resolve, reject) => { - // Handle an edge case where the listener has already been stopped before completing startup - if (this.isStopped) { - this.logger.info('BinLog listener was stopped before startup completed.'); + return new Promise((resolve) => { + this.zongji.once('ready', () => { + this.logger.info( + `BinLog Listener ${isRestart ? 'restarted' : 'started'}. Listening for events from position: ${this.binLogPosition.filename}:${this.binLogPosition.offset}` + ); resolve(); - } - - this.zongji.on('error', (error) => { - if (!this.isStopped) { - this.logger.error('Binlog listener error:', error); - this.stop(); - reject(error); - } else { - this.logger.warn('Binlog listener error during shutdown:', error); - } }); + }); + } - this.processingQueue.error((error) => { - if (!this.isStopped) { - this.logger.error('BinlogEvent processing error:', error); - this.stop(); - reject(error); - } else { - this.logger.warn('BinlogEvent processing error during shutdown:', error); - } - }); + private async restartZongji(): Promise { + if (this.zongji.stopped) { + this.zongji = this.createZongjiListener(); + await this.start(true); + } + } - this.zongji.on('stopped', () => { - resolve(); - this.logger.info('BinLog listener stopped. 
Replication ended.'); + private async stopZongji(): Promise { + if (!this.zongji.stopped) { + this.logger.info('Stopping BinLog Listener...'); + await new Promise((resolve) => { + this.zongji.once('stopped', () => { + resolve(); + }); + this.zongji.stop(); }); - }); + this.logger.info('BinLog Listener stopped.'); + } } - public stop(): void { - if (!this.isStopped) { - this.zongji.stop(); + public async stop(): Promise { + if (!(this.isStopped || this.isStopping)) { + this.isStopping = true; + await this.stopZongji(); this.processingQueue.kill(); + + this.isStopped = true; } } - private get isStopped(): boolean { - return this.zongji.stopped; + public async replicateUntilStopped(): Promise { + while (!this.isStopped) { + await timers.setTimeout(1_000); + } + + if (this.listenerError) { + this.logger.error('BinLog Listener stopped due to an error:', this.listenerError); + throw this.listenerError; + } + } + + private createProcessingQueue(): async.QueueObject { + const queue = async.queue(this.createQueueWorker(), 1); + + queue.error((error) => { + if (!(this.isStopped || this.isStopping)) { + this.listenerError = error; + this.stop(); + } else { + this.logger.warn('Error processing BinLog event during shutdown:', error); + } + }); + + return queue; } private createZongjiListener(): ZongJi { const zongji = this.connectionManager.createBinlogListener(); zongji.on('binlog', async (evt) => { - this.logger.info(`Received Binlog event:${evt.getEventName()}`); + this.logger.debug(`Received BinLog event:${evt.getEventName()}`); + this.processingQueue.push(evt); this.queueMemoryUsage += evt.size; // When the processing queue grows past the threshold, we pause the binlog listener if (this.isQueueOverCapacity()) { this.logger.info( - `Binlog processing queue has reached its memory limit of [${this.connectionManager.options.binlog_queue_memory_limit}MB]. Pausing Binlog listener.` + `BinLog processing queue has reached its memory limit of [${this.connectionManager.options.binlog_queue_memory_limit}MB]. Pausing BinLog Listener.` ); - zongji.pause(); - const resumeTimeoutPromise = new Promise((resolve) => { - setTimeout(() => resolve('timeout'), MAX_QUEUE_PAUSE_TIME_MS); - }); - - await Promise.race([this.processingQueue.empty(), resumeTimeoutPromise]); - - this.logger.info(`Binlog processing queue backlog cleared. Resuming Binlog listener.`); - zongji.resume(); + await this.stopZongji(); + await this.processingQueue.drain(); + this.logger.info(`BinLog processing queue backlog cleared. Resuming BinLog Listener.`); + await this.restartZongji(); } }); - zongji.on('ready', async () => { - // Set a heartbeat interval for the Zongji replication connection - // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown - // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. - await new Promise((resolve, reject) => { - this.zongji.connection.query( - // In nanoseconds, 10^9 = 1s - 'set @master_heartbeat_period=28*1000000000', - (error: any, results: any, fields: any) => { - if (error) { - reject(error); - } else { - this.logger.info('Successfully set up replication connection heartbeat...'); - resolve(results); - } - } - ); - }); - - // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. - // The timeout here must be greater than the master_heartbeat_period. 
- const socket = this.zongji.connection._socket!; - socket.setTimeout(60_000, () => { - this.logger.info('Destroying socket due to replication connection timeout.'); - socket.destroy(new Error('Replication connection timeout.')); - }); - this.logger.info( - `BinLog listener setup complete. Reading binlog from: ${this.binLogPosition.filename}:${this.binLogPosition.offset}` - ); + zongji.on('error', (error) => { + if (!(this.isStopped || this.isStopping)) { + this.listenerError = error; + this.stop(); + } else { + this.logger.warn('Ignored BinLog Listener error during shutdown:', error); + } }); return zongji; } + isQueueOverCapacity(): boolean { + return this.queueMemoryUsage >= this.queueMemoryLimit; + } + private createQueueWorker() { return async (evt: BinLogEvent) => { switch (true) { @@ -201,15 +284,27 @@ export class BinLogListener { offset: evt.nextPosition } }); + this.binLogPosition.offset = evt.nextPosition; await this.eventHandler.onTransactionStart({ timestamp: new Date(evt.timestamp) }); + this.logger.info(`Processed GTID event: ${this.currentGTID.comparable}`); break; case zongji_utils.eventIsRotation(evt): + const newFile = this.binLogPosition.filename !== evt.binlogName; this.binLogPosition.filename = evt.binlogName; - this.binLogPosition.offset = evt.position; await this.eventHandler.onRotate(); + + if (newFile) { + this.logger.info( + `Processed Rotate event. New BinLog file is: ${this.binLogPosition.filename}:${this.binLogPosition.offset}` + ); + } break; case zongji_utils.eventIsWriteMutation(evt): - await this.eventHandler.onWrite(evt.rows, evt.tableMap[evt.tableId]); + const tableMap = evt.tableMap[evt.tableId]; + await this.eventHandler.onWrite(evt.rows, tableMap); + this.logger.info( + `Processed Write event for table [${tableMap.parentSchema}.${tableMap.tableName}]. ${evt.rows.length} row(s) inserted.` + ); break; case zongji_utils.eventIsUpdateMutation(evt): await this.eventHandler.onUpdate( @@ -217,27 +312,209 @@ export class BinLogListener { evt.rows.map((row) => row.before), evt.tableMap[evt.tableId] ); + this.logger.info( + `Processed Update event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) updated.` + ); break; case zongji_utils.eventIsDeleteMutation(evt): await this.eventHandler.onDelete(evt.rows, evt.tableMap[evt.tableId]); + this.logger.info( + `Processed Delete event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) deleted.` + ); break; case zongji_utils.eventIsXid(evt): + this.binLogPosition.offset = evt.nextPosition; const LSN = new common.ReplicatedGTID({ raw_gtid: this.currentGTID!.raw, - position: { - filename: this.binLogPosition.filename, - offset: evt.nextPosition - } + position: this.binLogPosition }).comparable; await this.eventHandler.onCommit(LSN); + this.logger.info(`Processed Xid event - transaction complete. LSN: ${LSN}.`); + break; + case zongji_utils.eventIsQuery(evt): + await this.processQueryEvent(evt); break; } + // Update the binlog position after processing the event + this.binLogPosition.offset = evt.nextPosition; this.queueMemoryUsage -= evt.size; }; } - isQueueOverCapacity(): boolean { - return this.queueMemoryUsage >= this.queueMemoryLimit; + private async processQueryEvent(event: BinLogQueryEvent): Promise { + const { query, nextPosition } = event; + + // BEGIN query events mark the start of a transaction before any row events. 
They are not relevant for schema changes + if (query === 'BEGIN') { + return; + } + + const schemaChanges = this.toSchemaChanges(query, event.schema); + if (schemaChanges.length > 0) { + // Since handling the schema changes can take a long time, we need to stop the Zongji listener instead of pausing it. + await this.stopZongji(); + + for (const change of schemaChanges) { + this.logger.info(`Processing schema change ${change.type} for table [${change.schema}.${change.table}]`); + await this.eventHandler.onSchemaChange(change); + } + + // DDL queries are auto commited, but do not come with a corresponding Xid event. + // This is problematic for DDL queries which result in row events because the checkpoint is not moved on, + // so we manually commit here. + this.binLogPosition.offset = nextPosition; + const LSN = new common.ReplicatedGTID({ + raw_gtid: this.currentGTID!.raw, + position: this.binLogPosition + }).comparable; + await this.eventHandler.onCommit(LSN); + + this.logger.info(`Successfully processed ${schemaChanges.length} schema change(s).`); + + // If there are still events in the processing queue, we need to process those before restarting Zongji + if (!this.processingQueue.idle()) { + this.logger.info(`Processing [${this.processingQueue.length()}] events(s) before resuming...`); + this.processingQueue.drain(async () => { + await this.restartZongji(); + }); + } else { + await this.restartZongji(); + } + } + } + + /** + * Function that interprets a DDL query for any applicable schema changes. + * If the query does not contain any relevant schema changes, an empty array is returned. + * The defaultSchema is derived from the database set on the MySQL Node.js connection client. + * It is used as a fallback when the schema/database cannot be determined from the query DDL. + * + * @param query + * @param defaultSchema + */ + private toSchemaChanges(query: string, defaultSchema: string): SchemaChange[] { + let statements: AST[] = []; + try { + const ast = this.sqlParser.astify(query, { database: 'MySQL' }); + statements = Array.isArray(ast) ? ast : [ast]; + } catch (error) { + if (matchedSchemaChangeQuery(query, Object.values(this.databaseFilter))) { + this.logger.warn( + `Failed to parse query: [${query}]. + Please review for the schema changes and manually redeploy the sync rules if required.` + ); + } + return []; + } + + const changes: SchemaChange[] = []; + for (const statement of statements) { + if (isTruncate(statement)) { + const truncateStatement = statement as TruncateStatement; + // Truncate statements can apply to multiple tables + for (const entity of truncateStatement.name) { + changes.push({ + type: SchemaChangeType.TRUNCATE_TABLE, + table: entity.table, + schema: entity.db ?? defaultSchema + }); + } + } else if (isDropTable(statement)) { + for (const entity of statement.name) { + changes.push({ type: SchemaChangeType.DROP_TABLE, table: entity.table, schema: entity.db ?? defaultSchema }); + } + } else if (isDropIndex(statement)) { + const dropStatement = statement as DropIndexStatement; + changes.push({ + type: SchemaChangeType.REPLICATION_IDENTITY, + table: dropStatement.table.table, + schema: dropStatement.table.db ?? 
defaultSchema + }); + } else if (isCreateUniqueIndex(statement)) { + // Potential change to the replication identity if the table has no prior unique constraint + changes.push({ + type: SchemaChangeType.REPLICATION_IDENTITY, + // @ts-ignore - The type definitions for node-sql-parser do not reflect the correct structure here + table: statement.table!.table, + // @ts-ignore + schema: statement.table!.db ?? defaultSchema + }); + } else if (isRenameTable(statement)) { + const renameStatement = statement as RenameStatement; + // Rename statements can apply to multiple tables + for (const table of renameStatement.table) { + const schema = table[0].db ?? defaultSchema; + const isNewTableIncluded = this.databaseFilter[schema](table[1].table); + changes.push({ + type: SchemaChangeType.RENAME_TABLE, + table: table[0].table, + newTable: isNewTableIncluded ? table[1].table : undefined, + schema + }); + } + } else if (isAlterTable(statement)) { + const fromTable = statement.table[0] as BaseFrom; + for (const expression of statement.expr) { + if (isRenameExpression(expression)) { + changes.push({ + type: SchemaChangeType.RENAME_TABLE, + table: fromTable.table, + newTable: expression.table, + schema: fromTable.db ?? defaultSchema + }); + } else if (isColumnExpression(expression)) { + changes.push({ + type: SchemaChangeType.ALTER_TABLE_COLUMN, + table: fromTable.table, + schema: fromTable.db ?? defaultSchema + }); + } else if (isConstraintExpression(expression)) { + // Potential changes to the replication identity + changes.push({ + type: SchemaChangeType.REPLICATION_IDENTITY, + table: fromTable.table, + schema: fromTable.db ?? defaultSchema + }); + } + } + } + } + // Filter out schema changes that are not relevant to the included tables + return changes.filter( + (change) => + this.isTableIncluded(change.table, change.schema) || + (change.newTable && this.isTableIncluded(change.newTable, change.schema)) + ); + } + + private isTableIncluded(tableName: string, schema: string): boolean { + return this.databaseFilter[schema] && this.databaseFilter[schema](tableName); + } + + private createDatabaseFilter(sourceTables: TablePattern[]): { [schema: string]: (table: string) => boolean } { + // Group sync rule tables by schema + const schemaMap = new Map(); + for (const table of sourceTables) { + if (!schemaMap.has(table.schema)) { + const tables = [table]; + schemaMap.set(table.schema, tables); + } else { + schemaMap.get(table.schema)!.push(table); + } + } + + const databaseFilter: { [schema: string]: (table: string) => boolean } = {}; + for (const entry of schemaMap.entries()) { + const [schema, sourceTables] = entry; + databaseFilter[schema] = (table: string) => + sourceTables.findIndex((sourceTable) => + sourceTable.isWildcard + ? 
table.startsWith(sourceTable.tablePattern.substring(0, sourceTable.tablePattern.length - 1)) + : table === sourceTable.name + ) !== -1; + } + + return databaseFilter; } } diff --git a/modules/module-mysql/src/replication/zongji/zongji-utils.ts b/modules/module-mysql/src/replication/zongji/zongji-utils.ts index ee9e4c53..b1d2c579 100644 --- a/modules/module-mysql/src/replication/zongji/zongji-utils.ts +++ b/modules/module-mysql/src/replication/zongji/zongji-utils.ts @@ -5,7 +5,8 @@ import { BinLogRotationEvent, BinLogTableMapEvent, BinLogRowUpdateEvent, - BinLogXidEvent + BinLogXidEvent, + BinLogQueryEvent } from '@powersync/mysql-zongji'; export function eventIsGTIDLog(event: BinLogEvent): event is BinLogGTIDLogEvent { @@ -35,3 +36,7 @@ export function eventIsDeleteMutation(event: BinLogEvent): event is BinLogRowEve export function eventIsUpdateMutation(event: BinLogEvent): event is BinLogRowUpdateEvent { return event.getEventName() == 'updaterows'; } + +export function eventIsQuery(event: BinLogEvent): event is BinLogQueryEvent { + return event.getEventName() == 'query'; +} diff --git a/modules/module-mysql/src/types/node-sql-parser-extended-types.ts b/modules/module-mysql/src/types/node-sql-parser-extended-types.ts new file mode 100644 index 00000000..dd6af40e --- /dev/null +++ b/modules/module-mysql/src/types/node-sql-parser-extended-types.ts @@ -0,0 +1,25 @@ +import 'node-sql-parser'; + +/** + * Missing Type definitions for the node-sql-parser + */ +declare module 'node-sql-parser' { + interface RenameStatement { + type: 'rename'; + table: { db: string | null; table: string }[][]; + } + + interface TruncateStatement { + type: 'truncate'; + keyword: 'table'; // There are more keywords possible, but we only care about 'table' + name: { db: string | null; table: string; as: string | null }[]; + } + + // This custom type more accurately describes what the structure of a Drop statement looks like for indexes. 
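[Editor note] Since schema changes are now detected by parsing binlog query events, here is a rough sketch of how a DDL statement flows through `node-sql-parser` and the new type guards in `parser-utils.ts`. The import paths are illustrative and the exact AST shape may vary between parser versions:

```ts
import pkg, { AST } from 'node-sql-parser';
import { isAlterTable, isColumnExpression } from '../utils/parser-utils.js';

const { Parser } = pkg;
const parser = new Parser();

// toSchemaChanges() parses the query text from the binlog query event like this:
const ast = parser.astify('ALTER TABLE lists DROP COLUMN description', { database: 'MySQL' });
const statements: AST[] = Array.isArray(ast) ? ast : [ast];

for (const statement of statements) {
  if (isAlterTable(statement) && statement.expr.some((expr) => isColumnExpression(expr))) {
    // Reported as SchemaChangeType.ALTER_TABLE_COLUMN for the affected table.
    console.log('Column change detected on table lists');
  }
}
```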
+ interface DropIndexStatement { + type: 'drop'; + keyword: 'index'; + table: { db: string | null; table: string }; + name: any[]; + } +} diff --git a/modules/module-mysql/src/utils/mysql-utils.ts b/modules/module-mysql/src/utils/mysql-utils.ts index f1733f3e..623e8973 100644 --- a/modules/module-mysql/src/utils/mysql-utils.ts +++ b/modules/module-mysql/src/utils/mysql-utils.ts @@ -2,8 +2,8 @@ import { logger } from '@powersync/lib-services-framework'; import mysql from 'mysql2'; import mysqlPromise from 'mysql2/promise'; import * as types from '../types/types.js'; -import { coerce, gte } from 'semver'; -import { SourceTable } from '@powersync/service-core'; +import { coerce, gte, satisfies } from 'semver'; +import { SourceEntityDescriptor } from '@powersync/service-core'; export type RetriedQueryOptions = { connection: mysqlPromise.Connection; @@ -86,6 +86,21 @@ export function isVersionAtLeast(version: string, minimumVersion: string): boole return gte(coercedVersion!, coercedMinimumVersion!, { loose: true }); } -export function escapeMysqlTableName(table: SourceTable): string { - return `\`${table.schema.replaceAll('`', '``')}\`.\`${table.table.replaceAll('`', '``')}\``; +export function satisfiesVersion(version: string, targetVersion: string): boolean { + const coercedVersion = coerce(version); + + return satisfies(coercedVersion!, targetVersion!, { loose: true }); +} + +export function qualifiedMySQLTable(table: SourceEntityDescriptor): string; +export function qualifiedMySQLTable(table: string, schema: string): string; + +export function qualifiedMySQLTable(table: SourceEntityDescriptor | string, schema?: string): string { + if (typeof table === 'object') { + return `\`${table.schema.replaceAll('`', '``')}\`.\`${table.name.replaceAll('`', '``')}\``; + } else if (schema) { + return `\`${schema.replaceAll('`', '``')}\`.\`${table.replaceAll('`', '``')}\``; + } else { + return `\`${table.replaceAll('`', '``')}\``; + } } diff --git a/modules/module-mysql/src/utils/parser-utils.ts b/modules/module-mysql/src/utils/parser-utils.ts new file mode 100644 index 00000000..d3647dcd --- /dev/null +++ b/modules/module-mysql/src/utils/parser-utils.ts @@ -0,0 +1,73 @@ +import { Alter, AST, Create, Drop, TruncateStatement, RenameStatement, DropIndexStatement } from 'node-sql-parser'; + +// We ignore create table statements, since even in the worst case we will pick up the changes when row events for that +// table are received. +const DDL_KEYWORDS = ['alter table', 'drop table', 'truncate table', 'rename table']; + +/** + * Check if a query is a DDL statement that applies to tables matching any of the provided matcher functions. 
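[Editor note] `matchedSchemaChangeQuery` is the fallback used when `node-sql-parser` fails to parse a query: it does a keyword and token scan against the database filter's matcher functions so a warning can be logged. A quick illustration of the intended behaviour (the table name and matcher are placeholders):

```ts
import { matchedSchemaChangeQuery } from './parser-utils.js';

const matchers = [(table: string) => table === 'lists'];

// Contains a DDL keyword and references a matched table -> true
matchedSchemaChangeQuery('ALTER TABLE `mydb`.`lists` ADD COLUMN owner_id CHAR(36)', matchers);

// Not a DDL statement the listener cares about -> false
matchedSchemaChangeQuery('INSERT INTO lists (id) VALUES (1)', matchers);
```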
+ * @param query + * @param matchers + */ +export function matchedSchemaChangeQuery(query: string, matchers: ((table: string) => boolean)[]) { + // Normalize case and remove backticks for matching + const normalizedQuery = query.toLowerCase().replace(/`/g, ''); + + const isDDLQuery = DDL_KEYWORDS.some((keyword) => normalizedQuery.includes(keyword)); + if (isDDLQuery) { + const tokens = normalizedQuery.split(/[^a-zA-Z0-9_`]+/); + // Check if any matched table names appear in the query + for (const token of tokens) { + const matchFound = matchers.some((matcher) => matcher(token)); + if (matchFound) { + return true; + } + } + } + + return false; +} + +// @ts-ignore +export function isTruncate(statement: AST): statement is TruncateStatement { + // @ts-ignore + return statement.type === 'truncate'; +} + +// @ts-ignore +export function isRenameTable(statement: AST): statement is RenameStatement { + // @ts-ignore + return statement.type === 'rename'; +} + +export function isAlterTable(statement: AST): statement is Alter { + return statement.type === 'alter'; +} + +export function isRenameExpression(expression: any): boolean { + return expression.resource === 'table' && expression.action === 'rename'; +} + +export function isColumnExpression(expression: any): boolean { + return expression.resource === 'column'; +} + +export function isConstraintExpression(expression: any): boolean { + return ( + (expression.resource === 'key' && expression.keyword === 'primary key') || + expression.resource === 'constraint' || + (expression.resource === 'index' && expression.action === 'drop') + ); +} + +export function isDropTable(statement: AST): statement is Drop { + return statement.type === 'drop' && statement.keyword === 'table'; +} + +export function isDropIndex(statement: AST): statement is DropIndexStatement { + return statement.type === 'drop' && statement.keyword === 'index'; +} + +export function isCreateUniqueIndex(statement: AST): statement is Create { + return statement.type === 'create' && statement.keyword === 'index' && statement.index_type === 'unique'; +} diff --git a/modules/module-mysql/test/src/BinLogListener.test.ts b/modules/module-mysql/test/src/BinLogListener.test.ts index c36bf1cf..c7447dd7 100644 --- a/modules/module-mysql/test/src/BinLogListener.test.ts +++ b/modules/module-mysql/test/src/BinLogListener.test.ts @@ -1,12 +1,24 @@ -import { describe, test, beforeEach, vi, expect, afterEach } from 'vitest'; -import { BinLogEventHandler, BinLogListener, Row } from '@module/replication/zongji/BinLogListener.js'; +import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'; +import { + BinLogEventHandler, + BinLogListener, + Row, + SchemaChange, + SchemaChangeType +} from '@module/replication/zongji/BinLogListener.js'; import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; -import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; +import { clearTestDb, createTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; import { v4 as uuid } from 'uuid'; import * as common from '@module/common/common-index.js'; -import { createRandomServerId } from '@module/utils/mysql-utils.js'; +import { + createRandomServerId, + getMySQLVersion, + qualifiedMySQLTable, + satisfiesVersion +} from '@module/utils/mysql-utils.js'; import { TableMapEntry } from '@powersync/mysql-zongji'; import crypto from 'crypto'; +import { TablePattern } from '@powersync/service-sync-rules'; describe('BinlogListener tests', () => { const MAX_QUEUE_CAPACITY_MB = 1; @@ 
-18,26 +30,26 @@ describe('BinlogListener tests', () => { let connectionManager: MySQLConnectionManager; let eventHandler: TestBinLogEventHandler; let binLogListener: BinLogListener; + let isMySQL57: boolean = false; - beforeEach(async () => { + beforeAll(async () => { connectionManager = new MySQLConnectionManager(BINLOG_LISTENER_CONNECTION_OPTIONS, {}); const connection = await connectionManager.getConnection(); - await clearTestDb(connection); - await connection.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description MEDIUMTEXT)`); + const version = await getMySQLVersion(connection); + isMySQL57 = satisfiesVersion(version, '5.7.x'); connection.release(); - const fromGTID = await getFromGTID(connectionManager); + }); + beforeEach(async () => { + const connection = await connectionManager.getConnection(); + await clearTestDb(connection); + await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description MEDIUMTEXT)`); + connection.release(); eventHandler = new TestBinLogEventHandler(); - binLogListener = new BinLogListener({ - connectionManager: connectionManager, - eventHandler: eventHandler, - startPosition: fromGTID.position, - includedTables: ['test_DATA'], - serverId: createRandomServerId(1) - }); + binLogListener = await createBinlogListener(); }); - afterEach(async () => { + afterAll(async () => { await connectionManager.end(); }); @@ -45,17 +57,15 @@ describe('BinlogListener tests', () => { const stopSpy = vi.spyOn(binLogListener.zongji, 'stop'); const queueStopSpy = vi.spyOn(binLogListener.processingQueue, 'kill'); - const startPromise = binLogListener.start(); - setTimeout(async () => binLogListener.stop(), 50); + await binLogListener.start(); + await binLogListener.stop(); - await expect(startPromise).resolves.toBeUndefined(); expect(stopSpy).toHaveBeenCalled(); expect(queueStopSpy).toHaveBeenCalled(); }); - test('Pause Zongji binlog listener when processing queue reaches maximum memory size', async () => { - const pauseSpy = vi.spyOn(binLogListener.zongji, 'pause'); - const resumeSpy = vi.spyOn(binLogListener.zongji, 'resume'); + test('Zongji listener is stopped when processing queue reaches maximum memory size', async () => { + const stopSpy = vi.spyOn(binLogListener.zongji, 'stop'); // Pause the event handler to force a backlog on the processing queue eventHandler.pause(); @@ -63,24 +73,24 @@ describe('BinlogListener tests', () => { const ROW_COUNT = 10; await insertRows(connectionManager, ROW_COUNT); - const startPromise = binLogListener.start(); + await binLogListener.start(); - // Wait for listener to pause due to queue reaching capacity - await vi.waitFor(() => expect(pauseSpy).toHaveBeenCalled(), { timeout: 5000 }); + // Wait for listener to stop due to queue reaching capacity + await vi.waitFor(() => expect(stopSpy).toHaveBeenCalled(), { timeout: 5000 }); expect(binLogListener.isQueueOverCapacity()).toBeTruthy(); // Resume event processing eventHandler.unpause!(); + const restartSpy = vi.spyOn(binLogListener, 'start'); await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 }); - binLogListener.stop(); - await expect(startPromise).resolves.toBeUndefined(); + await binLogListener.stop(); // Confirm resume was called after unpausing - expect(resumeSpy).toHaveBeenCalled(); + expect(restartSpy).toHaveBeenCalled(); }); - test('Binlog events are correctly forwarded to provided binlog events handler', async () => { - const startPromise = binLogListener.start(); + test('Row events: Write, update, 
delete', async () => { + await binLogListener.start(); const ROW_COUNT = 10; await insertRows(connectionManager, ROW_COUNT); @@ -93,9 +103,378 @@ describe('BinlogListener tests', () => { await deleteRows(connectionManager); await vi.waitFor(() => expect(eventHandler.rowsDeleted).equals(ROW_COUNT), { timeout: 5000 }); - binLogListener.stop(); - await expect(startPromise).resolves.toBeUndefined(); + await binLogListener.stop(); + }); + + test('Schema change event: Rename table', async () => { + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_DATA RENAME test_DATA_new`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.RENAME_TABLE, + connectionManager.databaseName, + 'test_DATA', + 'test_DATA_new' + ); + }); + + test('Schema change event: Rename multiple tables', async () => { + // RENAME TABLE supports renaming multiple tables in a single statement + // We generate a schema change event for each table renamed + await binLogListener.start(); + await connectionManager.query(`RENAME TABLE + test_DATA TO test_DATA_new, + test_DATA_new TO test_DATA + `); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(2), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.RENAME_TABLE, + connectionManager.databaseName, + 'test_DATA' + ); + // New table name is undefined since the renamed table is not included by the database filter + expect(eventHandler.schemaChanges[0].newTable).toBeUndefined(); + + assertSchemaChange( + eventHandler.schemaChanges[1], + SchemaChangeType.RENAME_TABLE, + connectionManager.databaseName, + 'test_DATA_new', + 'test_DATA' + ); + }); + + test('Schema change event: Truncate table', async () => { + await binLogListener.start(); + await connectionManager.query(`TRUNCATE TABLE test_DATA`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.TRUNCATE_TABLE, + connectionManager.databaseName, + 'test_DATA' + ); + }); + + test('Schema change event: Drop table', async () => { + await binLogListener.start(); + await connectionManager.query(`DROP TABLE test_DATA`); + await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description MEDIUMTEXT)`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.DROP_TABLE, + connectionManager.databaseName, + 'test_DATA' + ); + }); + + test('Schema change event: Drop column', async () => { + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_DATA DROP COLUMN description`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + }); + + test('Schema change event: Add column', async () => { + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_DATA ADD COLUMN new_column VARCHAR(255)`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await 
binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + }); + + test('Schema change event: Modify column', async () => { + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_DATA MODIFY COLUMN description TEXT`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + }); + + test('Schema change event: Rename column via change statement', async () => { + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_DATA CHANGE COLUMN description description_new MEDIUMTEXT`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + }); + + test('Schema change event: Rename column via rename statement', async () => { + // Syntax ALTER TABLE RENAME COLUMN was only introduced in MySQL 8.0.0 + if (!isMySQL57) { + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_DATA RENAME COLUMN description TO description_new`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + } + }); + + test('Schema change event: Multiple column changes', async () => { + // ALTER TABLE can have multiple column changes in a single statement + await binLogListener.start(); + await connectionManager.query( + `ALTER TABLE test_DATA DROP COLUMN description, ADD COLUMN new_description TEXT, MODIFY COLUMN id VARCHAR(50)` + ); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(3), { timeout: 5000 }); + await binLogListener.stop(); + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + + assertSchemaChange( + eventHandler.schemaChanges[1], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + + assertSchemaChange( + eventHandler.schemaChanges[2], + SchemaChangeType.ALTER_TABLE_COLUMN, + connectionManager.databaseName, + 'test_DATA' + ); + }); + + test('Schema change event: Drop and Add primary key', async () => { + await connectionManager.query(`CREATE TABLE test_constraints (id CHAR(36), description VARCHAR(100))`); + const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_constraints')]; + binLogListener = await createBinlogListener(sourceTables); + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_constraints ADD PRIMARY KEY (id)`); + await connectionManager.query(`ALTER TABLE test_constraints DROP PRIMARY KEY`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(2), { timeout: 5000 }); + await binLogListener.stop(); + // Event for the add + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.REPLICATION_IDENTITY, + connectionManager.databaseName, + 'test_constraints' + ); + // Event for the drop + assertSchemaChange( + 
eventHandler.schemaChanges[1], + SchemaChangeType.REPLICATION_IDENTITY, + connectionManager.databaseName, + 'test_constraints' + ); + }); + + test('Schema change event: Add and drop unique constraint', async () => { + await connectionManager.query(`CREATE TABLE test_constraints (id CHAR(36), description VARCHAR(100))`); + const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_constraints')]; + binLogListener = await createBinlogListener(sourceTables); + await binLogListener.start(); + await connectionManager.query(`ALTER TABLE test_constraints ADD UNIQUE (description)`); + await connectionManager.query(`ALTER TABLE test_constraints DROP INDEX description`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(2), { timeout: 5000 }); + await binLogListener.stop(); + // Event for the creation + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.REPLICATION_IDENTITY, + connectionManager.databaseName, + 'test_constraints' + ); + // Event for the drop + assertSchemaChange( + eventHandler.schemaChanges[1], + SchemaChangeType.REPLICATION_IDENTITY, + connectionManager.databaseName, + 'test_constraints' + ); + }); + + test('Schema change event: Add and drop a unique index', async () => { + await connectionManager.query(`CREATE TABLE test_constraints (id CHAR(36), description VARCHAR(100))`); + const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_constraints')]; + binLogListener = await createBinlogListener(sourceTables); + await binLogListener.start(); + await connectionManager.query(`CREATE UNIQUE INDEX description_idx ON test_constraints (description)`); + await connectionManager.query(`DROP INDEX description_idx ON test_constraints`); + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(2), { timeout: 5000 }); + await binLogListener.stop(); + // Event for the creation + assertSchemaChange( + eventHandler.schemaChanges[0], + SchemaChangeType.REPLICATION_IDENTITY, + connectionManager.databaseName, + 'test_constraints' + ); + // Event for the drop + assertSchemaChange( + eventHandler.schemaChanges[1], + SchemaChangeType.REPLICATION_IDENTITY, + connectionManager.databaseName, + 'test_constraints' + ); + }); + + test('Schema changes for non-matching tables are ignored', async () => { + // TableFilter = only match 'test_DATA' + await binLogListener.start(); + await connectionManager.query(`CREATE TABLE test_ignored (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`ALTER TABLE test_ignored ADD COLUMN new_column VARCHAR(10)`); + await connectionManager.query(`DROP TABLE test_ignored`); + + // "Anchor" event to latch onto, ensuring that the schema change events have finished + await insertRows(connectionManager, 1); + await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(1), { timeout: 5000 }); + await binLogListener.stop(); + + expect(eventHandler.schemaChanges.length).toBe(0); }); + + test('Sequential schema change handling', async () => { + // If there are multiple schema changes in the binlog processing queue, we only restart the binlog listener once + // all the schema changes have been processed + const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_multiple')]; + binLogListener = await createBinlogListener(sourceTables); + + await connectionManager.query(`CREATE TABLE test_multiple (id CHAR(36), description VARCHAR(100))`); + await connectionManager.query(`ALTER TABLE test_multiple ADD COLUMN new_column VARCHAR(10)`); + await 
connectionManager.query(`ALTER TABLE test_multiple ADD PRIMARY KEY (id)`); + await connectionManager.query(`ALTER TABLE test_multiple MODIFY COLUMN new_column TEXT`); + await connectionManager.query(`DROP TABLE test_multiple`); + + await binLogListener.start(); + + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(4), { timeout: 5000 }); + await binLogListener.stop(); + expect(eventHandler.schemaChanges[0].type).toBe(SchemaChangeType.ALTER_TABLE_COLUMN); + expect(eventHandler.schemaChanges[1].type).toBe(SchemaChangeType.REPLICATION_IDENTITY); + expect(eventHandler.schemaChanges[2].type).toBe(SchemaChangeType.ALTER_TABLE_COLUMN); + expect(eventHandler.schemaChanges[3].type).toBe(SchemaChangeType.DROP_TABLE); + }); + + test('Unprocessed binlog event received that does not match the current table schema', async () => { + // If we process a binlog event for a table which has since had its schema changed, we expect the binlog listener to stop with an error + const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_failure')]; + binLogListener = await createBinlogListener(sourceTables); + + await connectionManager.query(`CREATE TABLE test_failure (id CHAR(36), description VARCHAR(100))`); + await connectionManager.query(`INSERT INTO test_failure(id, description) VALUES('${uuid()}','test_failure')`); + await connectionManager.query(`ALTER TABLE test_failure DROP COLUMN description`); + + await binLogListener.start(); + + await expect(() => binLogListener.replicateUntilStopped()).rejects.toThrow( + /that does not match its current schema/ + ); + }); + + test('Unprocessed binlog event received for a dropped table', async () => { + const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_failure')]; + binLogListener = await createBinlogListener(sourceTables); + + // If we process a binlog event for a table which has since been dropped, we expect the binlog listener to stop with an error + await connectionManager.query(`CREATE TABLE test_failure (id CHAR(36), description VARCHAR(100))`); + await connectionManager.query(`INSERT INTO test_failure(id, description) VALUES('${uuid()}','test_failure')`); + await connectionManager.query(`DROP TABLE test_failure`); + + await binLogListener.start(); + + await expect(() => binLogListener.replicateUntilStopped()).rejects.toThrow(/or the table has been dropped/); + }); + + test('Multi database events', async () => { + await createTestDb(connectionManager, 'multi_schema'); + const testTable = qualifiedMySQLTable('test_DATA_multi', 'multi_schema'); + await connectionManager.query(`CREATE TABLE ${testTable} (id CHAR(36) PRIMARY KEY,description TEXT);`); + + const sourceTables = [ + new TablePattern(connectionManager.databaseName, 'test_DATA'), + new TablePattern('multi_schema', 'test_DATA_multi') + ]; + binLogListener = await createBinlogListener(sourceTables); + await binLogListener.start(); + + // Default database insert into test_DATA + await insertRows(connectionManager, 1); + // multi_schema database insert into test_DATA_multi + await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('${uuid()}','test')`); + await connectionManager.query(`DROP TABLE ${testTable}`); + + await vi.waitFor(() => expect(eventHandler.schemaChanges.length).toBe(1), { timeout: 5000 }); + await binLogListener.stop(); + expect(eventHandler.rowsWritten).toBe(2); + assertSchemaChange(eventHandler.schemaChanges[0], SchemaChangeType.DROP_TABLE, 'multi_schema', 'test_DATA_multi'); + }); + + async function 
createBinlogListener( + sourceTables?: TablePattern[], + startPosition?: common.BinLogPosition + ): Promise { + if (!sourceTables) { + sourceTables = [new TablePattern(connectionManager.databaseName, 'test_DATA')]; + } + + if (!startPosition) { + const fromGTID = await getFromGTID(connectionManager); + startPosition = fromGTID.position; + } + + return new BinLogListener({ + connectionManager: connectionManager, + eventHandler: eventHandler, + startPosition: startPosition, + sourceTables: sourceTables, + serverId: createRandomServerId(1) + }); + } + + function assertSchemaChange( + change: SchemaChange, + type: SchemaChangeType, + schema: string, + table: string, + newTable?: string + ) { + expect(change.type).toBe(type); + expect(change.schema).toBe(schema); + expect(change.table).toEqual(table); + if (newTable) { + expect(change.newTable).toEqual(newTable); + } + } }); async function getFromGTID(connectionManager: MySQLConnectionManager) { @@ -127,6 +506,7 @@ class TestBinLogEventHandler implements BinLogEventHandler { rowsUpdated = 0; rowsDeleted = 0; commitCount = 0; + schemaChanges: SchemaChange[] = []; unpause: ((value: void | PromiseLike) => void) | undefined; private pausedPromise: Promise | undefined; @@ -156,6 +536,9 @@ class TestBinLogEventHandler implements BinLogEventHandler { this.commitCount++; } + async onSchemaChange(change: SchemaChange) { + this.schemaChanges.push(change); + } async onTransactionStart(options: { timestamp: Date }) {} async onRotate() {} } diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts index 1e7708eb..5d35428b 100644 --- a/modules/module-mysql/test/src/BinLogStream.test.ts +++ b/modules/module-mysql/test/src/BinLogStream.test.ts @@ -4,7 +4,8 @@ import { ReplicationMetric } from '@powersync/service-types'; import { v4 as uuid } from 'uuid'; import { describe, expect, test } from 'vitest'; import { BinlogStreamTestContext } from './BinlogStreamUtils.js'; -import { describeWithStorage } from './util.js'; +import { createTestDb, describeWithStorage } from './util.js'; +import { qualifiedMySQLTable } from '@module/utils/mysql-utils.js'; const BASIC_SYNC_RULES = ` bucket_definitions: @@ -13,7 +14,7 @@ bucket_definitions: - SELECT id, description FROM "test_data" `; -describe('BigLog stream', () => { +describe('BinLogStream tests', () => { describeWithStorage({ timeout: 20_000 }, defineBinlogStreamTests); }); @@ -34,7 +35,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 
0; - context.startStreaming(); + await context.startStreaming(); const testId = uuid(); await connectionManager.query( `INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)` @@ -48,7 +49,59 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { expect(endTxCount - startTxCount).toEqual(1); }); - test('replicating case sensitive table', async () => { + test('Replicate multi schema sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + default_schema_test_data: + data: + - SELECT id, description, num FROM "${connectionManager.databaseName}"."test_data" + multi_schema_test_data: + data: + - SELECT id, description, num FROM "multi_schema"."test_data" + `); + + await createTestDb(connectionManager, 'multi_schema'); + + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`); + const testTable = qualifiedMySQLTable('test_data', 'multi_schema'); + await connectionManager.query( + `CREATE TABLE IF NOT EXISTS ${testTable} (id CHAR(36) PRIMARY KEY,description TEXT);` + ); + await context.replicateSnapshot(); + + const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; + const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; + + await context.startStreaming(); + + const testId = uuid(); + await connectionManager.query( + `INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)` + ); + + const testId2 = uuid(); + await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('${testId2}', 'test2')`); + + const default_data = await context.getBucketData('default_schema_test_data[]'); + expect(default_data).toMatchObject([ + putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n }) + ]); + + const multi_schema_data = await context.getBucketData('multi_schema_test_data[]'); + expect(multi_schema_data).toMatchObject([putOp('test_data', { id: testId2, description: 'test2' })]); + + const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; + const endTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; + expect(endRowCount - startRowCount).toEqual(2); + expect(endTxCount - startTxCount).toEqual(2); + }); + + test('Replicate case sensitive table', async () => { + // MySQL inherits the case sensitivity of the underlying OS filesystem. + // So Unix-based systems will have case-sensitive tables, but Windows won't. + // https://dev.mysql.com/doc/refman/8.4/en/identifier-case-sensitivity.html await using context = await BinlogStreamTestContext.open(factory); const { connectionManager } = context; await context.updateSyncRules(` @@ -65,7 +118,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 
0; - context.startStreaming(); + await context.startStreaming(); const testId = uuid(); await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${testId}','test1')`); @@ -79,50 +132,73 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { expect(endTxCount - startTxCount).toEqual(1); }); - // TODO: Not supported yet - // test('replicating TRUNCATE', async () => { - // await using context = await BinlogStreamTestContext.create(factory); - // const { connectionManager } = context; - // const syncRuleContent = ` - // bucket_definitions: - // global: - // data: - // - SELECT id, description FROM "test_data" - // by_test_data: - // parameters: SELECT id FROM test_data WHERE id = token_parameters.user_id - // data: [] - // `; - // await context.updateSyncRules(syncRuleContent); - // await connectionManager.query(`DROP TABLE IF EXISTS test_data`); - // await connectionManager.query( - // `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)` - // ); - - // await context.replicateSnapshot(); - // context.startStreaming(); - - // const [{ test_id }] = pgwireRows( - // await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`) - // ); - // await connectionManager.query(`TRUNCATE test_data`); - - // const data = await context.getBucketData('global[]'); - - // expect(data).toMatchObject([ - // putOp('test_data', { id: test_id, description: 'test1' }), - // removeOp('test_data', test_id) - // ]); - // }); - - test('replicating changing primary key', async () => { + test('Replicate matched wild card tables in sync rules', async () => { await using context = await BinlogStreamTestContext.open(factory); const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data_%"`); + + await connectionManager.query(`CREATE TABLE test_data_1 (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`CREATE TABLE test_data_2 (id CHAR(36) PRIMARY KEY, description TEXT)`); + + const testId11 = uuid(); + await connectionManager.query(`INSERT INTO test_data_1(id, description) VALUES('${testId11}','test11')`); + + const testId21 = uuid(); + await connectionManager.query(`INSERT INTO test_data_2(id, description) VALUES('${testId21}','test21')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + const testId12 = uuid(); + await connectionManager.query(`INSERT INTO test_data_1(id, description) VALUES('${testId12}', 'test12')`); + + const testId22 = uuid(); + await connectionManager.query(`INSERT INTO test_data_2(id, description) VALUES('${testId22}', 'test22')`); + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data_1', { id: testId11, description: 'test11' }), + putOp('test_data_2', { id: testId21, description: 'test21' }), + putOp('test_data_1', { id: testId12, description: 'test12' }), + putOp('test_data_2', { id: testId22, description: 'test22' }) + ]); + }); + + test('Handle table TRUNCATE events', async () => { + await using context = await BinlogStreamTestContext.open(factory); + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + const testId = uuid(); + await 
connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`); + await connectionManager.query(`TRUNCATE TABLE test_data`); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: testId, description: 'test1' }), + removeOp('test_data', testId) + ]); + }); + + test('Handle changes in a replicated table primary key', async () => { + await using context = await BinlogStreamTestContext.open(factory); await context.updateSyncRules(BASIC_SYNC_RULES); + const { connectionManager } = context; await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`); await context.replicateSnapshot(); - context.startStreaming(); + await context.startStreaming(); const testId1 = uuid(); await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}','test1')`); @@ -154,7 +230,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { ]); }); - test('initial sync', async () => { + test('Initial snapshot sync', async () => { await using context = await BinlogStreamTestContext.open(factory); const { connectionManager } = context; await context.updateSyncRules(BASIC_SYNC_RULES); @@ -167,7 +243,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; await context.replicateSnapshot(); - context.startStreaming(); + await context.startStreaming(); const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const data = await context.getBucketData('global[]'); @@ -175,7 +251,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { expect(endRowCount - startRowCount).toEqual(1); }); - test('snapshot with date values', async () => { + test('Snapshot with date values', async () => { await using context = await BinlogStreamTestContext.open(factory); const { connectionManager } = context; await context.updateSyncRules(` @@ -195,7 +271,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { `); await context.replicateSnapshot(); - context.startStreaming(); + await context.startStreaming(); const data = await context.getBucketData('global[]'); expect(data).toMatchObject([ @@ -209,7 +285,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { ]); }); - test('replication with date values', async () => { + test('Replication with date values', async () => { await using context = await BinlogStreamTestContext.open(factory); const { connectionManager } = context; await context.updateSyncRules(` @@ -228,7 +304,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 
0; - context.startStreaming(); + await context.startStreaming(); const testId = uuid(); await connectionManager.query(` @@ -259,7 +335,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { expect(endTxCount - startTxCount).toEqual(2); }); - test('table not in sync rules', async () => { + test('Replication for tables not in the sync rules are ignored', async () => { await using context = await BinlogStreamTestContext.open(factory); const { connectionManager } = context; await context.updateSyncRules(BASIC_SYNC_RULES); @@ -271,7 +347,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; - context.startStreaming(); + await context.startStreaming(); await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`); const data = await context.getBucketData('global[]'); @@ -300,7 +376,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`); await context.replicateSnapshot(); - context.startStreaming(); + await context.startStreaming(); await connectionManager.query( `INSERT INTO test_data(id, description, num) VALUES('${testId1}', 'test1', 1152921504606846976)` ); @@ -315,7 +391,7 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { await context.loadActiveSyncRules(); // Does not actually do a snapshot again - just does the required intialization. await context.replicateSnapshot(); - context.startStreaming(); + await context.startStreaming(); await connectionManager.query(`INSERT INTO test_data(id, description, num) VALUES('${testId2}', 'test2', 0)`); const data = await context.getBucketData('global[]'); diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts index b83aa2bd..665be6c2 100644 --- a/modules/module-mysql/test/src/BinlogStreamUtils.ts +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -16,6 +16,7 @@ import { import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; import mysqlPromise from 'mysql2/promise'; import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; +import timers from 'timers/promises'; /** * Tests operating on the binlog stream need to configure the stream and manage asynchronous @@ -115,11 +116,21 @@ export class BinlogStreamTestContext { this.replicationDone = true; } - startStreaming() { + async startStreaming() { if (!this.replicationDone) { throw new Error('Call replicateSnapshot() before startStreaming()'); } this.streamPromise = this.binlogStream.streamChanges(); + + // Wait for the replication to start before returning. 
+ // This avoids a bunch of unpredictable race conditions that appear in testing + return new Promise(async (resolve) => { + while (this.binlogStream.isStartingReplication) { + await timers.setTimeout(50); + } + + resolve(); + }); } async getCheckpoint(options?: { timeout?: number }): Promise { diff --git a/modules/module-mysql/test/src/parser-utils.test.ts b/modules/module-mysql/test/src/parser-utils.test.ts new file mode 100644 index 00000000..802efc04 --- /dev/null +++ b/modules/module-mysql/test/src/parser-utils.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, test } from 'vitest'; +import { matchedSchemaChangeQuery } from '@module/utils/parser-utils.js'; + +describe('MySQL Parser Util Tests', () => { + test('matchedSchemaChangeQuery function', () => { + const matcher = (tableName: string) => tableName === 'users'; + + // DDL matches and table name matches + expect(matchedSchemaChangeQuery('ALTER TABLE users ADD COLUMN name VARCHAR(255)', [matcher])).toBeTruthy(); + expect(matchedSchemaChangeQuery('DROP TABLE users', [matcher])).toBeTruthy(); + expect(matchedSchemaChangeQuery('TRUNCATE TABLE users', [matcher])).toBeTruthy(); + expect(matchedSchemaChangeQuery('RENAME TABLE new_users TO users', [matcher])).toBeTruthy(); + + // Can handle backticks in table names + expect( + matchedSchemaChangeQuery('ALTER TABLE `clientSchema`.`users` ADD COLUMN name VARCHAR(255)', [matcher]) + ).toBeTruthy(); + + // DDL matches, but table name does not match + expect(matchedSchemaChangeQuery('DROP TABLE clientSchema.clients', [matcher])).toBeFalsy(); + // No DDL match + expect(matchedSchemaChangeQuery('SELECT * FROM users', [matcher])).toBeFalsy(); + }); +}); diff --git a/modules/module-mysql/test/src/schema-changes.test.ts b/modules/module-mysql/test/src/schema-changes.test.ts new file mode 100644 index 00000000..511ca902 --- /dev/null +++ b/modules/module-mysql/test/src/schema-changes.test.ts @@ -0,0 +1,663 @@ +import { compareIds, putOp, removeOp, test_utils } from '@powersync/service-core-tests'; +import { beforeAll, describe, expect, test } from 'vitest'; + +import { storage } from '@powersync/service-core'; +import { createTestDb, describeWithStorage, TEST_CONNECTION_OPTIONS } from './util.js'; +import { BinlogStreamTestContext } from './BinlogStreamUtils.js'; +import timers from 'timers/promises'; +import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; +import { getMySQLVersion, qualifiedMySQLTable, satisfiesVersion } from '@module/utils/mysql-utils.js'; + +describe('MySQL Schema Changes', () => { + describeWithStorage({ timeout: 20_000 }, defineTests); +}); + +const BASIC_SYNC_RULES = ` +bucket_definitions: + global: + data: + - SELECT id, * FROM "test_data" +`; + +const PUT_T1 = test_utils.putOp('test_data', { id: 't1', description: 'test1' }); +const PUT_T2 = test_utils.putOp('test_data', { id: 't2', description: 'test2' }); +const PUT_T3 = test_utils.putOp('test_data', { id: 't3', description: 'test3' }); + +const REMOVE_T1 = test_utils.removeOp('test_data', 't1'); +const REMOVE_T2 = test_utils.removeOp('test_data', 't2'); + +function defineTests(factory: storage.TestStorageFactory) { + let isMySQL57: boolean = false; + + beforeAll(async () => { + const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {}); + const connection = await connectionManager.getConnection(); + const version = await getMySQLVersion(connection); + isMySQL57 = satisfiesVersion(version, '5.7.x'); + connection.release(); + await connectionManager.end(); + }); + + 
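// Illustrative sketch of the two mysql-utils helpers these schema-change tests lean on
// (qualifiedMySQLTable and satisfiesVersion, both added in mysql-utils.ts earlier in this diff).
// The results noted in the comments follow the implementations shown above; the literal inputs
// and variable names here are example values only, not part of the test suite.
import { qualifiedMySQLTable, satisfiesVersion } from '@module/utils/mysql-utils.js';

// Builds an escaped `schema`.`table` identifier (string + schema overload):
const multiSchemaTable = qualifiedMySQLTable('test_data', 'multi_schema'); // `multi_schema`.`test_data`

// Loose semver check used above to detect MySQL 5.7 servers:
const looksLike57 = satisfiesVersion('5.7.44-log', '5.7.x'); // true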
test('Re-create table', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Drop a table and re-create it. + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + // Dropping the table immediately leads to a rare race condition where Zongji tries to get the table information + // for the previous write event, but the table is already gone. Without the table info the tablemap event can't be correctly + // populated and replication will fail. + await timers.setTimeout(50); + await connectionManager.query(`DROP TABLE test_data`); + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t3','test3')`); + + const data = await context.getBucketData('global[]'); + + // Initial inserts + expect(data.slice(0, 2)).toMatchObject([PUT_T1, PUT_T2]); + + // Truncate - order doesn't matter + expect(data.slice(2, 4).sort(compareIds)).toMatchObject([REMOVE_T1, REMOVE_T2]); + + // Due to the async nature of this replication test, + // the insert for t3 is picked up both in the snapshot and in the replication stream. + expect(data.slice(4)).toMatchObject([ + PUT_T3, // Snapshot insert + PUT_T3 // Insert from binlog replication stream + ]); + }); + + test('Create table: New table is in the sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + await context.replicateSnapshot(); + await context.startStreaming(); + + // Add table after initial replication + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([PUT_T1, PUT_T1]); + }); + + test('Create table: New table is created from existing data', async () => { + // CREATE TABLE ... SELECT is not allowed in MySQL 5.7 when enforce_gtid_consistency=ON + if (!isMySQL57) { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + await connectionManager.query(`CREATE TABLE test_data_from + ( + id CHAR(36) PRIMARY KEY, + description TEXT + )`); + await connectionManager.query(`INSERT INTO test_data_from(id, description) + VALUES ('t1', 'test1')`); + await connectionManager.query(`INSERT INTO test_data_from(id, description) + VALUES ('t2', 'test2')`); + await connectionManager.query(`INSERT INTO test_data_from(id, description) + VALUES ('t3', 'test3')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + // Add table after initial replication + await connectionManager.query(`CREATE TABLE test_data SELECT * FROM test_data_from`); + + const data = await context.getBucketData('global[]'); + + // Interestingly, the create with select triggers binlog row write events + expect(data).toMatchObject([ + // From snapshot + PUT_T1, + PUT_T2, + PUT_T3, + // From replication stream + PUT_T1, + PUT_T2, + PUT_T3 + ]); + } + }); + + test('Create table: New table is not in the sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + await context.replicateSnapshot(); + await context.startStreaming(); + + // Add table after initial replication + await connectionManager.query(`CREATE TABLE test_data_ignored (id CHAR(36) PRIMARY KEY, description TEXT)`); + + await connectionManager.query(`INSERT INTO test_data_ignored(id, description) VALUES('t1','test ignored')`); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([]); + }); + + test('Rename table: Table not in the sync rules to one in the sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + // Rename a table that is not in the sync rules to one that is in the sync rules + await connectionManager.query(`CREATE TABLE test_data_old (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT INTO test_data_old(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`RENAME TABLE test_data_old TO test_data`); + + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + // Snapshot insert + PUT_T1, + PUT_T2, + // Replicated insert + PUT_T2 + ]); + }); + + test('Rename table: Table in the sync rules to another table in the sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT id, * FROM "test_data%" + `); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data1 (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT INTO test_data1(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`RENAME TABLE test_data1 TO test_data2`); + await connectionManager.query(`INSERT INTO test_data2(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial replication + putOp('test_data1', { id: 't1', description: 'test1' }), + // Initial truncate + removeOp('test_data1', 't1') + ]); + + expect(data.slice(2, 4).sort(compareIds)).toMatchObject([ + // Snapshot insert + putOp('test_data2', { id: 't1', description: 'test1' }), + putOp('test_data2', { id: 't2', description: 'test2' }) + ]); + expect(data.slice(4)).toMatchObject([ + // Replicated insert + // We may eventually be able to de-duplicate this + putOp('test_data2', { id: 't2', description: 'test2' }) + ]); + }); + + test('Rename table: Table in the sync rules to one not in the sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT 
INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`RENAME TABLE test_data TO test_data_not_in_sync_rules`); + await connectionManager.query(`INSERT INTO test_data_not_in_sync_rules(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + // Initial replication + PUT_T1, + // Truncate + REMOVE_T1 + ]); + }); + + test('Change Replication Identity default to full by dropping the primary key', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Change replica id from default (PK) to full + // Requires re-snapshotting the table. + + await context.updateSyncRules(BASIC_SYNC_RULES); + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`ALTER TABLE test_data DROP PRIMARY KEY;`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + // Truncate + REMOVE_T1 + ]); + + expect(data.slice(2)).toMatchObject([ + // Snapshot inserts + PUT_T1, + PUT_T2, + // Replicated insert + PUT_T2 + ]); + }); + + test('Change Replication Identity full by adding a column', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Change replica id from full by adding column + // Causes a re-import of the table. 
+ // Other changes such as renaming column would have the same effect + + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + // No primary key, no unique column, so full replication identity will be used + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36), description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`ALTER TABLE test_data ADD COLUMN new_column TEXT`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + // Truncate + REMOVE_T1 + ]); + + // Snapshot - order doesn't matter + expect(data.slice(2)).toMatchObject([ + // Snapshot inserts + putOp('test_data', { id: 't1', description: 'test1', new_column: null }), + putOp('test_data', { id: 't2', description: 'test2', new_column: null }), + // Replicated insert + putOp('test_data', { id: 't2', description: 'test2', new_column: null }) + ]); + }); + + test('Change Replication Identity from full to index by adding a unique constraint', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Change replica id full by adding a unique index that can serve as the replication id + + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + // No primary key, no unique column, so full replication identity will be used + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36), description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`ALTER TABLE test_data ADD UNIQUE (id)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + // Truncate + REMOVE_T1 + ]); + + // Snapshot - order doesn't matter + expect(data.slice(2)).toMatchObject([ + // Snapshot inserts + PUT_T1, + PUT_T2, + // Replicated insert + PUT_T2 + ]); + }); + + test('Change Replication Identity from full to index by adding a unique index', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Change replica id full by adding a unique index that can serve as the replication id + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + // No primary key, no unique column, so full replication identity will be used + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36), description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`CREATE UNIQUE INDEX id_idx ON test_data (id)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + // Truncate + REMOVE_T1 + ]); + + // Snapshot - order doesn't matter + expect(data.slice(2)).toMatchObject([ + // Snapshot 
inserts + PUT_T1, + PUT_T2, + // Replicated insert + PUT_T2 + ]); + }); + + test('Change Replication Identity from index by dropping the unique constraint', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Change replica id from index to full by dropping the unique index that served as the replication id + + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + // Unique constraint on id + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36), description TEXT, UNIQUE (id))`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`ALTER TABLE test_data DROP INDEX id`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + // Truncate + REMOVE_T1 + ]); + + // Snapshot - order doesn't matter + expect(data.slice(2)).toMatchObject([ + // Snapshot inserts + PUT_T1, + PUT_T2, + // Replicated insert + PUT_T2 + ]); + }); + + test('Change Replication Identity default by modifying primary key column type', async () => { + await using context = await BinlogStreamTestContext.open(factory); + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`ALTER TABLE test_data MODIFY COLUMN id VARCHAR(36)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + // Truncate + REMOVE_T1 + ]); + + expect(data.slice(2)).toMatchObject([ + // Snapshot inserts + PUT_T1, + PUT_T2, + // Replicated insert + PUT_T2 + ]); + }); + + test('Change Replication Identity by changing the type of a column in a compound unique index', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Change index replica id by changing column type + // Causes a re-import of the table. + // Secondary functionality tested here is that replica id column order stays + // the same between initial and incremental replication. 
+ await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36), description CHAR(100))`); + await connectionManager.query(`ALTER TABLE test_data ADD INDEX (id, description)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + await connectionManager.query(`ALTER TABLE test_data MODIFY COLUMN id VARCHAR(36)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t3','test3')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial snapshot + PUT_T1, + // Streamed + PUT_T2 + ]); + + expect(data.slice(2, 4).sort(compareIds)).toMatchObject([ + // Truncate - any order + REMOVE_T1, + REMOVE_T2 + ]); + + // Snapshot - order doesn't matter + expect(data.slice(4, 7).sort(compareIds)).toMatchObject([PUT_T1, PUT_T2, PUT_T3]); + + expect(data.slice(7).sort(compareIds)).toMatchObject([ + // Replicated insert + PUT_T3 + ]); + }); + + test('Add column: New non replication identity column does not trigger re-sync', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Added column not in replication identity so it should not cause a re-import + + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`ALTER TABLE test_data ADD COLUMN new_column TEXT`); + await connectionManager.query( + `INSERT INTO test_data(id, description, new_column) VALUES('t2','test2', 'new_data')` + ); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 1)).toMatchObject([PUT_T1]); + + expect(data.slice(1)).toMatchObject([ + // Snapshot inserts + putOp('test_data', { id: 't2', description: 'test2', new_column: 'new_data' }) + ]); + }); + + test('Modify non replication identity column', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Changing the type of a column that is not part of the replication identity does not cause a re-sync of the table. 
+ await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + await connectionManager.query(`ALTER TABLE test_data MODIFY COLUMN description VARCHAR(100)`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t3','test3')`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial snapshot + PUT_T1, + // Streamed + PUT_T2 + ]); + + expect(data.slice(2)).toMatchObject([ + // Replicated insert + PUT_T3 + ]); + }); + + test('Drop a table in the sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Technically not a schema change, but fits here. + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36), description CHAR(100))`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t2','test2')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`DROP TABLE test_data`); + + const data = await context.getBucketData('global[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + PUT_T2 + ]); + + expect(data.slice(2).sort(compareIds)).toMatchObject([ + // Drop + REMOVE_T1, + REMOVE_T2 + ]); + }); + + test('Schema changes for tables in other schemas in the sync rules', async () => { + await using context = await BinlogStreamTestContext.open(factory); + // Technically not a schema change, but fits here. 
+ await context.updateSyncRules(` + bucket_definitions: + multi_schema_test_data: + data: + - SELECT id, description, num FROM "multi_schema"."test_data" + `); + + const { connectionManager } = context; + await createTestDb(connectionManager, 'multi_schema'); + const testTable = qualifiedMySQLTable('test_data', 'multi_schema'); + await connectionManager.query( + `CREATE TABLE IF NOT EXISTS ${testTable} (id CHAR(36) PRIMARY KEY,description TEXT);` + ); + await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('t1','test1')`); + await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('t2','test2')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`DROP TABLE ${testTable}`); + + const data = await context.getBucketData('multi_schema_test_data[]'); + + expect(data.slice(0, 2)).toMatchObject([ + // Initial inserts + PUT_T1, + PUT_T2 + ]); + + expect(data.slice(2).sort(compareIds)).toMatchObject([ + // Drop + REMOVE_T1, + REMOVE_T2 + ]); + }); + + test('Changes for tables in schemas not in the sync rules are ignored', async () => { + await using context = await BinlogStreamTestContext.open(factory); + await context.updateSyncRules(BASIC_SYNC_RULES); + + const { connectionManager } = context; + await connectionManager.query(`CREATE TABLE test_data (id CHAR(36), description CHAR(100))`); + + await createTestDb(connectionManager, 'multi_schema'); + const testTable = qualifiedMySQLTable('test_data_ignored', 'multi_schema'); + await connectionManager.query( + `CREATE TABLE IF NOT EXISTS ${testTable} (id CHAR(36) PRIMARY KEY,description TEXT);` + ); + await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('t1','test1')`); + await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('t2','test2')`); + + await context.replicateSnapshot(); + await context.startStreaming(); + + await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('t3','test3')`); + await connectionManager.query(`DROP TABLE ${testTable}`); + + // Force a commit on the watched schema to advance the checkpoint + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`); + + const data = await context.getBucketData('global[]'); + + // Should only include the entry used to advance the checkpoint + expect(data).toMatchObject([PUT_T1]); + }); +} diff --git a/modules/module-mysql/test/src/util.ts b/modules/module-mysql/test/src/util.ts index 9126f744..8f1bab67 100644 --- a/modules/module-mysql/test/src/util.ts +++ b/modules/module-mysql/test/src/util.ts @@ -6,6 +6,7 @@ import mysqlPromise from 'mysql2/promise'; import { env } from './env.js'; import { describe, TestOptions } from 'vitest'; import { TestStorageFactory } from '@powersync/service-core'; +import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; export const TEST_URI = env.MYSQL_TEST_URI; @@ -52,3 +53,8 @@ export async function clearTestDb(connection: mysqlPromise.Connection) { } } } + +export async function createTestDb(connectionManager: MySQLConnectionManager, dbName: string) { + await connectionManager.query(`DROP DATABASE IF EXISTS ${dbName}`); + await connectionManager.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`); +} diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index df050372..c7176fd9 100644 --- 
a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -145,9 +145,9 @@ export class PostgresSyncRulesStorage async resolveTable(options: storage.ResolveTableOptions): Promise { const { group_id, connection_id, connection_tag, entity_descriptor } = options; - const { schema, name: table, objectId, replicationColumns } = entity_descriptor; + const { schema, name: table, objectId, replicaIdColumns } = entity_descriptor; - const columns = replicationColumns.map((column) => ({ + const normalizedReplicaIdColumns = replicaIdColumns.map((column) => ({ name: column.name, type: column.type, // The PGWire returns this as a BigInt. We want to store this as JSONB @@ -167,7 +167,7 @@ export class PostgresSyncRulesStorage AND relation_id = ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }} AND schema_name = ${{ type: 'varchar', value: schema }} AND table_name = ${{ type: 'varchar', value: table }} - AND replica_id_columns = ${{ type: 'jsonb', value: columns }} + AND replica_id_columns = ${{ type: 'jsonb', value: normalizedReplicaIdColumns }} ` .decoded(models.SourceTable) .first(); @@ -182,7 +182,7 @@ export class PostgresSyncRulesStorage AND connection_id = ${{ type: 'int4', value: connection_id }} AND schema_name = ${{ type: 'varchar', value: schema }} AND table_name = ${{ type: 'varchar', value: table }} - AND replica_id_columns = ${{ type: 'jsonb', value: columns }} + AND replica_id_columns = ${{ type: 'jsonb', value: normalizedReplicaIdColumns }} ` .decoded(models.SourceTable) .first(); @@ -209,7 +209,7 @@ export class PostgresSyncRulesStorage ${{ type: 'jsonb', value: { object_id: objectId } satisfies StoredRelationId }}, ${{ type: 'varchar', value: schema }}, ${{ type: 'varchar', value: table }}, - ${{ type: 'jsonb', value: columns }} + ${{ type: 'jsonb', value: normalizedReplicaIdColumns }} ) RETURNING * @@ -219,15 +219,15 @@ export class PostgresSyncRulesStorage sourceTableRow = row; } - const sourceTable = new storage.SourceTable( - sourceTableRow!.id, - connection_tag, - objectId, - schema, - table, - replicationColumns, - sourceTableRow!.snapshot_done ?? true - ); + const sourceTable = new storage.SourceTable({ + id: sourceTableRow!.id, + connectionTag: connection_tag, + objectId: objectId, + schema: schema, + name: table, + replicaIdColumns: replicaIdColumns, + snapshotComplete: sourceTableRow!.snapshot_done ?? true + }); if (!sourceTable.snapshotComplete) { sourceTable.snapshotStatus = { totalEstimatedCount: Number(sourceTableRow!.snapshot_total_estimated_count ?? -1n), @@ -285,19 +285,20 @@ export class PostgresSyncRulesStorage table: sourceTable, dropTables: truncatedTables.map( (doc) => - new storage.SourceTable( - doc.id, - connection_tag, - doc.relation_id?.object_id ?? 0, - doc.schema_name, - doc.table_name, - doc.replica_id_columns?.map((c) => ({ - name: c.name, - typeOid: c.typeId, - type: c.type - })) ?? [], - doc.snapshot_done ?? true - ) + new storage.SourceTable({ + id: doc.id, + connectionTag: connection_tag, + objectId: doc.relation_id?.object_id ?? 0, + schema: doc.schema_name, + name: doc.table_name, + replicaIdColumns: + doc.replica_id_columns?.map((c) => ({ + name: c.name, + typeOid: c.typeId, + type: c.type + })) ?? [], + snapshotComplete: doc.snapshot_done ?? 
true + }) ) }; }); diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index aa0eb012..29b8d4c7 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -467,15 +467,15 @@ export class PostgresBucketBatch } }); return tables.map((table) => { - const copy = new storage.SourceTable( - table.id, - table.connectionTag, - table.objectId, - table.schema, - table.table, - table.replicaIdColumns, - table.snapshotComplete - ); + const copy = new storage.SourceTable({ + id: table.id, + connectionTag: table.connectionTag, + objectId: table.objectId, + schema: table.schema, + name: table.name, + replicaIdColumns: table.replicaIdColumns, + snapshotComplete: table.snapshotComplete + }); copy.syncData = table.syncData; copy.syncParameters = table.syncParameters; return copy; diff --git a/modules/module-postgres/src/replication/PgRelation.ts b/modules/module-postgres/src/replication/PgRelation.ts index 08cb87c7..cc3d9a84 100644 --- a/modules/module-postgres/src/replication/PgRelation.ts +++ b/modules/module-postgres/src/replication/PgRelation.ts @@ -1,4 +1,4 @@ -import { ReplicationAssertionError, ServiceError } from '@powersync/lib-services-framework'; +import { ReplicationAssertionError } from '@powersync/lib-services-framework'; import { storage } from '@powersync/service-core'; import { PgoutputRelation } from '@powersync/service-jpgwire'; @@ -27,6 +27,6 @@ export function getPgOutputRelation(source: PgoutputRelation): storage.SourceEnt name: source.name, schema: source.schema, objectId: getRelId(source), - replicationColumns: getReplicaIdColumns(source) + replicaIdColumns: getReplicaIdColumns(source) } satisfies storage.SourceEntityDescriptor; } diff --git a/modules/module-postgres/src/replication/SnapshotQuery.ts b/modules/module-postgres/src/replication/SnapshotQuery.ts index 438637cb..b826c215 100644 --- a/modules/module-postgres/src/replication/SnapshotQuery.ts +++ b/modules/module-postgres/src/replication/SnapshotQuery.ts @@ -36,7 +36,7 @@ export class SimpleSnapshotQuery implements SnapshotQuery { ) {} public async initialize(): Promise { - await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${this.table.escapedIdentifier}`); + await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${this.table.qualifiedName}`); } public nextChunk(): AsyncIterableIterator { @@ -121,7 +121,7 @@ export class ChunkedSnapshotQuery implements SnapshotQuery { const escapedKeyName = escapeIdentifier(this.key.name); if (this.lastKey == null) { stream = this.connection.stream( - `SELECT * FROM ${this.table.escapedIdentifier} ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}` + `SELECT * FROM ${this.table.qualifiedName} ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}` ); } else { if (this.key.typeId == null) { @@ -129,7 +129,7 @@ export class ChunkedSnapshotQuery implements SnapshotQuery { } let type: StatementParam['type'] = Number(this.key.typeId); stream = this.connection.stream({ - statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapedKeyName} > $1 ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`, + statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1 ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`, params: [{ value: this.lastKey, type }] }); } @@ -197,7 +197,7 @@ export class IdSnapshotQuery 
implements SnapshotQuery { throw new Error(`Cannot determine primary key array type for ${JSON.stringify(keyDefinition)}`); } yield* this.connection.stream({ - statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`, + statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`, params: [ { type: type, diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 02242669..230906ec 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -256,7 +256,7 @@ export class WalStream { name, schema, objectId: relid, - replicationColumns: cresult.replicationColumns + replicaIdColumns: cresult.replicationColumns } as SourceEntityDescriptor, false ); diff --git a/modules/module-postgres/src/replication/replication-utils.ts b/modules/module-postgres/src/replication/replication-utils.ts index b9905d9b..57ec4ba7 100644 --- a/modules/module-postgres/src/replication/replication-utils.ts +++ b/modules/module-postgres/src/replication/replication-utils.ts @@ -315,7 +315,15 @@ export async function getDebugTableInfo(options: GetDebugTableInfoOptions): Prom const id_columns = id_columns_result?.replicationColumns ?? []; - const sourceTable = new storage.SourceTable(0, connectionTag, relationId ?? 0, schema, name, id_columns, true); + const sourceTable = new storage.SourceTable({ + id: 0, + connectionTag: connectionTag, + objectId: relationId ?? 0, + schema: schema, + name: name, + replicaIdColumns: id_columns, + snapshotComplete: true + }); const syncData = syncRules.tableSyncsData(sourceTable); const syncParameters = syncRules.tableSyncsParameters(sourceTable); @@ -342,7 +350,7 @@ export async function getDebugTableInfo(options: GetDebugTableInfoOptions): Prom let selectError = null; try { - await lib_postgres.retriedQuery(db, `SELECT * FROM ${sourceTable.escapedIdentifier} LIMIT 1`); + await lib_postgres.retriedQuery(db, `SELECT * FROM ${sourceTable.qualifiedName} LIMIT 1`); } catch (e) { selectError = { level: 'fatal', message: e.message }; } diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts index e9c07c85..738db1c4 100644 --- a/packages/service-core-tests/src/test-utils/general-utils.ts +++ b/packages/service-core-tests/src/test-utils/general-utils.ts @@ -37,15 +37,15 @@ export function testRules(content: string): storage.PersistedSyncRulesContent { export function makeTestTable(name: string, replicaIdColumns?: string[] | undefined) { const relId = utils.hashData('table', name, (replicaIdColumns ?? ['id']).join(',')); const id = new bson.ObjectId('6544e3899293153fa7b38331'); - return new storage.SourceTable( - id, - storage.SourceTable.DEFAULT_TAG, - relId, - 'public', - name, - (replicaIdColumns ?? ['id']).map((column) => ({ name: column, type: 'VARCHAR', typeId: 25 })), - true - ); + return new storage.SourceTable({ + id: id, + connectionTag: storage.SourceTable.DEFAULT_TAG, + objectId: relId, + schema: 'public', + name: name, + replicaIdColumns: (replicaIdColumns ?? 
['id']).map((column) => ({ name: column, type: 'VARCHAR', typeId: 25 })), + snapshotComplete: true + }); } export function getBatchData( diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts index 6a1eaab4..bc633093 100644 --- a/packages/service-core/src/api/diagnostics.ts +++ b/packages/service-core/src/api/diagnostics.ts @@ -105,7 +105,7 @@ export async function getSyncRulesStatus( const source: SourceTableInterface = { connectionTag: tag, schema: pattern.schema, - table: pattern.tablePattern + name: pattern.tablePattern }; const syncData = rules.tableSyncsData(source); const syncParameters = rules.tableSyncsParameters(source); diff --git a/packages/service-core/src/metrics/open-telemetry/util.ts b/packages/service-core/src/metrics/open-telemetry/util.ts index af4af406..a3b25659 100644 --- a/packages/service-core/src/metrics/open-telemetry/util.ts +++ b/packages/service-core/src/metrics/open-telemetry/util.ts @@ -7,6 +7,8 @@ import { OpenTelemetryMetricsFactory } from './OpenTelemetryMetricsFactory.js'; import { MetricsFactory } from '../metrics-interfaces.js'; import { logger } from '@powersync/lib-services-framework'; +import pkg from '../../../package.json' with { type: 'json' }; + export interface RuntimeMetadata { [key: string]: string | number | undefined; } @@ -61,7 +63,8 @@ export function createOpenTelemetryMetricsFactory(context: ServiceContext): Metr const meterProvider = new MeterProvider({ resource: new Resource( { - ['service']: 'PowerSync' + ['service']: 'PowerSync', + ['service.version']: pkg.version }, runtimeMetadata ), diff --git a/packages/service-core/src/storage/SourceEntity.ts b/packages/service-core/src/storage/SourceEntity.ts index 1de25388..a139eb08 100644 --- a/packages/service-core/src/storage/SourceEntity.ts +++ b/packages/service-core/src/storage/SourceEntity.ts @@ -10,17 +10,17 @@ export interface ColumnDescriptor { typeId?: number; } -// TODO: This needs to be consolidated with SourceTable into something new. export interface SourceEntityDescriptor { /** - * The internal id of the data source structure in the database. - * + * The internal id of the source entity structure in the database. * If undefined, the schema and name are used as the identifier. - * * If specified, this is specifically used to detect renames. */ objectId: number | string | undefined; schema: string; name: string; - replicationColumns: ColumnDescriptor[]; + /** + * The columns that are used to uniquely identify a record in the source entity. 
+ */ + replicaIdColumns: ColumnDescriptor[]; } diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts index 0279afe3..8e595154 100644 --- a/packages/service-core/src/storage/SourceTable.ts +++ b/packages/service-core/src/storage/SourceTable.ts @@ -1,6 +1,16 @@ import { DEFAULT_TAG } from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; -import { ColumnDescriptor } from './SourceEntity.js'; +import { ColumnDescriptor, SourceEntityDescriptor } from './SourceEntity.js'; + +export interface SourceTableOptions { + id: any; + connectionTag: string; + objectId: number | string | undefined; + schema: string; + name: string; + replicaIdColumns: ColumnDescriptor[]; + snapshotComplete: boolean; +} export interface TableSnapshotStatus { totalEstimatedCount: number; @@ -8,7 +18,7 @@ export interface TableSnapshotStatus { lastKey: Uint8Array | null; } -export class SourceTable { +export class SourceTable implements SourceEntityDescriptor { static readonly DEFAULT_TAG = DEFAULT_TAG; /** @@ -45,37 +55,41 @@ export class SourceTable { */ public snapshotStatus: TableSnapshotStatus | undefined = undefined; - constructor( - public readonly id: any, - public readonly connectionTag: string, - public readonly objectId: number | string | undefined, - public readonly schema: string, - public readonly table: string, + public snapshotComplete: boolean; - public readonly replicaIdColumns: ColumnDescriptor[], - public snapshotComplete: boolean - ) {} + constructor(public readonly options: SourceTableOptions) { + this.snapshotComplete = options.snapshotComplete; + } - get hasReplicaIdentity() { - return this.replicaIdColumns.length > 0; + get id() { + return this.options.id; } - /** - * Use for postgres only. - * - * Usage: db.query({statement: `SELECT $1::regclass`, params: [{type: 'varchar', value: table.qualifiedName}]}) - */ - get qualifiedName() { - return this.escapedIdentifier; + get connectionTag() { + return this.options.connectionTag; + } + + get objectId() { + return this.options.objectId; + } + + get schema() { + return this.options.schema; + } + get name() { + return this.options.name; + } + + get replicaIdColumns() { + return this.options.replicaIdColumns; } /** - * Use for postgres and logs only. - * - * Usage: db.query(`SELECT * FROM ${table.escapedIdentifier}`) + * Sanitized name of the entity in the format of "{schema}.{entity name}" + * Suitable for safe use in Postgres queries. */ - get escapedIdentifier() { - return `${util.escapeIdentifier(this.schema)}.${util.escapeIdentifier(this.table)}`; + get qualifiedName() { + return `${util.escapeIdentifier(this.schema)}.${util.escapeIdentifier(this.name)}`; } get syncAny() { @@ -86,15 +100,15 @@ export class SourceTable { * In-memory clone of the table status. 
*/ clone() { - const copy = new SourceTable( - this.id, - this.connectionTag, - this.objectId, - this.schema, - this.table, - this.replicaIdColumns, - this.snapshotComplete - ); + const copy = new SourceTable({ + id: this.id, + connectionTag: this.connectionTag, + objectId: this.objectId, + schema: this.schema, + name: this.name, + replicaIdColumns: this.replicaIdColumns, + snapshotComplete: this.snapshotComplete + }); copy.syncData = this.syncData; copy.syncParameters = this.syncParameters; copy.snapshotStatus = this.snapshotStatus; diff --git a/packages/sync-rules/src/BaseSqlDataQuery.ts b/packages/sync-rules/src/BaseSqlDataQuery.ts index a3bba1eb..75d6fcf2 100644 --- a/packages/sync-rules/src/BaseSqlDataQuery.ts +++ b/packages/sync-rules/src/BaseSqlDataQuery.ts @@ -93,7 +93,7 @@ export class BaseSqlDataQuery { if (this.sourceTable.isWildcard) { return { ...row, - _table_suffix: this.sourceTable.suffix(table.table) + _table_suffix: this.sourceTable.suffix(table.name) }; } else { return row; @@ -130,7 +130,7 @@ export class BaseSqlDataQuery { this.getColumnOutputsFor(schemaTable, output); result.push({ - name: this.getOutputName(schemaTable.table), + name: this.getOutputName(schemaTable.name), columns: Object.values(output) }); } diff --git a/packages/sync-rules/src/SourceTableInterface.ts b/packages/sync-rules/src/SourceTableInterface.ts index 47f0cfdc..09e5d12a 100644 --- a/packages/sync-rules/src/SourceTableInterface.ts +++ b/packages/sync-rules/src/SourceTableInterface.ts @@ -1,5 +1,5 @@ export interface SourceTableInterface { readonly connectionTag: string; readonly schema: string; - readonly table: string; + readonly name: string; } diff --git a/packages/sync-rules/src/SqlDataQuery.ts b/packages/sync-rules/src/SqlDataQuery.ts index dda09b74..9f6f75c1 100644 --- a/packages/sync-rules/src/SqlDataQuery.ts +++ b/packages/sync-rules/src/SqlDataQuery.ts @@ -204,7 +204,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { // anything. id = castAsText(id) ?? 
''; } - const outputTable = this.getOutputName(table.table); + const outputTable = this.getOutputName(table.name); return bucketIds.map((bucketId) => { return { diff --git a/packages/sync-rules/src/StaticSchema.ts b/packages/sync-rules/src/StaticSchema.ts index aa27114c..3ae1c8c3 100644 --- a/packages/sync-rules/src/StaticSchema.ts +++ b/packages/sync-rules/src/StaticSchema.ts @@ -46,13 +46,13 @@ export interface SourceConnectionDefinition { class SourceTableDetails implements SourceTableInterface, SourceSchemaTable { readonly connectionTag: string; readonly schema: string; - readonly table: string; + readonly name: string; private readonly columns: Record; constructor(connection: SourceConnectionDefinition, schema: SourceSchemaDefinition, table: SourceTableDefinition) { this.connectionTag = connection.tag; this.schema = schema.name; - this.table = table.name; + this.name = table.name; this.columns = Object.fromEntries( table.columns.map((column) => { return [column.name, mapColumn(column)]; diff --git a/packages/sync-rules/src/TablePattern.ts b/packages/sync-rules/src/TablePattern.ts index 55c90ec9..89b98109 100644 --- a/packages/sync-rules/src/TablePattern.ts +++ b/packages/sync-rules/src/TablePattern.ts @@ -49,9 +49,9 @@ export class TablePattern { return false; } if (this.isWildcard) { - return table.table.startsWith(this.tablePrefix); + return table.name.startsWith(this.tablePrefix); } else { - return this.tablePattern == table.table; + return this.tablePattern == table.name; } } diff --git a/packages/sync-rules/src/events/SqlEventDescriptor.ts b/packages/sync-rules/src/events/SqlEventDescriptor.ts index 0947fc2a..ff09ab25 100644 --- a/packages/sync-rules/src/events/SqlEventDescriptor.ts +++ b/packages/sync-rules/src/events/SqlEventDescriptor.ts @@ -1,5 +1,4 @@ import { SqlRuleError } from '../errors.js'; -import { IdSequence } from '../IdSequence.js'; import { SourceTableInterface } from '../SourceTableInterface.js'; import { QueryParseResult } from '../SqlBucketDescriptor.js'; import { SyncRulesOptions } from '../SqlSyncRules.js'; @@ -43,7 +42,7 @@ export class SqlEventDescriptor { const matchingQuery = this.sourceQueries.find((q) => q.applies(options.sourceTable)); if (!matchingQuery) { return { - errors: [{ error: `No marching source query found for table ${options.sourceTable.table}` }] + errors: [{ error: `No matching source query found for table ${options.sourceTable.name}` }] }; } diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index dc699af8..ac6d9035 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -4,7 +4,7 @@ import { SourceTableInterface } from './SourceTableInterface.js'; import { SyncRulesOptions } from './SqlSyncRules.js'; import { TablePattern } from './TablePattern.js'; import { toSyncRulesParameters } from './utils.js'; -import { BucketDescription, BucketPriority } from './BucketDescription.js'; +import { BucketPriority } from './BucketDescription.js'; import { ParameterLookup } from './BucketParameterQuerier.js'; export interface SyncRules { @@ -341,7 +341,7 @@ export type CompiledClause = RowValueClause | ParameterMatchClause | ParameterVa export type TrueIfParametersMatch = FilterParameters[]; export interface SourceSchemaTable { - table: string; + name: string; getColumn(column: string): ColumnDefinition | undefined; getColumns(): ColumnDefinition[]; } diff --git a/packages/sync-rules/test/src/util.ts b/packages/sync-rules/test/src/util.ts index e1ed5b80..b780cd0c 100644 --- 
a/packages/sync-rules/test/src/util.ts +++ b/packages/sync-rules/test/src/util.ts @@ -10,7 +10,7 @@ export class TestSourceTable implements SourceTableInterface { readonly connectionTag = DEFAULT_TAG; readonly schema = 'test_schema'; - constructor(public readonly table: string) {} + constructor(public readonly name: string) {} } export const PARSE_OPTIONS = { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 943acc6b..97d1c7a2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,8 +258,8 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: 0.2.0 - version: 0.2.0 + specifier: ^0.4.0 + version: 0.4.0 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core @@ -278,6 +278,9 @@ importers: mysql2: specifier: ^3.11.0 version: 3.11.3 + node-sql-parser: + specifier: ^5.3.9 + version: 5.3.9 semver: specifier: ^7.5.4 version: 7.6.2 @@ -1281,8 +1284,8 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.2.0': - resolution: {integrity: sha512-ua/n7WFfoiXmqfgwLikcm/AaDE6+t5gFVTWHWsbiuRQMNtXE1F2gXpZJdwKhr8WsOCYkB/A1ZOgbJKi4tK342g==} + '@powersync/mysql-zongji@0.4.0': + resolution: {integrity: sha512-O5zGYF3mzHO50SOSj3/6EnXYebC2Lvu1BTashbbz6eLAwaR3TkxwMfPGqFEsuecIm22djSBlgXjKM2FzVVG/VQ==} engines: {node: '>=22.0.0'} '@powersync/service-jsonbig@0.17.10': @@ -1526,6 +1529,9 @@ packages: '@types/node@22.16.2': resolution: {integrity: sha512-Cdqa/eJTvt4fC4wmq1Mcc0CPUjp/Qy2FGqLza3z3pKymsI969TcZ54diNJv8UYUgeWxyb8FSbCkhdR6WqmUFhA==} + '@types/pegjs@0.10.6': + resolution: {integrity: sha512-eLYXDbZWXh2uxf+w8sXS8d6KSoXTswfps6fvCUuVAGN8eRpfe7h9eSRydxiSJvo9Bf+GzifsDOr9TMQlmJdmkw==} + '@types/pg-pool@2.0.4': resolution: {integrity: sha512-qZAvkv1K3QbmHHFYSNRYPkRjOWRLBYrL4B9c+wG0GSVGBw0NtJwPcgx/DSddeDJvRGMHCEQ4VMEVfuJ/0gZ3XQ==} @@ -2891,6 +2897,10 @@ packages: resolution: {integrity: sha512-2YEOR5qlI1zUFbGMLKNfsrR5JUvFg9LxIRVE+xJe962pfVLH0rnItqLzv96XVs1Y1UIR8FxsXAuvX/lYAWZ2BQ==} engines: {node: '>=8'} + node-sql-parser@5.3.9: + resolution: {integrity: sha512-yJuCNCUWqS296krMKooLIJcZy+Q0OL6ZsNDxrwqj+HdGMHVT0ChPIFF1vqCexDe51VWKEwhU65OgvKyiyRcQLg==} + engines: {node: '>=8'} + nodemon@3.1.4: resolution: {integrity: sha512-wjPBbFhtpJwmIeY2yP7QF+UKzPfltVGtfce1g/bB15/8vCGZj8uxD62b/b9M9/WVgme0NZudpownKN+c0plXlQ==} engines: {node: '>=10'} @@ -4742,7 +4752,7 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.2.0': + '@powersync/mysql-zongji@0.4.0': dependencies: '@vlasky/mysql': 2.18.6 big-integer: 1.6.52 @@ -4972,6 +4982,8 @@ snapshots: dependencies: undici-types: 6.21.0 + '@types/pegjs@0.10.6': {} + '@types/pg-pool@2.0.4': dependencies: '@types/pg': 8.6.1 @@ -6374,6 +6386,11 @@ snapshots: dependencies: big-integer: 1.6.51 + node-sql-parser@5.3.9: + dependencies: + '@types/pegjs': 0.10.6 + big-integer: 1.6.52 + nodemon@3.1.4: dependencies: chokidar: 3.6.0
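
Usage sketch (illustrative only, not part of the patch): how callers construct a SourceTable with the options-object constructor introduced in this change, and how the renamed fields are read. The concrete values (id, objectId, schema, table name, replica-id column) are made up for the example; the API surface follows the SourceTableOptions interface and getters added above.

import { storage } from '@powersync/service-core';

const table = new storage.SourceTable({
  id: 1, // storage-specific id; e.g. a bson.ObjectId for MongoDB storage (value here is hypothetical)
  connectionTag: storage.SourceTable.DEFAULT_TAG,
  objectId: 12345, // relation id from the source database, or undefined if the source has none
  schema: 'public',
  name: 'test_data', // previously the `table` field
  replicaIdColumns: [{ name: 'id', type: 'VARCHAR', typeId: 25 }],
  snapshotComplete: true
});

// SourceTable now implements SourceEntityDescriptor, so sync-rules code that previously
// matched on `table.table` reads `table.name`, and `qualifiedName` returns the escaped
// "schema"."name" identifier formerly exposed as `escapedIdentifier`.
table.name;          // 'test_data'
table.qualifiedName; // e.g. "public"."test_data", safe to interpolate into Postgres queries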