|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
| 14 | +import datetime |
14 | 15 | import itertools
|
15 | 16 | import logging
|
16 | 17 | from queue import Empty, PriorityQueue
|
|
43 | 44 | )
|
44 | 45 | from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
45 | 46 | from synapse.storage.databases.main.signatures import SignatureWorkerStore
|
46 |
| -from synapse.storage.engines import PostgresEngine |
| 47 | +from synapse.storage.engines import PostgresEngine, Sqlite3Engine |
47 | 48 | from synapse.types import JsonDict
|
48 | 49 | from synapse.util import json_encoder
|
49 | 50 | from synapse.util.caches.descriptors import cached
|
|
72 | 73 |
|
73 | 74 | logger = logging.getLogger(__name__)
|
74 | 75 |
|
| 76 | +BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int( |
| 77 | + datetime.timedelta(days=7).total_seconds() |
| 78 | +) |
| 79 | +BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int( |
| 80 | + datetime.timedelta(hours=1).total_seconds() |
| 81 | +) |
| 82 | + |
75 | 83 |
|
76 | 84 | # All the info we need while iterating the DAG while backfilling
|
77 | 85 | @attr.s(frozen=True, slots=True, auto_attribs=True)
|
@@ -715,96 +723,189 @@ def _get_auth_chain_difference_txn(
|
715 | 723 |
|
716 | 724 | @trace
|
717 | 725 | @tag_args
|
718 |
| - async def get_oldest_event_ids_with_depth_in_room( |
719 |
| - self, room_id: str |
| 726 | + async def get_backfill_points_in_room( |
| 727 | + self, |
| 728 | + room_id: str, |
720 | 729 | ) -> List[Tuple[str, int]]:
|
721 |
| - """Gets the oldest events(backwards extremities) in the room along with the |
722 |
| - aproximate depth. |
723 |
| -
|
724 |
| - We use this function so that we can compare and see if someones current |
725 |
| - depth at their current scrollback is within pagination range of the |
726 |
| - event extremeties. If the current depth is close to the depth of given |
727 |
| - oldest event, we can trigger a backfill. |
| 730 | + """ |
| 731 | + Gets the oldest events (backwards extremities) in the room along with the |
| 732 | + approximate depth. Sorted by depth, highest to lowest (descending). |
728 | 733 |
|
729 | 734 | Args:
|
730 | 735 | room_id: Room where we want to find the oldest events
|
731 | 736 |
|
732 | 737 | Returns:
|
733 |
| - List of (event_id, depth) tuples |
| 738 | + List of (event_id, depth) tuples. Sorted by depth, highest to lowest |
| 739 | + (descending) |
734 | 740 | """
|
735 | 741 |
|
736 |
| - def get_oldest_event_ids_with_depth_in_room_txn( |
| 742 | + def get_backfill_points_in_room_txn( |
737 | 743 | txn: LoggingTransaction, room_id: str
|
738 | 744 | ) -> List[Tuple[str, int]]:
|
739 |
| - # Assemble a dictionary with event_id -> depth for the oldest events |
| 745 | + # Assemble a tuple lookup of event_id -> depth for the oldest events |
740 | 746 | # we know of in the room. Backwards extremities are the oldest
|
741 | 747 | # events we know of in the room but we only know of them because
|
742 |
| - # some other event referenced them by prev_event and aren't peristed |
743 |
| - # in our database yet (meaning we don't know their depth |
744 |
| - # specifically). So we need to look for the aproximate depth from |
| 748 | + # some other event referenced them by prev_event and aren't |
| 749 | + # persisted in our database yet (meaning we don't know their depth |
| 750 | + # specifically). So we need to look for the approximate depth from |
745 | 751 | # the events connected to the current backwards extremities.
|
746 | 752 | sql = """
|
747 |
| - SELECT b.event_id, MAX(e.depth) FROM events as e |
| 753 | + SELECT backward_extrem.event_id, event.depth FROM events AS event |
748 | 754 | /**
|
749 | 755 | * Get the edge connections from the event_edges table
|
750 | 756 | * so we can see whether this event's prev_events points
|
751 | 757 | * to a backward extremity in the next join.
|
752 | 758 | */
|
753 |
| - INNER JOIN event_edges as g |
754 |
| - ON g.event_id = e.event_id |
| 759 | + INNER JOIN event_edges AS edge |
| 760 | + ON edge.event_id = event.event_id |
755 | 761 | /**
|
756 | 762 | * We find the "oldest" events in the room by looking for
|
757 | 763 | * events connected to backwards extremeties (oldest events
|
758 | 764 | * in the room that we know of so far).
|
759 | 765 | */
|
760 |
| - INNER JOIN event_backward_extremities as b |
761 |
| - ON g.prev_event_id = b.event_id |
762 |
| - WHERE b.room_id = ? AND g.is_state is ? |
763 |
| - GROUP BY b.event_id |
| 766 | + INNER JOIN event_backward_extremities AS backward_extrem |
| 767 | + ON edge.prev_event_id = backward_extrem.event_id |
| 768 | + /** |
| 769 | + * We use this info to make sure we don't retry to use a backfill point |
| 770 | + * if we've already attempted to backfill from it recently. |
| 771 | + */ |
| 772 | + LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info |
| 773 | + ON |
| 774 | + failed_backfill_attempt_info.room_id = backward_extrem.room_id |
| 775 | + AND failed_backfill_attempt_info.event_id = backward_extrem.event_id |
| 776 | + WHERE |
| 777 | + backward_extrem.room_id = ? |
| 778 | + /* We only care about non-state edges because we used to use |
| 779 | + * `event_edges` for two different sorts of "edges" (the current |
| 780 | + * event DAG, but also a link to the previous state, for state |
| 781 | + * events). These legacy state event edges can be distinguished by |
| 782 | + * `is_state` and are removed from the codebase and schema but |
| 783 | + * because the schema change is in a background update, it's not |
| 784 | + * necessarily safe to assume that it will have been completed. |
| 785 | + */ |
| 786 | + AND edge.is_state is ? /* False */ |
| 787 | + /** |
| 788 | + * Exponential back-off (up to the upper bound) so we don't retry the |
| 789 | + * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. |
| 790 | + * |
| 791 | + * We use `1 << n` as a power of 2 equivalent for compatibility |
| 792 | + * with older SQLites. The left shift equivalent only works with |
| 793 | + * powers of 2 because left shift is a binary operation (base-2). |
| 794 | + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. |
| 795 | + */ |
| 796 | + AND ( |
| 797 | + failed_backfill_attempt_info.event_id IS NULL |
| 798 | + OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */) |
| 799 | + ) |
| 800 | + /** |
| 801 | + * Sort from highest to the lowest depth. Then tie-break on |
| 802 | + * reverse alphabetical order of the event_ids (DESC) so we get a |
| 803 | + * consistent ordering which is nice when asserting things in tests. |
| 804 | + */ |
| 805 | + ORDER BY event.depth DESC, backward_extrem.event_id DESC |
764 | 806 | """
|
765 | 807 |
|
766 |
| - txn.execute(sql, (room_id, False)) |
| 808 | + if isinstance(self.database_engine, PostgresEngine): |
| 809 | + least_function = "least" |
| 810 | + elif isinstance(self.database_engine, Sqlite3Engine): |
| 811 | + least_function = "min" |
| 812 | + else: |
| 813 | + raise RuntimeError("Unknown database engine") |
| 814 | + |
| 815 | + txn.execute( |
| 816 | + sql % (least_function,), |
| 817 | + ( |
| 818 | + room_id, |
| 819 | + False, |
| 820 | + self._clock.time_msec(), |
| 821 | + 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, |
| 822 | + 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS, |
| 823 | + ), |
| 824 | + ) |
767 | 825 |
|
768 | 826 | return cast(List[Tuple[str, int]], txn.fetchall())
|
769 | 827 |
|
770 | 828 | return await self.db_pool.runInteraction(
|
771 |
| - "get_oldest_event_ids_with_depth_in_room", |
772 |
| - get_oldest_event_ids_with_depth_in_room_txn, |
| 829 | + "get_backfill_points_in_room", |
| 830 | + get_backfill_points_in_room_txn, |
773 | 831 | room_id,
|
774 | 832 | )
|
775 | 833 |
|
776 | 834 | @trace
|
777 | 835 | async def get_insertion_event_backward_extremities_in_room(
|
778 |
| - self, room_id: str |
| 836 | + self, |
| 837 | + room_id: str, |
779 | 838 | ) -> List[Tuple[str, int]]:
|
780 |
| - """Get the insertion events we know about that we haven't backfilled yet. |
781 |
| -
|
782 |
| - We use this function so that we can compare and see if someones current |
783 |
| - depth at their current scrollback is within pagination range of the |
784 |
| - insertion event. If the current depth is close to the depth of given |
785 |
| - insertion event, we can trigger a backfill. |
| 839 | + """ |
| 840 | + Get the insertion events we know about that we haven't backfilled yet |
| 841 | + along with the approximate depth. Sorted by depth, highest to lowest |
| 842 | + (descending). |
786 | 843 |
|
787 | 844 | Args:
|
788 | 845 | room_id: Room where we want to find the oldest events
|
789 | 846 |
|
790 | 847 | Returns:
|
791 |
| - List of (event_id, depth) tuples |
| 848 | + List of (event_id, depth) tuples. Sorted by depth, highest to lowest |
| 849 | + (descending) |
792 | 850 | """
|
793 | 851 |
|
794 | 852 | def get_insertion_event_backward_extremities_in_room_txn(
|
795 | 853 | txn: LoggingTransaction, room_id: str
|
796 | 854 | ) -> List[Tuple[str, int]]:
|
797 | 855 | sql = """
|
798 |
| - SELECT b.event_id, MAX(e.depth) FROM insertion_events as i |
| 856 | + SELECT |
| 857 | + insertion_event_extremity.event_id, event.depth |
799 | 858 | /* We only want insertion events that are also marked as backwards extremities */
|
800 |
| - INNER JOIN insertion_event_extremities as b USING (event_id) |
| 859 | + FROM insertion_event_extremities AS insertion_event_extremity |
801 | 860 | /* Get the depth of the insertion event from the events table */
|
802 |
| - INNER JOIN events AS e USING (event_id) |
803 |
| - WHERE b.room_id = ? |
804 |
| - GROUP BY b.event_id |
| 861 | + INNER JOIN events AS event USING (event_id) |
| 862 | + /** |
| 863 | + * We use this info to make sure we don't retry to use a backfill point |
| 864 | + * if we've already attempted to backfill from it recently. |
| 865 | + */ |
| 866 | + LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info |
| 867 | + ON |
| 868 | + failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id |
| 869 | + AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id |
| 870 | + WHERE |
| 871 | + insertion_event_extremity.room_id = ? |
| 872 | + /** |
| 873 | + * Exponential back-off (up to the upper bound) so we don't retry the |
| 874 | + * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc |
| 875 | + * |
| 876 | + * We use `1 << n` as a power of 2 equivalent for compatibility |
| 877 | + * with older SQLites. The left shift equivalent only works with |
| 878 | + * powers of 2 because left shift is a binary operation (base-2). |
| 879 | + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. |
| 880 | + */ |
| 881 | + AND ( |
| 882 | + failed_backfill_attempt_info.event_id IS NULL |
| 883 | + OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */) |
| 884 | + ) |
| 885 | + /** |
| 886 | + * Sort from highest to the lowest depth. Then tie-break on |
| 887 | + * alphabetical order of the event_ids so we get a consistent |
| 888 | + * ordering which is nice when asserting things in tests. |
| 889 | + */ |
| 890 | + ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC |
805 | 891 | """
|
806 | 892 |
|
807 |
| - txn.execute(sql, (room_id,)) |
| 893 | + if isinstance(self.database_engine, PostgresEngine): |
| 894 | + least_function = "least" |
| 895 | + elif isinstance(self.database_engine, Sqlite3Engine): |
| 896 | + least_function = "min" |
| 897 | + else: |
| 898 | + raise RuntimeError("Unknown database engine") |
| 899 | + |
| 900 | + txn.execute( |
| 901 | + sql % (least_function,), |
| 902 | + ( |
| 903 | + room_id, |
| 904 | + self._clock.time_msec(), |
| 905 | + 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, |
| 906 | + 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS, |
| 907 | + ), |
| 908 | + ) |
808 | 909 | return cast(List[Tuple[str, int]], txn.fetchall())
|
809 | 910 |
|
810 | 911 | return await self.db_pool.runInteraction(
|
@@ -1539,7 +1640,12 @@ async def get_next_staged_event_id_for_room(
|
1539 | 1640 | self,
|
1540 | 1641 | room_id: str,
|
1541 | 1642 | ) -> Optional[Tuple[str, str]]:
|
1542 |
| - """Get the next event ID in the staging area for the given room.""" |
| 1643 | + """ |
| 1644 | + Get the next event ID in the staging area for the given room. |
| 1645 | +
|
| 1646 | + Returns: |
| 1647 | + Tuple of the `origin` and `event_id` |
| 1648 | + """ |
1543 | 1649 |
|
1544 | 1650 | def _get_next_staged_event_id_for_room_txn(
|
1545 | 1651 | txn: LoggingTransaction,
|
|
0 commit comments