Skip to content

Commit f7e3768

Browse files
Add unit tests for task run recorder redelivery behavior
Co-Authored-By: Nate Nowack <[email protected]>
1 parent 3ccf835 commit f7e3768

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed

tests/server/services/test_task_run_recorder.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,3 +1173,65 @@ async def test_subsequent_updates_move_update_timestamp(session: AsyncSession):
11731173
assert second_update_timestamp is not None
11741174

11751175
assert second_update_timestamp > first_update_timestamp
1176+
1177+
1178+
async def test_event_retried_on_persist_failure(
1179+
pending_event: ReceivedEvent,
1180+
monkeypatch: pytest.MonkeyPatch,
1181+
caplog: pytest.LogCaptureFixture,
1182+
):
1183+
"""Test that events are retried when handle_task_run_events fails."""
1184+
call_count = 0
1185+
1186+
async def mock_handle_task_run_events(events, depth=0):
1187+
nonlocal call_count
1188+
call_count += 1
1189+
if call_count == 1:
1190+
raise Exception("Simulated DB failure")
1191+
1192+
monkeypatch.setattr(
1193+
"prefect.server.services.task_run_recorder.handle_task_run_events",
1194+
mock_handle_task_run_events,
1195+
)
1196+
1197+
async with task_run_recorder.consumer(
1198+
write_batch_size=1, flush_every=1, max_persist_retries=2
1199+
) as handler:
1200+
with caplog.at_level("ERROR"):
1201+
await handler(message(pending_event))
1202+
await asyncio.sleep(1.5)
1203+
1204+
assert call_count == 2
1205+
assert "1 to retry" in caplog.text
1206+
assert "0 dropped" in caplog.text
1207+
1208+
1209+
async def test_event_dropped_after_max_retries_exceeded(
1210+
pending_event: ReceivedEvent,
1211+
monkeypatch: pytest.MonkeyPatch,
1212+
caplog: pytest.LogCaptureFixture,
1213+
):
1214+
"""Test that events are dropped after exceeding max_persist_retries."""
1215+
call_count = 0
1216+
1217+
async def mock_handle_task_run_events(events, depth=0):
1218+
nonlocal call_count
1219+
call_count += 1
1220+
raise Exception("Simulated persistent DB failure")
1221+
1222+
monkeypatch.setattr(
1223+
"prefect.server.services.task_run_recorder.handle_task_run_events",
1224+
mock_handle_task_run_events,
1225+
)
1226+
1227+
async with task_run_recorder.consumer(
1228+
write_batch_size=1, flush_every=1, max_persist_retries=1
1229+
) as handler:
1230+
with caplog.at_level("ERROR"):
1231+
await handler(message(pending_event))
1232+
await asyncio.sleep(1.5)
1233+
1234+
assert call_count == 2
1235+
assert "Dropping event" in caplog.text
1236+
assert "after 2 failed attempts" in caplog.text
1237+
assert "1 dropped" in caplog.text

0 commit comments

Comments
 (0)