Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 8d46fac

Browse files
authored
Delete messages from device_inbox table when deleting device (#10969)
Fixes: #9346
1 parent a930da3 commit 8d46fac

File tree

6 files changed

+256
-15
lines changed

6 files changed

+256
-15
lines changed

changelog.d/10969.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine.

synapse/storage/databases/main/deviceinbox.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
from synapse.logging.opentracing import log_kv, set_tag, trace
2020
from synapse.replication.tcp.streams import ToDeviceStream
2121
from synapse.storage._base import SQLBaseStore, db_to_json
22-
from synapse.storage.database import DatabasePool
22+
from synapse.storage.database import DatabasePool, LoggingTransaction
2323
from synapse.storage.engines import PostgresEngine
2424
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
25+
from synapse.types import JsonDict
2526
from synapse.util import json_encoder
2627
from synapse.util.caches.expiringcache import ExpiringCache
2728
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -555,6 +556,7 @@ def _add_messages_to_local_device_inbox_txn(
555556

556557
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
557558
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
559+
REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
558560

559561
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
560562
super().__init__(database, db_conn, hs)
@@ -570,6 +572,11 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
570572
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
571573
)
572574

575+
self.db_pool.updates.register_background_update_handler(
576+
self.REMOVE_DELETED_DEVICES,
577+
self._remove_deleted_devices_from_device_inbox,
578+
)
579+
573580
async def _background_drop_index_device_inbox(self, progress, batch_size):
574581
def reindex_txn(conn):
575582
txn = conn.cursor()
@@ -582,6 +589,89 @@ def reindex_txn(conn):
582589

583590
return 1
584591

592+
async def _remove_deleted_devices_from_device_inbox(
593+
self, progress: JsonDict, batch_size: int
594+
) -> int:
595+
"""A background update that deletes all device_inboxes for deleted devices.
596+
597+
This should only need to be run once (when users upgrade to v1.46.0)
598+
599+
Args:
600+
progress: JsonDict used to store progress of this background update
601+
batch_size: the maximum number of rows to retrieve in a single select query
602+
603+
Returns:
604+
The number of deleted rows
605+
"""
606+
607+
def _remove_deleted_devices_from_device_inbox_txn(
608+
txn: LoggingTransaction,
609+
) -> int:
610+
"""stream_id is not unique
611+
we need to use an inclusive `stream_id >= ?` clause,
612+
since we might not have deleted all dead device messages for the stream_id
613+
returned from the previous query
614+
615+
Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
616+
to avoid problems of deleting a large number of rows all at once
617+
due to a single device having lots of device messages.
618+
"""
619+
620+
last_stream_id = progress.get("stream_id", 0)
621+
622+
sql = """
623+
SELECT device_id, user_id, stream_id
624+
FROM device_inbox
625+
WHERE
626+
stream_id >= ?
627+
AND (device_id, user_id) NOT IN (
628+
SELECT device_id, user_id FROM devices
629+
)
630+
ORDER BY stream_id
631+
LIMIT ?
632+
"""
633+
634+
txn.execute(sql, (last_stream_id, batch_size))
635+
rows = txn.fetchall()
636+
637+
num_deleted = 0
638+
for row in rows:
639+
num_deleted += self.db_pool.simple_delete_txn(
640+
txn,
641+
"device_inbox",
642+
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
643+
)
644+
645+
if rows:
646+
# send more than stream_id to progress
647+
# otherwise it can happen in large deployments that
648+
# no change of status is visible in the log file
649+
# it may be that the stream_id does not change in several runs
650+
self.db_pool.updates._background_update_progress_txn(
651+
txn,
652+
self.REMOVE_DELETED_DEVICES,
653+
{
654+
"device_id": rows[-1][0],
655+
"user_id": rows[-1][1],
656+
"stream_id": rows[-1][2],
657+
},
658+
)
659+
660+
return num_deleted
661+
662+
number_deleted = await self.db_pool.runInteraction(
663+
"_remove_deleted_devices_from_device_inbox",
664+
_remove_deleted_devices_from_device_inbox_txn,
665+
)
666+
667+
# The task is finished when no more lines are deleted.
668+
if not number_deleted:
669+
await self.db_pool.updates._end_background_update(
670+
self.REMOVE_DELETED_DEVICES
671+
)
672+
673+
return number_deleted
674+
585675

586676
class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
587677
pass

synapse/storage/databases/main/devices.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,19 +1134,14 @@ async def store_device(
11341134
raise StoreError(500, "Problem storing device.")
11351135

11361136
async def delete_device(self, user_id: str, device_id: str) -> None:
1137-
"""Delete a device.
1137+
"""Delete a device and its device_inbox.
11381138
11391139
Args:
11401140
user_id: The ID of the user which owns the device
11411141
device_id: The ID of the device to delete
11421142
"""
1143-
await self.db_pool.simple_delete_one(
1144-
table="devices",
1145-
keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
1146-
desc="delete_device",
1147-
)
11481143

1149-
self.device_id_exists_cache.invalidate((user_id, device_id))
1144+
await self.delete_devices(user_id, [device_id])
11501145

11511146
async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
11521147
"""Deletes several devices.
@@ -1155,13 +1150,25 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
11551150
user_id: The ID of the user which owns the devices
11561151
device_ids: The IDs of the devices to delete
11571152
"""
1158-
await self.db_pool.simple_delete_many(
1159-
table="devices",
1160-
column="device_id",
1161-
iterable=device_ids,
1162-
keyvalues={"user_id": user_id, "hidden": False},
1163-
desc="delete_devices",
1164-
)
1153+
1154+
def _delete_devices_txn(txn: LoggingTransaction) -> None:
1155+
self.db_pool.simple_delete_many_txn(
1156+
txn,
1157+
table="devices",
1158+
column="device_id",
1159+
values=device_ids,
1160+
keyvalues={"user_id": user_id, "hidden": False},
1161+
)
1162+
1163+
self.db_pool.simple_delete_many_txn(
1164+
txn,
1165+
table="device_inbox",
1166+
column="device_id",
1167+
values=device_ids,
1168+
keyvalues={"user_id": user_id},
1169+
)
1170+
1171+
await self.db_pool.runInteraction("delete_devices", _delete_devices_txn)
11651172
for device_id in device_ids:
11661173
self.device_id_exists_cache.invalidate((user_id, device_id))
11671174

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/* Copyright 2021 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
17+
-- Remove messages from the device_inbox table which were orphaned
18+
-- when a device was deleted using Synapse earlier than 1.46.0.
19+
-- This runs as background task, but may take a bit to finish.
20+
21+
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
22+
(6402, 'remove_deleted_devices_from_device_inbox', '{}');

tests/handlers/test_device.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,37 @@ def test_delete_device(self):
160160
# we'd like to check the access token was invalidated, but that's a
161161
# bit of a PITA.
162162

163+
def test_delete_device_and_device_inbox(self):
164+
self._record_users()
165+
166+
# add an device_inbox
167+
self.get_success(
168+
self.store.db_pool.simple_insert(
169+
"device_inbox",
170+
{
171+
"user_id": user1,
172+
"device_id": "abc",
173+
"stream_id": 1,
174+
"message_json": "{}",
175+
},
176+
)
177+
)
178+
179+
# delete the device
180+
self.get_success(self.handler.delete_device(user1, "abc"))
181+
182+
# check that the device_inbox was deleted
183+
res = self.get_success(
184+
self.store.db_pool.simple_select_one(
185+
table="device_inbox",
186+
keyvalues={"user_id": user1, "device_id": "abc"},
187+
retcols=("user_id", "device_id"),
188+
allow_none=True,
189+
desc="get_device_id_from_device_inbox",
190+
)
191+
)
192+
self.assertIsNone(res)
193+
163194
def test_update_device(self):
164195
self._record_users()
165196

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Copyright 2021 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the 'License');
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an 'AS IS' BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from synapse.rest import admin
16+
from synapse.rest.client import devices
17+
18+
from tests.unittest import HomeserverTestCase
19+
20+
21+
class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase):
22+
23+
servlets = [
24+
admin.register_servlets,
25+
devices.register_servlets,
26+
]
27+
28+
def prepare(self, reactor, clock, hs):
29+
self.store = hs.get_datastore()
30+
self.user_id = self.register_user("foo", "pass")
31+
32+
def test_background_remove_deleted_devices_from_device_inbox(self):
33+
"""Test that the background task to delete old device_inboxes works properly."""
34+
35+
# create a valid device
36+
self.get_success(
37+
self.store.store_device(self.user_id, "cur_device", "display_name")
38+
)
39+
40+
# Add device_inbox to devices
41+
self.get_success(
42+
self.store.db_pool.simple_insert(
43+
"device_inbox",
44+
{
45+
"user_id": self.user_id,
46+
"device_id": "cur_device",
47+
"stream_id": 1,
48+
"message_json": "{}",
49+
},
50+
)
51+
)
52+
self.get_success(
53+
self.store.db_pool.simple_insert(
54+
"device_inbox",
55+
{
56+
"user_id": self.user_id,
57+
"device_id": "old_device",
58+
"stream_id": 2,
59+
"message_json": "{}",
60+
},
61+
)
62+
)
63+
64+
# Insert and run the background update.
65+
self.get_success(
66+
self.store.db_pool.simple_insert(
67+
"background_updates",
68+
{
69+
"update_name": "remove_deleted_devices_from_device_inbox",
70+
"progress_json": "{}",
71+
},
72+
)
73+
)
74+
75+
# ... and tell the DataStore that it hasn't finished all updates yet
76+
self.store.db_pool.updates._all_done = False
77+
78+
self.wait_for_background_updates()
79+
80+
# Make sure the background task deleted old device_inbox
81+
res = self.get_success(
82+
self.store.db_pool.simple_select_onecol(
83+
table="device_inbox",
84+
keyvalues={},
85+
retcol="device_id",
86+
desc="get_device_id_from_device_inbox",
87+
)
88+
)
89+
self.assertEqual(1, len(res))
90+
self.assertEqual(res[0], "cur_device")

0 commit comments

Comments
 (0)