Catch-up after Federation Outage #8096
New file (changelog entry):

@@ -0,0 +1 @@
Send events to homeservers that they may have missed in rooms during a period of unreachability.
Changes to the federation sender's per-destination queue:
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast

 from prometheus_client import Counter
@@ -92,6 +92,20 @@ def __init__(
         self._destination = destination
         self.transmission_loop_running = False

+        # True whilst we are sending events that the remote homeserver missed
+        # because it was unreachable.
+        # New events will only be sent once this is finished, at which point
+        # _catching_up is flipped to False.
+        self._catching_up = True
+
+        # the maximum stream order to catch up to (PDUs after this are expected
+        # to be in the main transmission queue), inclusive
+        self._catch_up_max_stream_order = None  # type: Optional[int]
+
+        # Cache of the last successfully-transmitted stream ordering for this
+        # destination (we are the only updater so this is safe)
+        self._last_successful_stream_order = None  # type: Optional[int]
+
         # a list of tuples of (pending pdu, order)
         self._pending_pdus = []  # type: List[Tuple[EventBase, int]]
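Taken together, these fields form a small state machine: the queue starts in catch-up mode, leaves it once the backlog up to the high-water-mark has been sent, and re-enters it whenever a transmission failure drops pending PDUs (as the later hunks show). A minimal, hypothetical sketch of that lifecycle follows; the class and method names are illustrative, not the PR's actual code:

    from typing import Optional


    class CatchUpState:
        """Illustrative model of the catch-up fields added in this PR."""

        def __init__(self) -> None:
            # assume we may have missed events while the remote was unreachable
            self.catching_up = True
            # inclusive high-water-mark: PDUs at or below this stream ordering
            # belong to the catch-up path; later PDUs go via the main queue
            self.catch_up_max_stream_order = None  # type: Optional[int]
            # last stream ordering we know was successfully transmitted
            self.last_successful_stream_order = None  # type: Optional[int]

        def on_caught_up(self) -> None:
            # the backlog has been drained: the main queue takes over
            self.catching_up = False

        def on_transmission_failure(self) -> None:
            # pending PDUs were dropped, so re-enter catch-up mode and
            # recompute the high-water-mark on the next attempt
            self.catching_up = True
            self.catch_up_max_stream_order = None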
@@ -137,8 +151,15 @@ def send_pdu(self, pdu: EventBase, order: int) -> None:

         Args:
             pdu: pdu to send
-            order
+            order: an arbitrary order for the PDU — NOT the stream ordering
         """
+        if (
+            self._catch_up_max_stream_order is not None
+            and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order
+        ):
+            # we are in catch-up mode and this PDU is already scheduled to be
+            # part of the catch-up
+            return
         self._pending_pdus.append((pdu, order))

         self.attempt_new_transaction()

Review comment (on the new guard): when would this happen? I'm struggling to imagine how we could end up here with an event with a lower stream ordering than the catch-up max.

Reply: In a (potentially hypothetical?) race condition.

    # track the fact that we have a PDU for these destinations,
    # to allow us to perform catch-up later on if the remote is unreachable
    # for a while.
    await self.store.store_destination_rooms_entries(
        destinations,
        pdu.room_id,
        pdu.event_id,
        pdu.internal_metadata.stream_ordering,
    )
    # X <---
    for destination in destinations:
        self._get_per_destination_queue(destination).send_pdu(pdu, order)

I'm not sure if ^ has an opportunity to race or not; but even if it doesn't now, what about if someone innocently comes along and plops an await in there (at the X)? The uncertainty made me feel like I should try and make this robust and 'just deal with it'.

Further reply: N.B. needs scrutiny about whether that needs to check we actually have […]

Review comment (on appending to the queue): it seems to me that we shouldn't add new events to the queue while we are catching up?

Reply: As you note later, I set a high-water-mark. Anything after this watermark should be handled in the normal way. (Part of this is also to reduce the use of […].) If we are backing off for a long time, it will clear the queue and high-water-mark when it attempts a transaction on the next line.
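To make the behaviour of the guard discussed above concrete, here is a self-contained toy version of the predicate (a hypothetical helper, not part of the PR): a PDU at or below the high-water-mark is dropped by the main queue because the catch-up pass will deliver it anyway.

    from typing import Optional


    def should_skip_for_catch_up(
        stream_ordering: int, catch_up_max_stream_order: Optional[int]
    ) -> bool:
        # a PDU at or below the high-water-mark is already covered by
        # catch-up, so the main queue must not send it a second time
        return (
            catch_up_max_stream_order is not None
            and stream_ordering <= catch_up_max_stream_order
        )


    assert should_skip_for_catch_up(10, 15)        # covered by catch-up
    assert not should_skip_for_catch_up(20, 15)    # handled by the main queue
    assert not should_skip_for_catch_up(10, None)  # no catch-up in progress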
@@ -219,6 +240,16 @@ async def _transaction_transmission_loop(self) -> None:
             # hence why we throw the result away.
             await get_retry_limiter(self._destination, self._clock, self._store)

+            if self._catching_up:
+                # we're catching up, so we should send old events instead
+                # in this case, we don't send anything from the new queue
+                # this keeps the catching-up logic simple
+                await self._catch_up_transmission_loop()
+                if self._catching_up:
+                    # if we aren't actually caught up yet, shouldn't carry on to
+                    # the main loop
+                    return
+
             pending_pdus = []
             while True:
                 # We have to keep 2 free slots for presence and rr_edus
@@ -326,6 +357,15 @@ async def _transaction_transmission_loop(self) -> None:

                     self._last_device_stream_id = device_stream_id
                     self._last_device_list_stream_id = dev_list_id
+
+                    if pending_pdus:
+                        final_pdu, _ = pending_pdus[-1]
+                        self._last_successful_stream_order = (
+                            final_pdu.internal_metadata.stream_ordering
+                        )
+                        await self._store.set_destination_last_successful_stream_ordering(
+                            self._destination, self._last_successful_stream_order
+                        )
                 else:
                     break
         except NotRetryingDestination as e:
@@ -359,7 +399,12 @@ async def _transaction_transmission_loop(self) -> None:
                 self._pending_edus_keyed = {}
                 self._pending_presence = {}
                 self._pending_rrs = {}
+
+                self._catching_up = True
+                # reset max catch up since we have dropped PDUs here
+                self._catch_up_max_stream_order = None
         except FederationDeniedError as e:
             # remote server is not in our federation whitelist
             logger.info(e)
         except HttpResponseException as e:
             logger.warning(
@@ -368,6 +413,10 @@ async def _transaction_transmission_loop(self) -> None:
                 e.code,
                 e,
             )
+
+            self._catching_up = True
+            # reset max catch up since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         except RequestSendFailed as e:
             logger.warning(
                 "TX [%s] Failed to send transaction: %s", self._destination, e
@@ -377,16 +426,122 @@ async def _transaction_transmission_loop(self) -> None:
             logger.info(
                 "Failed to send event %s to %s", p.event_id, self._destination
             )
+
+            self._catching_up = True
+            # reset max catch up since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         except Exception:
             logger.exception("TX [%s] Failed to send transaction", self._destination)
             for p, _ in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._catching_up = True
+            # reset max catch up since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         finally:
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False
+
+    async def _catch_up_transmission_loop(self) -> None:
+        if self._last_successful_stream_order is None:
+            # first catch-up, so get from database
+            self._last_successful_stream_order = await self._store.get_destination_last_successful_stream_ordering(
+                self._destination
+            )
+
+        if self._last_successful_stream_order is None:
+            # if it's still None, then this means we don't have the information
+            # in our database (oh, the perils of being a new feature).
+            # So we can't actually do anything here, and in this case, we don't
+            # know what to catch up, sadly.
+            # Trying to catch up right now is futile, so let's stop.
+            self._catching_up = False
+            return
+
+        if self._catch_up_max_stream_order is None:
+            # this is our first catch-up so we need to determine how much we
+            # want to catch-up.
+            if self._pending_pdus:
+                # we have PDUs already in the main queue so no need to ask the
+                # database
+                first_non_catch_up_pdu, _ = self._pending_pdus[0]
+                # -1 because we wish to exclude that one — we don't need to catch
+                # it up as it's in our main queue
+                self._catch_up_max_stream_order = (
+                    first_non_catch_up_pdu.internal_metadata.stream_ordering - 1
+                )
+            else:
+                # we don't have any PDUs in the main queue so instead find out
+                # the largest stream order that we know of that has, once upon a
+                # time, been queued for this destination (i.e. this is what we
+                # *should* have sent if the remote server was reachable).
+                self._catch_up_max_stream_order = await self._store.get_largest_destination_rooms_stream_order(
+                    self._destination
+                )
+                if self._catch_up_max_stream_order is None:
+                    # not enough info to catch up
+                    self._catching_up = False
+                    return

Review comment (on computing the high-water-mark): question: why do we need a high-water-mark at all? why not just keep going until […]? (doing so without care will introduce races though...)

Reply: I suppose we could do that if you are keen — as you say though, needs more thought. The advantage of this approach is that it's easy to keep the logic in my head. Was keen not to give a chance for any nasty bugs to crawl in, because it'd be nice to have confidence in this.
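For contrast, the reviewer's watermark-free alternative might look roughly like the following. This is a hypothetical sketch, not code from the PR: it reuses the store and transaction-manager methods the PR introduces, assumes `self._last_successful_stream_order` has already been initialised as above, and, as the review notes, would need care to avoid racing with newly queued PDUs.

    import sys


    async def _catch_up_until_done(self) -> None:
        # Hypothetical alternative: instead of computing an explicit
        # high-water-mark, keep draining missed events until none remain.
        while True:
            event_ids = await self._store.get_catch_up_room_event_ids(
                self._destination,
                self._last_successful_stream_order,
                sys.maxsize,  # effectively "no upper bound"
            )
            if not event_ids:
                # nothing newer than our last successful send: caught up
                self._catching_up = False
                return
            events = await self._store.get_events_as_list(event_ids)
            catch_up_pdus = [
                (event, event.internal_metadata.stream_ordering) for event in events
            ]
            success = await self._transaction_manager.send_new_transaction(
                self._destination, catch_up_pdus, []
            )
            if not success:
                return
            self._last_successful_stream_order = catch_up_pdus[-1][1]
            await self._store.set_destination_last_successful_stream_ordering(
                self._destination, self._last_successful_stream_order
            )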
+        # get at most 50 catchup room/PDUs
+        while self._last_successful_stream_order < self._catch_up_max_stream_order:
+            event_ids = await self._store.get_catch_up_room_event_ids(
+                self._destination,
+                self._last_successful_stream_order,
+                self._catch_up_max_stream_order,
+            )
+
+            if not event_ids:
+                # I don't believe this *should* happen unless someone has been
+                # tinkering with the database, but I also have limited foresight,
+                # so let's handle this properly
+                logger.warning(
+                    "Unexpectedly, no event IDs were found for catch-up: "
+                    "last successful = %d, max catch up = %d",
+                    self._last_successful_stream_order,
+                    self._catch_up_max_stream_order,
+                )
+                self._catching_up = False
+                break
+
+            # fetch the relevant events from the event store
+            # - redacted behaviour of REDACT is fine, since we only send metadata
+            #   of redacted events to the destination.
+            # - don't need to worry about rejected events as we do not actively
+            #   forward received events over federation.
+            events = await self._store.get_events_as_list(event_ids)
+            if not events:
+                raise AssertionError(
+                    "No events retrieved when we asked for %r. "
+                    "This should not happen." % event_ids
+                )
+
+            # zip them together with their stream orderings
+            catch_up_pdus = [
+                (event, event.internal_metadata.stream_ordering) for event in events
+            ]

Review comment (on the (event, stream ordering) tuples): as noted elsewhere: I think the […]

Reply: I'll put it on my list, then :)
+
+            success = await self._transaction_manager.send_new_transaction(
+                self._destination, catch_up_pdus, []
+            )
+
+            if not success:
+                return
+
+            sent_transactions_counter.inc()
+            final_pdu, _ = catch_up_pdus[-1]
+            self._last_successful_stream_order = cast(
+                int, final_pdu.internal_metadata.stream_ordering
+            )
+            await self._store.set_destination_last_successful_stream_ordering(
+                self._destination, self._last_successful_stream_order
+            )
+
+        # once we have reached this point, catch-up is done!
+        self._catching_up = False
+
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
             return
Changes to the database helpers (`simple_select_one_onecol` and related):
@@ -1092,7 +1092,7 @@ async def simple_select_one_onecol(
         self,
         table: str,
         keyvalues: Dict[str, Any],
-        retcol: Iterable[str],
+        retcol: str,
         allow_none: bool = False,
         desc: str = "simple_select_one_onecol",
     ) -> Optional[Any]:

Review comment: please can you take these fixes to a separate PR?
@@ -1122,7 +1122,7 @@ def simple_select_one_onecol_txn(
         txn: LoggingTransaction,
         table: str,
         keyvalues: Dict[str, Any],
-        retcol: Iterable[str],
+        retcol: str,
         allow_none: bool = False,
     ) -> Optional[Any]:
         ret = cls.simple_select_onecol_txn(

@@ -1139,10 +1139,7 @@ def simple_select_one_onecol_txn(

     @staticmethod
     def simple_select_onecol_txn(
-        txn: LoggingTransaction,
-        table: str,
-        keyvalues: Dict[str, Any],
-        retcol: Iterable[str],
+        txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any], retcol: str,
     ) -> List[Any]:
         sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}
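For context on the annotation fix: `retcol` names a single column and the helper returns that single value, so `str` is the accurate type. A representative caller is sketched below; the method name matches the store method this PR uses, but the body is illustrative and assumes the `db_pool` convention of Synapse's storage layer at the time.

    from typing import Optional


    async def get_destination_last_successful_stream_ordering(
        self, destination: str
    ) -> Optional[int]:
        # read back the per-destination catch-up marker with a
        # single-column, single-row select (hence retcol: str)
        return await self.db_pool.simple_select_one_onecol(
            table="destinations",
            keyvalues={"destination": destination},
            retcol="last_successful_stream_ordering",
            allow_none=True,
            desc="get_destination_last_successful_stream_ordering",
        )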
New file (schema delta):

@@ -0,0 +1,42 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- This schema delta alters the schema to enable 'catching up' remote homeservers
-- after there has been a connectivity problem for any reason.

-- This stores, for each (destination, room) pair, the stream_ordering of the
-- latest event for that destination.
CREATE TABLE IF NOT EXISTS destination_rooms (
  -- the destination in question.
  -- Can not be a foreign key because rows in the `destinations` table will
  -- only be created when we back off or when we successfully send a
  -- transaction.
  destination TEXT NOT NULL,
  -- the ID of the room in question
  room_id TEXT NOT NULL,
  -- the stream_ordering of the event
  stream_ordering INTEGER NOT NULL,
  PRIMARY KEY (destination, room_id),
  FOREIGN KEY (room_id) REFERENCES rooms (room_id)
    ON DELETE CASCADE,
  FOREIGN KEY (stream_ordering) REFERENCES events (stream_ordering)
    ON DELETE CASCADE
);

Review comment (on the `destination` column): I'd kinda like to see the logic shuffled so that this can be a foreign key; however at the very least it needs a comment saying why it can't be one.

Reply: Sure, would it suffice to […]

Reply: probably. It would be nice to avoid doing so on every PDU (for example by doing it when we first create a […]
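One direction the discussion above points at, sketched here purely as a hypothetical (not part of this PR): ensure a `destinations` row exists up front, for example when the per-destination queue is first created, so that `destination` could become a foreign key without paying the cost on every PDU. The helper name is invented, and the upsert syntax assumes PostgreSQL 9.5+ or SQLite 3.24+.

    def _ensure_destination_row_txn(txn, destination: str) -> None:
        # no-op if the destination is already known; otherwise create a
        # stub row so destination_rooms.destination could reference it
        txn.execute(
            """
            INSERT INTO destinations (destination) VALUES (?)
            ON CONFLICT (destination) DO NOTHING
            """,
            (destination,),
        )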
-- This column tracks the stream_ordering of the event that was most recently
-- successfully transmitted to the destination.
-- A value of NULL means that we have not sent an event successfully yet
-- (at least, not since the introduction of this column).
ALTER TABLE destinations
    ADD COLUMN last_successful_stream_ordering INTEGER;
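To show how the new table and column work together: the catch-up path needs the events, newer than the last successful send, that the destination should have received. The PR's actual helper is `get_catch_up_room_event_ids`, whose SQL is not shown in this diff; the following is an illustrative sketch of the kind of query involved (placeholder style and exact SQL are assumptions).

    def _get_catch_up_room_event_ids_txn(
        txn, destination: str, last_successful: int, upper_bound: int
    ):
        # select up to 50 missed events for a destination, joining
        # destination_rooms to events on stream_ordering to recover
        # the event IDs (cf. "get at most 50 catchup room/PDUs" above)
        sql = """
            SELECT event_id FROM destination_rooms
            JOIN events USING (stream_ordering)
            WHERE destination = ?
              AND stream_ordering > ?
              AND stream_ordering <= ?
            ORDER BY stream_ordering
            LIMIT 50
        """
        txn.execute(sql, (destination, last_successful, upper_bound))
        return [row[0] for row in txn]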