Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c55293c

Browse files
authored
Re re introduce membership tables event stream ordering (#15356)
1 parent 8b3a502 commit c55293c

File tree

7 files changed

+163
-12
lines changed

7 files changed

+163
-12
lines changed

changelog.d/15356.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).

synapse/storage/databases/main/events.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,11 +1167,15 @@ def _update_current_state_txn(
11671167
# been inserted into room_memberships.
11681168
txn.execute_batch(
11691169
"""INSERT INTO current_state_events
1170-
(room_id, type, state_key, event_id, membership)
1171-
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
1170+
(room_id, type, state_key, event_id, membership, event_stream_ordering)
1171+
VALUES (
1172+
?, ?, ?, ?,
1173+
(SELECT membership FROM room_memberships WHERE event_id = ?),
1174+
(SELECT stream_ordering FROM events WHERE event_id = ?)
1175+
)
11721176
""",
11731177
[
1174-
(room_id, key[0], key[1], ev_id, ev_id)
1178+
(room_id, key[0], key[1], ev_id, ev_id, ev_id)
11751179
for key, ev_id in to_insert.items()
11761180
],
11771181
)
@@ -1198,11 +1202,15 @@ def _update_current_state_txn(
11981202
if to_insert:
11991203
txn.execute_batch(
12001204
"""INSERT INTO local_current_membership
1201-
(room_id, user_id, event_id, membership)
1202-
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
1205+
(room_id, user_id, event_id, membership, event_stream_ordering)
1206+
VALUES (
1207+
?, ?, ?,
1208+
(SELECT membership FROM room_memberships WHERE event_id = ?),
1209+
(SELECT stream_ordering FROM events WHERE event_id = ?)
1210+
)
12031211
""",
12041212
[
1205-
(room_id, key[1], ev_id, ev_id)
1213+
(room_id, key[1], ev_id, ev_id, ev_id)
12061214
for key, ev_id in to_insert.items()
12071215
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
12081216
],
@@ -1808,6 +1816,7 @@ def _store_room_members_txn(
18081816
table="room_memberships",
18091817
keys=(
18101818
"event_id",
1819+
"event_stream_ordering",
18111820
"user_id",
18121821
"sender",
18131822
"room_id",
@@ -1818,6 +1827,7 @@ def _store_room_members_txn(
18181827
values=[
18191828
(
18201829
event.event_id,
1830+
event.internal_metadata.stream_ordering,
18211831
event.state_key,
18221832
event.user_id,
18231833
event.room_id,
@@ -1850,6 +1860,7 @@ def _store_room_members_txn(
18501860
keyvalues={"room_id": event.room_id, "user_id": event.state_key},
18511861
values={
18521862
"event_id": event.event_id,
1863+
"event_stream_ordering": event.internal_metadata.stream_ordering,
18531864
"membership": event.membership,
18541865
},
18551866
)

synapse/storage/databases/main/purge_events.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,14 +428,16 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
428428
"partial_state_events",
429429
"partial_state_rooms_servers",
430430
"partial_state_rooms",
431+
# Note: the _membership(s) tables have foreign keys to the `events` table
432+
# so must be deleted first.
433+
"local_current_membership",
434+
"room_memberships",
431435
"events",
432436
"federation_inbound_events_staging",
433-
"local_current_membership",
434437
"receipts_graph",
435438
"receipts_linearized",
436439
"room_aliases",
437440
"room_depth",
438-
"room_memberships",
439441
"room_stats_state",
440442
"room_stats_current",
441443
"room_stats_earliest_token",

synapse/storage/schema/__init__.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
SCHEMA_VERSION = 74 # remember to update the list below when updating
15+
SCHEMA_VERSION = 75 # remember to update the list below when updating
1616
"""Represents the expectations made by the codebase about the database schema
1717
1818
This should be incremented whenever the codebase changes its requirements on the
@@ -91,13 +91,19 @@
9191
- A query on `event_stream_ordering` column has now been disambiguated (i.e. the
9292
codebase can handle the `current_state_events`, `local_current_memberships` and
9393
`room_memberships` tables having an `event_stream_ordering` column).
94+
95+
Changes in SCHEMA_VERSION = 75:
96+
- The `event_stream_ordering` column in membership tables (`current_state_events`,
97+
`local_current_membership` & `room_memberships`) is now being populated for new
98+
rows. When the background job to populate historical rows lands this will
99+
become the compat schema version.
94100
"""
95101

96102

97103
SCHEMA_COMPAT_VERSION = (
98-
# The threads_id column must exist for event_push_actions, event_push_summary,
99-
# receipts_linearized, and receipts_graph.
100-
73
104+
# Queries against `event_stream_ordering` columns in membership tables must
105+
# be disambiguated.
106+
74
101107
)
102108
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
103109
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/* Copyright 2022 Beeper
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
17+
-- we use to improve database performance by reduring JOINs.
18+
19+
-- NOTE: these are set to NOT VALID to prevent locks while adding the column on large existing tables,
20+
-- which will be validated in a later migration. For all new/updated rows the FKEY will be checked.
21+
22+
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
23+
ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
24+
25+
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
26+
ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
27+
28+
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
29+
ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/* Copyright 2022 Beeper
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
17+
-- we use to improve database performance by reduring JOINs.
18+
19+
-- NOTE: sqlite does not support ADD CONSTRAINT so we add the new columns with FK constraint as-is
20+
21+
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
22+
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
23+
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright 2022 Beeper
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
"""
17+
This migration adds triggers to the room membership tables to enforce consistency.
18+
Triggers cannot be expressed in .sql files, so we have to use a separate file.
19+
"""
20+
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
21+
from synapse.storage.types import Cursor
22+
23+
24+
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
25+
# Complain if the `event_stream_ordering` in membership tables doesn't match
26+
# the `stream_ordering` row with the same `event_id` in `events`.
27+
if isinstance(database_engine, Sqlite3Engine):
28+
for table in (
29+
"current_state_events",
30+
"local_current_membership",
31+
"room_memberships",
32+
):
33+
cur.execute(
34+
f"""
35+
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
36+
BEFORE INSERT ON {table}
37+
FOR EACH ROW
38+
BEGIN
39+
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
40+
WHERE EXISTS (
41+
SELECT 1 FROM events
42+
WHERE events.event_id = NEW.event_id
43+
AND events.stream_ordering != NEW.event_stream_ordering
44+
);
45+
END;
46+
"""
47+
)
48+
elif isinstance(database_engine, PostgresEngine):
49+
cur.execute(
50+
"""
51+
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
52+
BEGIN
53+
IF EXISTS (
54+
SELECT 1 FROM events
55+
WHERE events.event_id = NEW.event_id
56+
AND events.stream_ordering != NEW.event_stream_ordering
57+
) THEN
58+
RAISE EXCEPTION 'Incorrect event_stream_ordering';
59+
END IF;
60+
RETURN NEW;
61+
END;
62+
$BODY$ LANGUAGE plpgsql;
63+
"""
64+
)
65+
66+
for table in (
67+
"current_state_events",
68+
"local_current_membership",
69+
"room_memberships",
70+
):
71+
cur.execute(
72+
f"""
73+
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
74+
FOR EACH ROW
75+
EXECUTE PROCEDURE check_event_stream_ordering()
76+
"""
77+
)
78+
else:
79+
raise NotImplementedError("Unknown database engine")

0 commit comments

Comments
 (0)