
Commit c5aaa80

Merge commit '7f837959e' into anoa/dinsic_release_1_21_x

* commit '7f837959e':
  Convert directory, e2e_room_keys, end_to_end_keys, monthly_active_users database to async (#8042)
  Convert additional database stores to async/await (#8045)

2 parents: d84510c + 7f83795

16 files changed: +248, -272 lines
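The changes follow one mechanical pattern: methods written as Twisted `@defer.inlineCallbacks` generators that `yield` Deferreds become native `async def` coroutines that `await` the same calls. Below is a minimal before/after sketch of that shape, not taken from the diff itself; the `OldStyleStore`/`AsyncStore` classes and `count_rows` method are illustrative, and they assume a `db_pool` attribute like the one the real stores use.

from twisted.internet import defer


class OldStyleStore:
    @defer.inlineCallbacks
    def count_rows(self, table):
        def txn(conn):
            cur = conn.cursor()
            cur.execute("SELECT COUNT(*) FROM " + table)
            return cur.fetchone()[0]

        # yield suspends the generator until the Deferred fires
        count = yield self.db_pool.runWithConnection(txn)
        return count


class AsyncStore:
    async def count_rows(self, table: str) -> int:
        def txn(conn):
            cur = conn.cursor()
            cur.execute("SELECT COUNT(*) FROM " + table)
            return cur.fetchone()[0]

        # await replaces yield; the decorator is no longer needed
        return await self.db_pool.runWithConnection(txn)

Callers that are already coroutines can simply await the converted methods; callers still written with inlineCallbacks have to bridge with defer.ensureDeferred, as the devices.py hunks below do.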

changelog.d/8042.misc

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
changelog.d/8045.misc

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.

synapse/storage/databases/main/client_ips.py

Lines changed: 24 additions & 30 deletions

@@ -14,8 +14,7 @@
 # limitations under the License.

 import logging
-
-from twisted.internet import defer
+from typing import Dict, Optional, Tuple

 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore
@@ -82,21 +81,19 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             "devices_last_seen", self._devices_last_seen_update
         )

-    @defer.inlineCallbacks
-    def _remove_user_ip_nonunique(self, progress, batch_size):
+    async def _remove_user_ip_nonunique(self, progress, batch_size):
         def f(conn):
             txn = conn.cursor()
             txn.execute("DROP INDEX IF EXISTS user_ips_user_ip")
             txn.close()

-        yield self.db_pool.runWithConnection(f)
-        yield self.db_pool.updates._end_background_update(
+        await self.db_pool.runWithConnection(f)
+        await self.db_pool.updates._end_background_update(
             "user_ips_drop_nonunique_index"
         )
         return 1

-    @defer.inlineCallbacks
-    def _analyze_user_ip(self, progress, batch_size):
+    async def _analyze_user_ip(self, progress, batch_size):
         # Background update to analyze user_ips table before we run the
         # deduplication background update. The table may not have been analyzed
         # for ages due to the table locks.
@@ -106,14 +103,13 @@ def _analyze_user_ip(self, progress, batch_size):
         def user_ips_analyze(txn):
             txn.execute("ANALYZE user_ips")

-        yield self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
+        await self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)

-        yield self.db_pool.updates._end_background_update("user_ips_analyze")
+        await self.db_pool.updates._end_background_update("user_ips_analyze")

         return 1

-    @defer.inlineCallbacks
-    def _remove_user_ip_dupes(self, progress, batch_size):
+    async def _remove_user_ip_dupes(self, progress, batch_size):
         # This works function works by scanning the user_ips table in batches
         # based on `last_seen`. For each row in a batch it searches the rest of
         # the table to see if there are any duplicates, if there are then they
@@ -140,7 +136,7 @@ def get_last_seen(txn):
             return None

         # Get a last seen that has roughly `batch_size` since `begin_last_seen`
-        end_last_seen = yield self.db_pool.runInteraction(
+        end_last_seen = await self.db_pool.runInteraction(
             "user_ips_dups_get_last_seen", get_last_seen
         )

@@ -275,15 +271,14 @@ def remove(txn):
                txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
            )

-        yield self.db_pool.runInteraction("user_ips_dups_remove", remove)
+        await self.db_pool.runInteraction("user_ips_dups_remove", remove)

         if last:
-            yield self.db_pool.updates._end_background_update("user_ips_remove_dupes")
+            await self.db_pool.updates._end_background_update("user_ips_remove_dupes")

         return batch_size

-    @defer.inlineCallbacks
-    def _devices_last_seen_update(self, progress, batch_size):
+    async def _devices_last_seen_update(self, progress, batch_size):
         """Background update to insert last seen info into devices table
         """

@@ -346,12 +341,12 @@ def _devices_last_seen_update_txn(txn):

             return len(rows)

-        updated = yield self.db_pool.runInteraction(
+        updated = await self.db_pool.runInteraction(
             "_devices_last_seen_update", _devices_last_seen_update_txn
         )

         if not updated:
-            yield self.db_pool.updates._end_background_update("devices_last_seen")
+            await self.db_pool.updates._end_background_update("devices_last_seen")

         return updated

@@ -460,25 +455,25 @@ def _update_client_ips_batch_txn(self, txn, to_update):
                # Failed to upsert, log and continue
                logger.error("Failed to insert client IP %r: %r", entry, e)

-    @defer.inlineCallbacks
-    def get_last_client_ip_by_device(self, user_id, device_id):
+    async def get_last_client_ip_by_device(
+        self, user_id: str, device_id: Optional[str]
+    ) -> Dict[Tuple[str, str], dict]:
         """For each device_id listed, give the user_ip it was last seen on

         Args:
-            user_id (str)
-            device_id (str): If None fetches all devices for the user
+            user_id: The user to fetch devices for.
+            device_id: If None fetches all devices for the user

         Returns:
-            defer.Deferred: resolves to a dict, where the keys
-            are (user_id, device_id) tuples. The values are also dicts, with
-            keys giving the column names
+            A dictionary mapping a tuple of (user_id, device_id) to dicts, with
+            keys giving the column names from the devices table.
         """

         keyvalues = {"user_id": user_id}
         if device_id is not None:
             keyvalues["device_id"] = device_id

-        res = yield self.db_pool.simple_select_list(
+        res = await self.db_pool.simple_select_list(
             table="devices",
             keyvalues=keyvalues,
             retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
@@ -500,8 +495,7 @@ def get_last_client_ip_by_device(self, user_id, device_id):
                    }
         return ret

-    @defer.inlineCallbacks
-    def get_user_ip_and_agents(self, user):
+    async def get_user_ip_and_agents(self, user):
         user_id = user.to_string()
         results = {}

@@ -511,7 +505,7 @@ def get_user_ip_and_agents(self, user):
                user_agent, _, last_seen = self._batch_row_update[key]
                results[(access_token, ip)] = (user_agent, last_seen)

-        rows = yield self.db_pool.simple_select_list(
+        rows = await self.db_pool.simple_select_list(
             table="user_ips",
             keyvalues={"user_id": user_id},
             retcols=["access_token", "ip", "user_agent", "last_seen"],

synapse/storage/databases/main/devices.py

Lines changed: 7 additions & 5 deletions

@@ -136,7 +136,9 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit):
         master_key_by_user = {}
         self_signing_key_by_user = {}
         for user in users:
-            cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master")
+            cross_signing_key = yield defer.ensureDeferred(
+                self.get_e2e_cross_signing_key(user, "master")
+            )
             if cross_signing_key:
                 key_id, verify_key = get_verify_key_from_cross_signing_key(
                     cross_signing_key
@@ -149,8 +151,8 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit):
                    "device_id": verify_key.version,
                }

-            cross_signing_key = yield self.get_e2e_cross_signing_key(
-                user, "self_signing"
+            cross_signing_key = yield defer.ensureDeferred(
+                self.get_e2e_cross_signing_key(user, "self_signing")
             )
             if cross_signing_key:
                 key_id, verify_key = get_verify_key_from_cross_signing_key(
@@ -246,7 +248,7 @@ def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_m
            destination (str): The host the device updates are intended for
            from_stream_id (int): The minimum stream_id to filter updates by, exclusive
            query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
-                user_id/device_id to update stream_id and the relevent json-encoded
+                user_id/device_id to update stream_id and the relevant json-encoded
                opentracing context

        Returns:
@@ -599,7 +601,7 @@ async def get_all_device_list_changes_for_remotes(
            between the requested tokens due to the limit.

            The token returned can be used in a subsequent call to this
-            function to get further updatees.
+            function to get further updates.

            The updates are a list of 2-tuples of stream ID and the row data
        """

synapse/storage/databases/main/directory.py

Lines changed: 25 additions & 26 deletions

@@ -14,30 +14,29 @@
 # limitations under the License.

 from collections import namedtuple
-from typing import Optional
-
-from twisted.internet import defer
+from typing import Iterable, Optional

 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore
+from synapse.types import RoomAlias
 from synapse.util.caches.descriptors import cached

 RoomAliasMapping = namedtuple("RoomAliasMapping", ("room_id", "room_alias", "servers"))


 class DirectoryWorkerStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def get_association_from_room_alias(self, room_alias):
-        """ Get's the room_id and server list for a given room_alias
+    async def get_association_from_room_alias(
+        self, room_alias: RoomAlias
+    ) -> Optional[RoomAliasMapping]:
+        """Gets the room_id and server list for a given room_alias

         Args:
-            room_alias (RoomAlias)
+            room_alias: The alias to translate to an ID.

         Returns:
-            Deferred: results in namedtuple with keys "room_id" and
-            "servers" or None if no association can be found
+            The room alias mapping or None if no association can be found.
         """
-        room_id = yield self.db_pool.simple_select_one_onecol(
+        room_id = await self.db_pool.simple_select_one_onecol(
             "room_aliases",
             {"room_alias": room_alias.to_string()},
             "room_id",
@@ -48,7 +47,7 @@ def get_association_from_room_alias(self, room_alias):
         if not room_id:
             return None

-        servers = yield self.db_pool.simple_select_onecol(
+        servers = await self.db_pool.simple_select_onecol(
             "room_alias_servers",
             {"room_alias": room_alias.to_string()},
             "server",
@@ -79,18 +78,20 @@ def get_aliases_for_room(self, room_id):


 class DirectoryStore(DirectoryWorkerStore):
-    @defer.inlineCallbacks
-    def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
+    async def create_room_alias_association(
+        self,
+        room_alias: RoomAlias,
+        room_id: str,
+        servers: Iterable[str],
+        creator: Optional[str] = None,
+    ) -> None:
         """ Creates an association between a room alias and room_id/servers

         Args:
-            room_alias (RoomAlias)
-            room_id (str)
-            servers (list)
-            creator (str): Optional user_id of creator.
-
-        Returns:
-            Deferred
+            room_alias: The alias to create.
+            room_id: The target of the alias.
+            servers: A list of servers through which it may be possible to join the room
+            creator: Optional user_id of creator.
         """

         def alias_txn(txn):
@@ -118,24 +119,22 @@ def alias_txn(txn):
            )

         try:
-            ret = yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                "create_room_alias_association", alias_txn
            )
         except self.database_engine.module.IntegrityError:
             raise SynapseError(
                409, "Room alias %s already exists" % room_alias.to_string()
            )
-        return ret

-    @defer.inlineCallbacks
-    def delete_room_alias(self, room_alias):
-        room_id = yield self.db_pool.runInteraction(
+    async def delete_room_alias(self, room_alias: RoomAlias) -> str:
+        room_id = await self.db_pool.runInteraction(
             "delete_room_alias", self._delete_room_alias_txn, room_alias
         )

         return room_id

-    def _delete_room_alias_txn(self, txn, room_alias):
+    def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str:
         txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),),
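Since `create_room_alias_association` now returns `None` and signals a duplicate alias by raising `SynapseError` with a 409 status, a caller might look roughly like the sketch below. It is illustrative only and not code from this commit; the `try_create_alias` helper and `store` parameter are assumptions.

from synapse.api.errors import SynapseError


async def try_create_alias(store, alias, room_id, servers, creator=None) -> bool:
    try:
        await store.create_room_alias_association(alias, room_id, servers, creator)
    except SynapseError:
        # Raised with a 409 when the alias already exists; see the diff above.
        return False
    return True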
