This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit d64b70a

Merge pull request #6098 from matrix-org/erikj/cleanup_user_ips_2

Prune old rows in user_ips tables.

2 parents 9614d3c + a963181

7 files changed: 197 additions, 11 deletions

changelog.d/6098.feature (1 addition, 0 deletions)

@@ -0,0 +1 @@
+Add support for pruning old rows in `user_ips` table.

docs/sample_config.yaml (6 additions, 0 deletions)

@@ -316,6 +316,12 @@ listeners:
 #
 redaction_retention_period: 7d

+# How long to track users' last seen time and IPs in the database.
+#
+# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
+#
+#user_ips_max_age: 14d
+

 ## TLS ##

synapse/config/server.py (13 additions, 0 deletions)

@@ -172,6 +172,13 @@ def read_config(self, config, **kwargs):
         else:
             self.redaction_retention_period = None

+        # How long to keep entries in the `users_ips` table.
+        user_ips_max_age = config.get("user_ips_max_age", "28d")
+        if user_ips_max_age is not None:
+            self.user_ips_max_age = self.parse_duration(user_ips_max_age)
+        else:
+            self.user_ips_max_age = None
+
         # Options to disable HS
         self.hs_disabled = config.get("hs_disabled", False)
         self.hs_disabled_message = config.get("hs_disabled_message", "")
@@ -736,6 +743,12 @@ def generate_config_section(
         # Defaults to `7d`. Set to `null` to disable.
         #
         redaction_retention_period: 7d
+
+        # How long to track users' last seen time and IPs in the database.
+        #
+        # Defaults to `28d`. Set to `null` to disable clearing out of old rows.
+        #
+        #user_ips_max_age: 14d
         """
             % locals()
         )
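The read_config hunk above passes the configured string through self.parse_duration, so `user_ips_max_age` ends up as a number of milliseconds (or None when pruning is disabled). As a rough sketch of that conversion (an illustrative re-implementation, not Synapse's actual Config.parse_duration helper, which supports more unit suffixes):

    # Illustrative sketch only: shows why "28d" becomes a millisecond value
    # that the pruning job can subtract from the current time.
    SECOND_MS = 1000
    UNITS_MS = {
        "s": SECOND_MS,
        "m": 60 * SECOND_MS,
        "h": 60 * 60 * SECOND_MS,
        "d": 24 * 60 * 60 * SECOND_MS,
    }

    def parse_duration_sketch(value):
        """Convert a duration such as "28d" into milliseconds."""
        if isinstance(value, int):
            return value  # already a number of milliseconds
        unit = value[-1]
        if unit in UNITS_MS:
            return int(value[:-1]) * UNITS_MS[unit]
        return int(value)  # bare number: treat as milliseconds

    assert parse_duration_sketch("28d") == 28 * 24 * 60 * 60 * 1000
    assert parse_duration_sketch("14d") == 14 * 24 * 60 * 60 * 1000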

synapse/metrics/background_process_metrics.py (31 additions, 2 deletions)

@@ -15,6 +15,8 @@

 import logging
 import threading
+from asyncio import iscoroutine
+from functools import wraps

 import six

@@ -173,7 +175,7 @@ def run_as_background_process(desc, func, *args, **kwargs):

     Args:
         desc (str): a description for this background process type
-        func: a function, which may return a Deferred
+        func: a function, which may return a Deferred or a coroutine
         args: positional args for func
         kwargs: keyword args for func

@@ -197,7 +199,17 @@ def run():
                 _background_processes.setdefault(desc, set()).add(proc)

             try:
-                yield func(*args, **kwargs)
+                result = func(*args, **kwargs)
+
+                # We probably don't have an ensureDeferred in our call stack to handle
+                # coroutine results, so we need to ensureDeferred here.
+                #
+                # But we need this check because ensureDeferred doesn't like being
+                # called on immediate values (as opposed to Deferreds or coroutines).
+                if iscoroutine(result):
+                    result = defer.ensureDeferred(result)
+
+                return (yield result)
             except Exception:
                 logger.exception("Background process '%s' threw an exception", desc)
             finally:
@@ -208,3 +220,20 @@ def run():

     with PreserveLoggingContext():
         return run()
+
+
+def wrap_as_background_process(desc):
+    """Decorator that wraps a function that gets called as a background
+    process.
+
+    Equivalent of calling the function with `run_as_background_process`
+    """
+
+    def wrap_as_background_process_inner(func):
+        @wraps(func)
+        def wrap_as_background_process_inner_2(*args, **kwargs):
+            return run_as_background_process(desc, func, *args, **kwargs)
+
+        return wrap_as_background_process_inner_2
+
+    return wrap_as_background_process_inner
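A usage note on the new decorator: wrap_as_background_process("desc") is just sugar for invoking the wrapped function via run_as_background_process, which (thanks to the iscoroutine/ensureDeferred change above) now also accepts coroutine functions. A minimal sketch of how a store method can opt in; `MyStore` and `_do_cleanup` are hypothetical names, the real callers added by this commit live in synapse/storage/client_ips.py below:

    # Minimal usage sketch (hypothetical class/method names).
    from synapse.metrics.background_process_metrics import wrap_as_background_process

    class MyStore:
        @wrap_as_background_process("my_cleanup")
        async def _do_cleanup(self):
            # Runs in its own logging context and is tracked in the
            # background-process metrics under the "my_cleanup" label.
            ...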

synapse/storage/background_updates.py (21 additions, 1 deletion)

@@ -140,14 +140,34 @@ def has_completed_background_updates(self):
             "background_updates",
             keyvalues=None,
             retcol="1",
-            desc="check_background_updates",
+            desc="has_completed_background_updates",
         )
         if not updates:
             self._all_done = True
             return True

         return False

+    async def has_completed_background_update(self, update_name) -> bool:
+        """Check if the given background update has finished running.
+        """
+
+        if self._all_done:
+            return True
+
+        if update_name in self._background_update_queue:
+            return False
+
+        update_exists = await self._simple_select_one_onecol(
+            "background_updates",
+            keyvalues={"update_name": update_name},
+            retcol="1",
+            desc="has_completed_background_update",
+            allow_none=True,
+        )
+
+        return not update_exists
+
     @defer.inlineCallbacks
     def do_next_background_update(self, desired_duration_ms):
         """Does some amount of work on the next queued background update

synapse/storage/client_ips.py (54 additions, 8 deletions)

@@ -19,7 +19,7 @@

 from twisted.internet import defer

-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.util.caches import CACHE_SIZE_FACTOR

 from . import background_updates
@@ -42,6 +42,8 @@ def __init__(self, db_conn, hs):

         super(ClientIpStore, self).__init__(db_conn, hs)

+        self.user_ips_max_age = hs.config.user_ips_max_age
+
         self.register_background_index_update(
             "user_ips_device_index",
             index_name="user_ips_device_id",
@@ -100,6 +102,9 @@ def __init__(self, db_conn, hs):
             "before", "shutdown", self._update_client_ips_batch
         )

+        if self.user_ips_max_age:
+            self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
+
     @defer.inlineCallbacks
     def _remove_user_ip_nonunique(self, progress, batch_size):
         def f(conn):
@@ -319,20 +324,19 @@ def insert_client_ip(

         self._batch_row_update[key] = (user_agent, device_id, now)

+    @wrap_as_background_process("update_client_ips")
     def _update_client_ips_batch(self):

         # If the DB pool has already terminated, don't try updating
         if not self.hs.get_db_pool().running:
             return

-        def update():
-            to_update = self._batch_row_update
-            self._batch_row_update = {}
-            return self.runInteraction(
-                "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
-            )
+        to_update = self._batch_row_update
+        self._batch_row_update = {}

-        return run_as_background_process("update_client_ips", update)
+        return self.runInteraction(
+            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+        )

     def _update_client_ips_batch_txn(self, txn, to_update):
         if "user_ips" in self._unsafe_to_upsert_tables or (
@@ -496,3 +500,45 @@ def _devices_last_seen_update_txn(txn):
         yield self._end_background_update("devices_last_seen")

         return updated
+
+    @wrap_as_background_process("prune_old_user_ips")
+    async def _prune_old_user_ips(self):
+        """Removes entries in user IPs older than the configured period.
+        """
+
+        if self.user_ips_max_age is None:
+            # Nothing to do
+            return
+
+        if not await self.has_completed_background_update("devices_last_seen"):
+            # Only start pruning if we have finished populating the devices
+            # last seen info.
+            return
+
+        # We do a slightly funky SQL delete to ensure we don't try and delete
+        # too much at once (as the table may be very large from before we
+        # started pruning).
+        #
+        # This works by finding the max last_seen that is less than the given
+        # time, but has no more than N rows before it, deleting all rows with
+        # a lesser last_seen time. (We COALESCE so that the sub-SELECT always
+        # returns exactly one row).
+        sql = """
+            DELETE FROM user_ips
+            WHERE last_seen <= (
+                SELECT COALESCE(MAX(last_seen), -1)
+                FROM (
+                    SELECT last_seen FROM user_ips
+                    WHERE last_seen <= ?
+                    ORDER BY last_seen ASC
+                    LIMIT 5000
+                ) AS u
+            )
+        """
+
+        timestamp = self.clock.time_msec() - self.user_ips_max_age
+
+        def _prune_old_user_ips_txn(txn):
+            txn.execute(sql, (timestamp,))
+
+        await self.runInteraction("_prune_old_user_ips", _prune_old_user_ips_txn)
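To make the bounded delete in _prune_old_user_ips concrete, here is a self-contained illustration against an in-memory SQLite database with made-up data (the real code uses LIMIT 5000 and the homeserver's database layer); each execution removes at most one batch of rows older than the cutoff:

    # Standalone illustration of the batched delete (hypothetical data,
    # LIMIT shrunk from 5000 to 5 so the batching is visible).
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE user_ips (user_id TEXT, last_seen BIGINT)")
    conn.executemany(
        "INSERT INTO user_ips VALUES (?, ?)",
        [("@user:id", ts) for ts in range(20)],  # 20 stale rows, last_seen 0..19
    )

    sql = """
        DELETE FROM user_ips
        WHERE last_seen <= (
            SELECT COALESCE(MAX(last_seen), -1)
            FROM (
                SELECT last_seen FROM user_ips
                WHERE last_seen <= ?
                ORDER BY last_seen ASC
                LIMIT 5
            ) AS u
        )
    """

    cutoff = 100  # every row is older than the cutoff
    conn.execute(sql, (cutoff,))
    print(conn.execute("SELECT COUNT(*) FROM user_ips").fetchone()[0])  # 15 rows left
    conn.execute(sql, (cutoff,))
    print(conn.execute("SELECT COUNT(*) FROM user_ips").fetchone()[0])  # 10 rows left

Because the sub-SELECT caps how far back the MAX(last_seen) bound can reach, a table with millions of old rows is trimmed a few thousand rows per looping_call tick instead of being locked by one huge DELETE.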

tests/storage/test_client_ips.py (71 additions, 0 deletions)

@@ -279,6 +279,77 @@ def test_devices_last_seen_bg_update(self):
             r,
         )

+    def test_old_user_ips_pruned(self):
+        # First make sure we have completed all updates.
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+        # Insert a user IP
+        user_id = "@user:id"
+        self.get_success(
+            self.store.insert_client_ip(
+                user_id, "access_token", "ip", "user_agent", "device_id"
+            )
+        )
+
+        # Force persisting to disk
+        self.reactor.advance(200)
+
+        # We should see that in the DB
+        result = self.get_success(
+            self.store._simple_select_list(
+                table="user_ips",
+                keyvalues={"user_id": user_id},
+                retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"],
+                desc="get_user_ip_and_agents",
+            )
+        )
+
+        self.assertEqual(
+            result,
+            [
+                {
+                    "access_token": "access_token",
+                    "ip": "ip",
+                    "user_agent": "user_agent",
+                    "device_id": "device_id",
+                    "last_seen": 0,
+                }
+            ],
+        )
+
+        # Now advance by a couple of months
+        self.reactor.advance(60 * 24 * 60 * 60)
+
+        # We should get no results.
+        result = self.get_success(
+            self.store._simple_select_list(
+                table="user_ips",
+                keyvalues={"user_id": user_id},
+                retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"],
+                desc="get_user_ip_and_agents",
+            )
+        )
+
+        self.assertEqual(result, [])
+
+        # But we should still get the correct values for the device
+        result = self.get_success(
+            self.store.get_last_client_ip_by_device(user_id, "device_id")
+        )
+
+        r = result[(user_id, "device_id")]
+        self.assertDictContainsSubset(
+            {
+                "user_id": user_id,
+                "device_id": "device_id",
+                "ip": "ip",
+                "user_agent": "user_agent",
+                "last_seen": 0,
+            },
+            r,
+        )
+

 class ClientIpAuthTestCase(unittest.HomeserverTestCase):
