Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 1e19ce0

Browse files
authored
Add 'failure_ts' column to 'destinations' table (#6016)
Track the time that a server started failing at, for general analysis purposes.
1 parent 850dcfd commit 1e19ce0

File tree

7 files changed

+195
-12
lines changed

7 files changed

+195
-12
lines changed

changelog.d/6016.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a 'failure_ts' column to the 'destinations' database table.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/* Copyright 2019 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
/*
17+
* Record the timestamp when a given server started failing
18+
*/
19+
ALTER TABLE destinations ADD failure_ts BIGINT;
20+
21+
/* as a rough approximation, we assume that the server started failing at
22+
* retry_interval before the last retry
23+
*/
24+
UPDATE destinations SET failure_ts = retry_last_ts - retry_interval
25+
WHERE retry_last_ts > 0;

synapse/storage/transactions.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def _get_destination_retry_timings(self, txn, destination):
165165
txn,
166166
table="destinations",
167167
keyvalues={"destination": destination},
168-
retcols=("destination", "retry_last_ts", "retry_interval"),
168+
retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
169169
allow_none=True,
170170
)
171171

@@ -174,12 +174,15 @@ def _get_destination_retry_timings(self, txn, destination):
174174
else:
175175
return None
176176

177-
def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
177+
def set_destination_retry_timings(
178+
self, destination, failure_ts, retry_last_ts, retry_interval
179+
):
178180
"""Sets the current retry timings for a given destination.
179181
Both timings should be zero if retrying is no longer occuring.
180182
181183
Args:
182184
destination (str)
185+
failure_ts (int|None) - when the server started failing (ms since epoch)
183186
retry_last_ts (int) - time of last retry attempt in unix epoch ms
184187
retry_interval (int) - how long until next retry in ms
185188
"""
@@ -189,30 +192,34 @@ def set_destination_retry_timings(self, destination, retry_last_ts, retry_interv
189192
"set_destination_retry_timings",
190193
self._set_destination_retry_timings,
191194
destination,
195+
failure_ts,
192196
retry_last_ts,
193197
retry_interval,
194198
)
195199

196200
def _set_destination_retry_timings(
197-
self, txn, destination, retry_last_ts, retry_interval
201+
self, txn, destination, failure_ts, retry_last_ts, retry_interval
198202
):
199203

200204
if self.database_engine.can_native_upsert:
201205
# Upsert retry time interval if retry_interval is zero (i.e. we're
202206
# resetting it) or greater than the existing retry interval.
203207

204208
sql = """
205-
INSERT INTO destinations (destination, retry_last_ts, retry_interval)
206-
VALUES (?, ?, ?)
209+
INSERT INTO destinations (
210+
destination, failure_ts, retry_last_ts, retry_interval
211+
)
212+
VALUES (?, ?, ?, ?)
207213
ON CONFLICT (destination) DO UPDATE SET
214+
failure_ts = EXCLUDED.failure_ts,
208215
retry_last_ts = EXCLUDED.retry_last_ts,
209216
retry_interval = EXCLUDED.retry_interval
210217
WHERE
211218
EXCLUDED.retry_interval = 0
212219
OR destinations.retry_interval < EXCLUDED.retry_interval
213220
"""
214221

215-
txn.execute(sql, (destination, retry_last_ts, retry_interval))
222+
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
216223

217224
return
218225

@@ -225,7 +232,7 @@ def _set_destination_retry_timings(
225232
txn,
226233
table="destinations",
227234
keyvalues={"destination": destination},
228-
retcols=("retry_last_ts", "retry_interval"),
235+
retcols=("failure_ts", "retry_last_ts", "retry_interval"),
229236
allow_none=True,
230237
)
231238

