|
18 | 18 | from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple
|
19 | 19 |
|
20 | 20 | from synapse.api.constants import EventTypes
|
| 21 | +from synapse.config._base import Config |
| 22 | +from synapse.metrics.background_process_metrics import wrap_as_background_process |
21 | 23 | from synapse.replication.tcp.streams import BackfillStream, CachesStream
|
22 | 24 | from synapse.replication.tcp.streams.events import (
|
23 | 25 | EventsStream,
|
|
52 | 54 | # As above, but for invalidating room caches on room deletion
|
53 | 55 | DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
|
54 | 56 |
|
| 57 | +# How long between cache invalidation table cleanups, once we have caught up |
| 58 | +# with the backlog. |
| 59 | +REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h") |
| 60 | + |
| 61 | +# How long between cache invalidation table cleanups, before we have caught |
| 62 | +# up with the backlog. |
| 63 | +CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m") |
| 64 | + |
| 65 | +# Maximum number of cache invalidation rows to delete at once. |
| 66 | +CLEAN_UP_MAX_BATCH_SIZE = 20_000 |
| 67 | + |
| 68 | +# Keep cache invalidations for 7 days |
| 69 | +# (This is likely to be quite excessive.) |
| 70 | +RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d") |
| 71 | + |
55 | 72 |
|
56 | 73 | class CacheInvalidationWorkerStore(SQLBaseStore):
|
57 | 74 | def __init__(
|
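
For clarity, a minimal sketch of what the new interval constants are assumed to evaluate to. `Config.parse_duration` is assumed here to return an integer count of milliseconds, which is why the scheduling calls below divide by 1000 before handing the delay to the clock (which takes seconds). This is an illustration only, not part of the patch.

    from synapse.config._base import Config

    # Assumed behaviour: "1m" -> 60_000 ms, "1h" -> 3_600_000 ms.
    catch_up_ms = Config.parse_duration("1m")
    regular_ms = Config.parse_duration("1h")

    # call_later expects seconds, hence the division by 1000 in the patch.
    first_delay_seconds = catch_up_ms / 1000  # 60.0 under the assumption above
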
@@ -98,6 +115,18 @@ def __init__(
|
98 | 115 | else:
|
99 | 116 | self._cache_id_gen = None
|
100 | 117 |
|
| 118 | + # Occasionally clean up the cache invalidations stream table by deleting |
| 119 | + # old rows. |
| 120 | + # This is only applicable when Postgres is in use; this table is unused |
| 121 | + # and not populated at all when SQLite is the active database engine. |
| 122 | + if hs.config.worker.run_background_tasks and isinstance( |
| 123 | + self.database_engine, PostgresEngine |
| 124 | + ): |
| 125 | + self.hs.get_clock().call_later( |
| 126 | + CATCH_UP_CLEANUP_INTERVAL_MS / 1000, |
| 127 | + self._clean_up_cache_invalidation_wrapper, |
| 128 | + ) |
| 129 | + |
101 | 130 | async def get_all_updated_caches(
|
102 | 131 | self, instance_name: str, last_id: int, current_id: int, limit: int
|
103 | 132 | ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
|
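
The scheduling added to `__init__` above arms a one-shot timer which the wrapper (added in the next hunk) re-arms after every run, picking the shorter interval while a backlog remains. As a rough, hypothetical analogue of that pattern outside Synapse's `Clock` and background-process machinery, using plain asyncio:

    import asyncio
    from typing import Awaitable, Callable

    CATCH_UP_INTERVAL_S = 60      # analogue of CATCH_UP_CLEANUP_INTERVAL_MS / 1000
    REGULAR_INTERVAL_S = 3600     # analogue of REGULAR_CLEANUP_INTERVAL_MS / 1000

    async def periodic_cleanup(run_one_batch: Callable[[], Awaitable[bool]]) -> None:
        # Run one batch, then sleep for a shorter interval while we are still
        # working through a backlog, and a longer one once caught up.
        while True:
            in_backlog = await run_one_batch()
            await asyncio.sleep(
                CATCH_UP_INTERVAL_S if in_backlog else REGULAR_INTERVAL_S
            )
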
@@ -554,3 +583,104 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
|
554 | 583 | return self._cache_id_gen.get_current_token_for_writer(instance_name)
|
555 | 584 | else:
|
556 | 585 | return 0
|
| 586 | + |
| 587 | + @wrap_as_background_process("clean_up_old_cache_invalidations") |
| 588 | + async def _clean_up_cache_invalidation_wrapper(self) -> None: |
| 589 | + """ |
| 590 | + Clean up cache invalidation stream table entries occasionally. |
| 591 | + If we are behind (i.e. there are entries old enough to |
| 592 | + be deleted but too many of them to be deleted in one go), |
| 593 | + then we run more frequently until we have caught up. |
| 594 | + """ |
| 595 | + delete_up_to: int = ( |
| 596 | + self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS |
| 597 | + ) |
| 598 | + |
| 599 | + in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to) |
| 600 | + |
| 601 | + # Vary how long we wait before calling again depending on whether we |
| 602 | + # are still sifting through backlog or we have caught up. |
| 603 | + if in_backlog: |
| 604 | + next_interval = CATCH_UP_CLEANUP_INTERVAL_MS |
| 605 | + else: |
| 606 | + next_interval = REGULAR_CLEANUP_INTERVAL_MS |
| 607 | + |
| 608 | + self.hs.get_clock().call_later( |
| 609 | + next_interval / 1000, self._clean_up_cache_invalidation_wrapper |
| 610 | + ) |
| 611 | + |
| 612 | + async def _clean_up_batch_of_old_cache_invalidations( |
| 613 | + self, delete_up_to_millisec: int |
| 614 | + ) -> bool: |
| 615 | + """ |
| 616 | + Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). |
| 617 | + |
| 618 | + Up to `CLEAN_UP_MAX_BATCH_SIZE` rows will be deleted at once. |
| 619 | + |
| 620 | + Returns true if and only if we were limited by batch size (i.e. we are in backlog: |
| 621 | + there are more things to clean up). |
| 622 | + """ |
| 623 | + |
| 624 | + def _clean_up_batch_of_old_cache_invalidations_txn( |
| 625 | + txn: LoggingTransaction, |
| 626 | + ) -> bool: |
| 627 | + # First get the earliest stream ID |
| 628 | + txn.execute( |
| 629 | + """ |
| 630 | + SELECT stream_id FROM cache_invalidation_stream_by_instance |
| 631 | + ORDER BY stream_id ASC |
| 632 | + LIMIT 1 |
| 633 | + """ |
| 634 | + ) |
| 635 | + row = txn.fetchone() |
| 636 | + if row is None: |
| 637 | + return False |
| 638 | + earliest_stream_id: int = row[0] |
| 639 | + |
| 640 | + # Then find the last stream ID of the range we will delete |
| 641 | + txn.execute( |
| 642 | + """ |
| 643 | + SELECT stream_id FROM cache_invalidation_stream_by_instance |
| 644 | + WHERE stream_id <= ? AND invalidation_ts <= ? |
| 645 | + ORDER BY stream_id DESC |
| 646 | + LIMIT 1 |
| 647 | + """, |
| 648 | + (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec), |
| 649 | + ) |
| 650 | + row = txn.fetchone() |
| 651 | + if row is None: |
| 652 | + return False |
| 653 | + cutoff_stream_id: int = row[0] |
| 654 | + |
| 655 | + # Determine whether we are caught up or still catching up |
| 656 | + txn.execute( |
| 657 | + """ |
| 658 | + SELECT invalidation_ts FROM cache_invalidation_stream_by_instance |
| 659 | + WHERE stream_id > ? |
| 660 | + ORDER BY stream_id ASC |
| 661 | + LIMIT 1 |
| 662 | + """, |
| 663 | + (cutoff_stream_id,), |
| 664 | + ) |
| 665 | + row = txn.fetchone() |
| 666 | + if row is None: |
| 667 | + in_backlog = False |
| 668 | + else: |
| 669 | + # We are in backlog if the next row is also old enough to be |
| 670 | + # deleted; only the batch size limit stopped us. |
| 671 | + in_backlog = row[0] <= delete_up_to_millisec |
| 672 | + |
| 673 | + txn.execute( |
| 674 | + """ |
| 675 | + DELETE FROM cache_invalidation_stream_by_instance |
| 676 | + WHERE ? <= stream_id AND stream_id <= ? |
| 677 | + """, |
| 678 | + (earliest_stream_id, cutoff_stream_id), |
| 679 | + ) |
| 680 | + |
| 681 | + return in_backlog |
| 682 | + |
| 683 | + return await self.db_pool.runInteraction( |
| 684 | + "clean_up_old_cache_invalidations", |
| 685 | + _clean_up_batch_of_old_cache_invalidations_txn, |
| 686 | + ) |
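
To make the three queries in `_clean_up_batch_of_old_cache_invalidations_txn` easier to follow, here is a hypothetical pure-Python model of the same batching and backlog logic, with the table represented as a list of `(stream_id, invalidation_ts)` pairs sorted by stream ID. It is an illustration under that assumption, not code from the patch.

    from typing import List, Optional, Tuple

    CLEAN_UP_MAX_BATCH_SIZE = 20_000

    def clean_up_one_batch(
        rows: List[Tuple[int, int]], delete_up_to_millisec: int
    ) -> Tuple[List[Tuple[int, int]], bool]:
        """Delete one batch of old rows; return (remaining rows, in_backlog)."""
        if not rows:
            return rows, False

        # First query: the earliest stream ID in the table.
        earliest_stream_id = rows[0][0]

        # Second query: the last stream ID of the range we will delete, bounded
        # by both the batch size and the retention cutoff.
        cutoff_stream_id: Optional[int] = None
        for stream_id, invalidation_ts in rows:
            if (
                stream_id <= earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE
                and invalidation_ts <= delete_up_to_millisec
            ):
                cutoff_stream_id = stream_id
        if cutoff_stream_id is None:
            return rows, False

        # Third query: look at the first row *after* the cutoff. If it is also
        # old enough to delete, only the batch size stopped us, so we are
        # still in backlog and should run again soon.
        survivors = [(sid, ts) for sid, ts in rows if sid > cutoff_stream_id]
        in_backlog = bool(survivors) and survivors[0][1] <= delete_up_to_millisec

        # The DELETE removes everything from the earliest stream ID up to the
        # cutoff, i.e. exactly the rows that are not in `survivors`.
        return survivors, in_backlog

Note that, as in the SQL, the deletion range is defined purely by stream ID: any row lying between the earliest and cutoff stream IDs is removed, even if its timestamp happens to be newer than the retention cutoff.
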