From 297945bad0f64a71d78cdc0a610d137a5902e76a Mon Sep 17 00:00:00 2001 From: jeeminso Date: Fri, 18 Jul 2025 11:55:51 -0400 Subject: [PATCH 1/3] Improve metadata bwc test for Logical Replication --- tests/bwc/test_rolling_upgrade.py | 54 +++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index 53fa50e9..e81791f0 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -2,7 +2,7 @@ from crate.client import connect from crate.client.exceptions import ProgrammingError -from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath +from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy ROLLING_UPGRADES_V4 = ( # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug @@ -41,7 +41,7 @@ UpgradePath('5.8.x', '5.9.x'), UpgradePath('5.9.x', '5.10.x'), UpgradePath('5.10.x', '6.0.x'), - UpgradePath('6.0.x', 'latest-nightly'), + UpgradePath('6.0.x', 'branch:jeeminso/jeeminso/lr-broken-subscription'), ) @@ -88,6 +88,10 @@ def _test_rolling_upgrade(self, path, nodes): } cluster = self._new_cluster(path.from_version, nodes, settings=settings) cluster.start() + replica_cluster = None + if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: + replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) + replica_cluster.start() with connect(cluster.node().http_url, error_trace=True) as conn: c = conn.cursor() c.execute("create user arthur with (password = 'secret')") @@ -152,6 +156,33 @@ def _test_rolling_upgrade(self, path, nodes): # Add the shards of the new partition primaries expected_active_shards += shards + # Set up tables for logical replications + def num_docs_x(cursor): + cursor.execute("select count(*) from doc.x") + return cursor.fetchall()[0][0] + + def num_docs_rx(cursor): + cursor.execute("select count(*) from doc.rx") + return cursor.fetchall()[0][0] + + if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: + c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)") + c.execute("create publication p for table doc.x") + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: + rc = replica_conn.cursor() + transport_port = cluster.node().addresses.transport.port + replica_transport_port = replica_cluster.node().addresses.transport.port + assert 4300 <= transport_port <= 4310 and 4300 <= replica_transport_port <= 4310 + rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)") + rc.execute("create publication rp for table doc.rx") + rc.execute(f"create subscription rs connection 'crate://localhost:{transport_port}?user=crate&sslmode=sniff' publication p") + wait_for_active_shards(rc, 2) # doc.rx created via create-table and doc.x that is subscribed + assert_busy(lambda: self.assertEqual(num_docs_x(rc), 0)) + c.execute(f"create subscription s connection 'crate://localhost:{replica_transport_port}?user=crate&sslmode=sniff' publication rp") + expected_active_shards += 2 + wait_for_active_shards(c, expected_active_shards) + assert_busy(lambda: self.assertEqual(num_docs_rx(c), 0)) + for idx, node in enumerate(cluster): # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node. # Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works. @@ -282,6 +313,25 @@ def _test_rolling_upgrade(self, path, nodes): c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx]) self.assertEqual(c.fetchall(), [[partition_version]]) + # Ensure logical replications works + if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: + rc = replica_conn.cursor() + + # Cannot drop replicated tables + with self.assertRaises(ProgrammingError): + rc.execute("drop table doc.x") + c.execute("drop table doc.rx") + + count = num_docs_x(rc) + count2 = num_docs_rx(c) + + c.execute("insert into doc.x values (1)") + rc.execute("insert into doc.rx values (1)") + + assert_busy(lambda: self.assertEqual(num_docs_x(rc), count + 1)) + assert_busy(lambda: self.assertEqual(num_docs_rx(c), count2 + 1)) + # Finally validate that all shards (primaries and replicas) of all partitions are started # and writes into the partitioned table while upgrading were successful with connect(cluster.node().http_url, error_trace=True) as conn: From c95f54803cbb7dc651c3fd00e04c777e90122ac0 Mon Sep 17 00:00:00 2001 From: jeeminso Date: Fri, 25 Jul 2025 12:40:59 -0400 Subject: [PATCH 2/3] temp --- tests/bwc/test_rolling_upgrade.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index e81791f0..69117bbb 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -41,7 +41,7 @@ UpgradePath('5.8.x', '5.9.x'), UpgradePath('5.9.x', '5.10.x'), UpgradePath('5.10.x', '6.0.x'), - UpgradePath('6.0.x', 'branch:jeeminso/jeeminso/lr-broken-subscription'), + UpgradePath('6.0.x', 'latest-nightly'), ) From 4dfc293d53de17bbaab6026f2e96c550a160c735 Mon Sep 17 00:00:00 2001 From: jeeminso Date: Sun, 27 Jul 2025 12:44:32 -0400 Subject: [PATCH 3/3] Run with 6.0 instead of 6.0.x --- tests/bwc/test_rolling_upgrade.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index 69117bbb..0a27fb67 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -40,8 +40,8 @@ UpgradePath('5.7.x', '5.8.x'), UpgradePath('5.8.x', '5.9.x'), UpgradePath('5.9.x', '5.10.x'), - UpgradePath('5.10.x', '6.0.x'), - UpgradePath('6.0.x', 'latest-nightly'), + UpgradePath('5.10.x', '6.0'), + UpgradePath('6.0', 'latest-nightly'), )