@@ -235,6 +242,7 @@ def _set_destination_retry_timings(
235242
table="destinations",
236243
values={
237244
"destination": destination,
245+
"failure_ts": failure_ts,
238246
"retry_last_ts": retry_last_ts,
239247
"retry_interval": retry_interval,
240248
},
@@ -245,6 +253,7 @@ def _set_destination_retry_timings(
245253
"destinations",
246254
keyvalues={"destination": destination},
247255
updatevalues={
256+
"failure_ts": failure_ts,
248257
"retry_last_ts": retry_last_ts,
249258
"retry_interval": retry_interval,
250259
},

synapse/util/retryutils.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,13 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
8080
# We aren't ready to retry that destination.
8181
raise
8282
"""
83+
failure_ts = None
8384
retry_last_ts, retry_interval = (0, 0)
8485

8586
retry_timings = yield store.get_destination_retry_timings(destination)
8687

8788
if retry_timings:
89+
failure_ts = retry_timings["failure_ts"]
8890
retry_last_ts, retry_interval = (
8991
retry_timings["retry_last_ts"],
9092
retry_timings["retry_interval"],
@@ -108,6 +110,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
108110
destination,
109111
clock,
110112
store,
113+
failure_ts,
111114
retry_interval,
112115
backoff_on_failure=backoff_on_failure,
113116
**kwargs
@@ -120,6 +123,7 @@ def __init__(
120123
destination,
121124
clock,
122125
store,
126+
failure_ts,
123127
retry_interval,
124128
backoff_on_404=False,
125129
backoff_on_failure=True,
@@ -133,6 +137,8 @@ def __init__(
133137
destination (str)
134138
clock (Clock)
135139
store (DataStore)
140+
failure_ts (int|None): when this destination started failing (in ms since
141+
the epoch), or zero if the last request was successful
136142
retry_interval (int): The next retry interval taken from the
137143
database in milliseconds, or zero if the last request was
138144
successful.
@@ -145,6 +151,7 @@ def __init__(
145151
self.store = store
146152
self.destination = destination
147153

154+
self.failure_ts = failure_ts
148155
self.retry_interval = retry_interval
149156
self.backoff_on_404 = backoff_on_404
150157
self.backoff_on_failure = backoff_on_failure
@@ -186,6 +193,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
186193
logger.debug(
187194
"Connection to %s was successful; clearing backoff", self.destination
188195
)
196+
self.failure_ts = None
189197
retry_last_ts = 0
190198
self.retry_interval = 0
191199
elif not self.backoff_on_failure:
@@ -211,11 +219,17 @@ def __exit__(self, exc_type, exc_val, exc_tb):
211219
)
212220
retry_last_ts = int(self.clock.time_msec())
213221

222+
if self.failure_ts is None:
223+
self.failure_ts = retry_last_ts
224+
214225
@defer.inlineCallbacks
215226
def store_retry_timings():
216227
try:
217228
yield self.store.set_destination_retry_timings(
218-
self.destination, retry_last_ts, self.retry_interval
229+
self.destination,
230+
self.failure_ts,
231+
retry_last_ts,
232+
self.retry_interval,
219233
)
220234
except Exception:
221235
logger.exception("Failed to store destination_retry_timings")

tests/handlers/test_typing.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ def prepare(self, reactor, clock, hs):
9999
self.event_source = hs.get_event_sources().sources["typing"]
100100

101101
self.datastore = hs.get_datastore()
102-
retry_timings_res = {"destination": "", "retry_last_ts": 0, "retry_interval": 0}
102+
retry_timings_res = {
103+
"destination": "",
104+
"retry_last_ts": 0,
105+
"retry_interval": 0,
106+
"failure_ts": None,
107+
}
103108
self.datastore.get_destination_retry_timings.return_value = defer.succeed(
104109
retry_timings_res
105110
)

tests/storage/test_transactions.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@ def test_get_set_transactions(self):
2929
r = self.get_success(d)
3030
self.assertIsNone(r)
3131

32-
d = self.store.set_destination_retry_timings("example.com", 50, 100)
32+
d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100)
3333
self.get_success(d)
3434

3535
d = self.store.get_destination_retry_timings("example.com")
3636
r = self.get_success(d)
3737

38-
self.assert_dict({"retry_last_ts": 50, "retry_interval": 100}, r)
38+
self.assert_dict(
39+
{"retry_last_ts": 50, "retry_interval": 100, "failure_ts": 1000}, r
40+
)
3941

4042
def test_initial_set_transactions(self):
4143
"""Tests that we can successfully set the destination retries (there
4244
was a bug around invalidating the cache that broke this)
4345
"""
44-
d = self.store.set_destination_retry_timings("example.com", 50, 100)
46+
d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100)
4547
self.get_success(d)

tests/util/test_retryutils.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2019 The Matrix.org Foundation C.I.C.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
from synapse.util.retryutils import (
16+
MIN_RETRY_INTERVAL,
17+
RETRY_MULTIPLIER,
18+
NotRetryingDestination,
19+
get_retry_limiter,
20+
)
21+
22+
from tests.unittest import HomeserverTestCase
23+
24+
25+
class RetryLimiterTestCase(HomeserverTestCase):
26+
def test_new_destination(self):
27+
"""A happy-path case with a new destination and a successful operation"""
28+
store = self.hs.get_datastore()
29+
d = get_retry_limiter("test_dest", self.clock, store)
30+
self.pump()
31+
limiter = self.successResultOf(d)
32+
33+
# advance the clock a bit before making the request
34+
self.pump(1)
35+
36+
with limiter:
37+
pass
38+
39+
d = store.get_destination_retry_timings("test_dest")
40+
self.pump()
41+
new_timings = self.successResultOf(d)
42+
self.assertIsNone(new_timings)
43+
44+
def test_limiter(self):
45+
"""General test case which walks through the process of a failing request"""
46+
store = self.hs.get_datastore()
47+
48+
d = get_retry_limiter("test_dest", self.clock, store)
49+
self.pump()
50+
limiter = self.successResultOf(d)
51+
52+
self.pump(1)
53+
try:
54+
with limiter:
55+
self.pump(1)
56+
failure_ts = self.clock.time_msec()
57+
raise AssertionError("argh")
58+
except AssertionError:
59+
pass
60+
61+
# wait for the update to land
62+
self.pump()
63+
64+
d = store.get_destination_retry_timings("test_dest")
65+
self.pump()
66+
new_timings = self.successResultOf(d)
67+
self.assertEqual(new_timings["failure_ts"], failure_ts)
68+
self.assertEqual(new_timings["retry_last_ts"], failure_ts)
69+
self.assertEqual(new_timings["retry_interval"], MIN_RETRY_INTERVAL)
70+
71+
# now if we try again we should get a failure
72+
d = get_retry_limiter("test_dest", self.clock, store)
73+
self.pump()
74+
self.failureResultOf(d, NotRetryingDestination)
75+
76+
#
77+
# advance the clock and try again
78+
#
79+
80+
self.pump(MIN_RETRY_INTERVAL)
81+
d = get_retry_limiter("test_dest", self.clock, store)
82+
self.pump()
83+
limiter = self.successResultOf(d)
84+
85+
self.pump(1)
86+
try:
87+
with limiter:
88+
self.pump(1)
89+
retry_ts = self.clock.time_msec()
90+
raise AssertionError("argh")
91+
except AssertionError:
92+
pass
93+
94+
# wait for the update to land
95+
self.pump()
96+
97+
d = store.get_destination_retry_timings("test_dest")
98+
self.pump()
99+
new_timings = self.successResultOf(d)
100+
self.assertEqual(new_timings["failure_ts"], failure_ts)
101+
self.assertEqual(new_timings["retry_last_ts"], retry_ts)
102+
self.assertGreaterEqual(
103+
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5
104+
)
105+
self.assertLessEqual(
106+
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0
107+
)
108+
109+
#
110+
# one more go, with success
111+
#
112+
self.pump(MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0)
113+
d = get_retry_limiter("test_dest", self.clock, store)
114+
self.pump()
115+
limiter = self.successResultOf(d)
116+
117+
self.pump(1)
118+
with limiter:
119+
self.pump(1)
120+
121+
# wait for the update to land
122+
self.pump()
123+
124+
d = store.get_destination_retry_timings("test_dest")
125+
self.pump()
126+
new_timings = self.successResultOf(d)
127+
self.assertIsNone(new_timings)

0 commit comments

Comments
 (0)