 # limitations under the License.
 import logging
 
+from mock import patch
+
+from synapse.api.room_versions import RoomVersion
 from synapse.rest import admin
 from synapse.rest.client.v1 import login, room
+from synapse.rest.client.v2_alpha import sync
 
 from tests.replication._base import BaseMultiWorkerStreamTestCase
 from tests.utils import USE_POSTGRES_FOR_TESTS
@@ -36,13 +40,17 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
         admin.register_servlets_for_client_rest_resource,
         room.register_servlets,
         login.register_servlets,
+        sync.register_servlets,
     ]
 
     def prepare(self, reactor, clock, hs):
         # Register a user who sends a message that we'll get notified about
         self.other_user_id = self.register_user("otheruser", "pass")
         self.other_access_token = self.login("otheruser", "pass")
 
+        self.room_creator = self.hs.get_room_creation_handler()
+        self.store = hs.get_datastore()
+
     def default_config(self):
         conf = super().default_config()
         conf["redis"] = {"enabled": "true"}
@@ -53,6 +61,29 @@ def default_config(self):
         }
         return conf
 
+    def _create_room(self, room_id: str, user_id: str, tok: str):
+        """Create a room with the given room ID.
+        """
+
+        # We control the room ID generation by patching out the
+        # `_generate_room_id` method
+        async def generate_room(
+            creator_id: str, is_public: bool, room_version: RoomVersion
+        ):
+            await self.store.store_room(
+                room_id=room_id,
+                room_creator_user_id=creator_id,
+                is_public=is_public,
+                room_version=room_version,
+            )
+            return room_id
+
+        with patch(
+            "synapse.handlers.room.RoomCreationHandler._generate_room_id"
+        ) as mock:
+            mock.side_effect = generate_room
+            self.helper.create_room_as(user_id, tok=tok)
+
     def test_basic(self):
         """Simple test to ensure that multiple rooms can be created and joined,
         and that different rooms get handled by different instances.
@@ -100,3 +131,189 @@ def test_basic(self):
 
         self.assertTrue(persisted_on_1)
         self.assertTrue(persisted_on_2)
+
+    def test_vector_clock_token(self):
+        """Tests that using a stream token with a vector clock component works
+        correctly with basic /sync and /messages usage.
+        """
+
+        self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "worker1"},
+        )
+
+        worker_hs2 = self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "worker2"},
+        )
+
+        sync_hs = self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "sync"},
+        )
+
+        # Specially selected room IDs that get persisted on different workers.
+        room_id1 = "!foo:test"
+        room_id2 = "!baz:test"
+
+        self.assertEqual(
+            self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
+        )
+        self.assertEqual(
+            self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
+        )
+
+        user_id = self.register_user("user", "pass")
+        access_token = self.login("user", "pass")
+
+        store = self.hs.get_datastore()
+
+        # Create two rooms on the different workers.
+        self._create_room(room_id1, user_id, access_token)
+        self._create_room(room_id2, user_id, access_token)
+
+        # The other user joins both rooms.
+        self.helper.join(
+            room=room_id1, user=self.other_user_id, tok=self.other_access_token
+        )
+        self.helper.join(
+            room=room_id2, user=self.other_user_id, tok=self.other_access_token
+        )
+
+        # Do an initial sync so that we're up to date.
+        request, channel = self.make_request("GET", "/sync", access_token=access_token)
+        self.render_on_worker(sync_hs, request)
+        next_batch = channel.json_body["next_batch"]
+
+        # We now gut wrench into the events stream's MultiWriterIdGenerator on
+        # worker2 to mimic it getting stuck persisting an event. This ensures
+        # that when we send an event on worker1, worker2's events stream
+        # position lags the one on worker1, resulting in a RoomStreamToken
+        # with a non-empty instance map component.
+        #
+        # Worker2's event stream position will not advance until we call
+        # __aexit__ again.
+        actx = worker_hs2.get_datastore()._stream_id_gen.get_next()
+        self.get_success(actx.__aenter__())
+
+        response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
+        first_event_in_room1 = response["event_id"]
+
+        # Assert that the current stream token has an instance map component, as
+        # we are trying to test vector clock tokens.
+        room_stream_token = store.get_room_max_token()
+        self.assertNotEqual(len(room_stream_token.instance_map), 0)
+
+        # Check that syncing still gets the new event, despite the gap in the
+        # stream IDs.
+        request, channel = self.make_request(
+            "GET", "/sync?since={}".format(next_batch), access_token=access_token
+        )
+        self.render_on_worker(sync_hs, request)
+
+        # We should only see the new event and nothing else.
+        self.assertIn(room_id1, channel.json_body["rooms"]["join"])
+        self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])
+
+        events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
+        self.assertListEqual(
+            [first_event_in_room1], [event["event_id"] for event in events]
+        )
+
+        # Get the next batch and make sure it's a vector clock style token.
+        vector_clock_token = channel.json_body["next_batch"]
+        self.assertTrue(vector_clock_token.startswith("m"))
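+        # (The "m" prefix marks a multi-writer token; what follows it, a
+        # minimum position plus per-writer positions, is an implementation
+        # detail of RoomStreamToken's serialisation.)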
+
+        # Now that we've got a vector clock token we finish the fake persisting
+        # of the event we started above.
+        self.get_success(actx.__aexit__(None, None, None))
+
+        # Now try to send an event to the other room so that we can test that
+        # the vector clock style token works as a `since` token.
+        response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
+        first_event_in_room2 = response["event_id"]
+
+        request, channel = self.make_request(
+            "GET",
+            "/sync?since={}".format(vector_clock_token),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+
+        self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
+        self.assertIn(room_id2, channel.json_body["rooms"]["join"])
+
+        events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
+        self.assertListEqual(
+            [first_event_in_room2], [event["event_id"] for event in events]
+        )
+
+        next_batch = channel.json_body["next_batch"]
+
+        # We also want to test that the vector clock style token works with
+        # pagination. We do this by sending a couple of new events into the rooms
+        # and syncing again to get a prev_batch token for each room, then
+        # paginating from there back to the vector clock token.
+        self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
+        self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)
+
+        request, channel = self.make_request(
+            "GET", "/sync?since={}".format(next_batch), access_token=access_token
+        )
+        self.render_on_worker(sync_hs, request)
+
+        prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
+            "prev_batch"
+        ]
+        prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
+            "prev_batch"
+        ]
+
+        # Paginating back in the first room should not produce any results, as
+        # no events have happened in it. This tests that we are correctly
+        # filtering results based on the vector clock portion.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=b".format(
+                room_id1, prev_batch1, vector_clock_token
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertListEqual([], channel.json_body["chunk"])
+
+        # Paginating back in the second room should produce the first event
+        # again. This tests that pagination isn't completely broken.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=b".format(
+                room_id2, prev_batch2, vector_clock_token
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertEqual(len(channel.json_body["chunk"]), 1)
+        self.assertEqual(
+            channel.json_body["chunk"][0]["event_id"], first_event_in_room2
+        )
+
+        # Paginating forwards should give the same results.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=f".format(
+                room_id1, vector_clock_token, prev_batch1
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertListEqual([], channel.json_body["chunk"])
+
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=f".format(
+                room_id2, vector_clock_token, prev_batch2,
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertEqual(len(channel.json_body["chunk"]), 1)
+        self.assertEqual(
+            channel.json_body["chunk"][0]["event_id"], first_event_in_room2
+        )
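
A note on the `startswith("m")` assertion in the test above: multi-writer stream tokens are serialised by RoomStreamToken with an "m" prefix followed by a minimum position and per-writer positions. As a minimal sketch, assuming an "m{min}~{writer}.{pos}~..." layout (an internal detail of Synapse's serialisation at the time of this change, not a stable API), such a token could be pulled apart for debugging like this:

from typing import Dict, Tuple


def decompose_vector_clock_token(token: str) -> Tuple[int, Dict[str, int]]:
    """Split an assumed "m{min}~{writer}.{pos}" token into its parts.

    Hypothetical debugging helper; the layout it assumes is internal to
    RoomStreamToken and may change.
    """
    if not token.startswith("m"):
        raise ValueError("not a vector clock token: %r" % (token,))

    # Strip the "m" marker, then split off each "{writer}.{pos}" entry.
    min_part, *entries = token[1:].split("~")
    instance_map = {}
    for entry in entries:
        writer, _, pos = entry.partition(".")
        instance_map[writer] = int(pos)
    return int(min_part), instance_map


# Made-up example token: minimum position 3456, two writers ahead of it.
# decompose_vector_clock_token("m3456~1.3458~2.3457")
# -> (3456, {"1": 3458, "2": 3457})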