Description
I'm attempting to upgrade a Django project but running into some problems. An automated test that uses `channels_redis` and websockets is failing, and it seems to indicate a real bug. I think this bug is pretty serious and could result in a lot of dropped events.
I think `channels_redis` 4.1.0 and other versions are not compatible with the Redis client redis-py 4.5.4. The problem does not seem to occur with redis-py 4.5.3. (However, I encounter a different problem with 4.5.3 as well...)
This issue report is not as concise and complete as I would like. (In particular, I would like to be able to say more about what I observe with earlier Redis client versions, particularly redis-py 4.5.3.) But I think it's important to get this conversation started soon, so I'm submitting it now anyway.
Reproducing this issue
After days of digging in code that I'm not familiar with, I've found a convincing explanation for this change in behavior and written a failing test in a fork of the channels_redis repo. (I run this test directly on macOS 11.7 with Python 3.10.5 or 3.11.3.) Here is a copy of that test:
```python
import asyncio

import async_timeout
import pytest

from channels_redis.core import RedisChannelLayer

# TEST_HOSTS and the channel_layer fixture come from the existing
# tests/test_core.py that this test is pasted into.


@pytest.mark.asyncio
async def test_groups(channel_layer):
    channel_layer = RedisChannelLayer(hosts=TEST_HOSTS)
    channel_name1 = await channel_layer.new_channel(prefix="specific")
    channel_name2 = await channel_layer.new_channel(prefix="specific")
    await channel_layer.group_add("test-group", channel_name1)
    # Start a receive() (and therefore a BZPOPMIN) on channel_name1,
    # then cancel it via the timeout.
    with pytest.raises(asyncio.TimeoutError):
        async with async_timeout.timeout(1):
            await channel_layer.receive(channel_name1)
    await channel_layer.group_discard("test-group", channel_name1)
    await channel_layer.group_add("test-group", channel_name2)
    # await asyncio.sleep(5)  # see below
    await channel_layer.group_send("test-group", {"type": "message.1"})
    async with async_timeout.timeout(1):
        assert (await channel_layer.receive(channel_name2))["type"] == "message.1"
    await channel_layer.flush()
```
If I run this test with redis-py 4.5.4 then it fails near the end, waiting for `receive()`, unless I add a 5 second sleep before `group_send()` or comment out `asyncio.shield()` in `redis.asyncio.client.Redis.execute_command()` (just like the test in my project below).
To run this test, I do the following:
- Check out the `main` git branch (as it was on April 8th, `ba6dfcd633be7d505f739cc3c294853222350e48`) in the channels_redis git repo.
- Paste the test into `tests/test_core.py`.
- (Set up a virtualenv and activate it.)
- `python -m pip install -e .[tests]`
- `pytest tests/test_core.py::test_groups`
How I encountered this issue
The automated test in the Django project I'm working on has a name like `test_when_user1_clicks_button_in_browser_then_user2_can_immediately_see_update_in_second_browser`, and basically does this:
1. Create a Google Chrome browser for one test user.
2. Create a second Google Chrome browser for a second test user.
3. Log in, in both browsers.
4. Login redirects to a page that has javascript that opens websocket connections during or immediately after page load.
   - When this websocket connects, the backend calls `RedisChannelLayer.group_add()` once. When it disconnects, `group_discard()` is called once (passing the same group name and channel name that were passed to `group_add()`). A minimal consumer sketch follows this list.
   - In practice, either because of an implicit wait somewhere or by chance, the websocket connection consistently opens in both browsers at this point in the test.
5. Refresh both windows.
   - In practice, the websocket connection consistently disconnects and connects in both browsers.
   - (This refresh is a side effect of how the test suite is set up and isn't needed when running only one test in the test suite, for example with `python3 manage.py test my_tests.MyTests.test_when_user1_clicks_button_in_browser_then_user2_can_immediately_see_update_in_second_browser`. The test suite is set up to log in during suite setup and refresh before each test to have a clean page state. But this extra refresh is part of what causes the bug to repro.)
6. In one window, click a button that sends an XHR request to a Django endpoint that calls `RedisChannelLayer.group_send()`.
7. In the other window, wait for the event to come from the websocket.
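The consumer in step 4 behaves roughly like this minimal sketch. The names here are hypothetical (the real project's consumer class, group name, and event type differ), and `"test-group"` is borrowed from the repro test above, but the `group_add()`/`group_discard()` pattern matches what the project does:

```python
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer


class UpdatesConsumer(WebsocketConsumer):
    group_name = "test-group"  # assumption; the real group name differs

    def connect(self):
        # group_add() is called exactly once per websocket connection.
        async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)
        self.accept()

    def disconnect(self, close_code):
        # group_discard() is called once on disconnect, with the same
        # group name and channel name that were passed to group_add().
        async_to_sync(self.channel_layer.group_discard)(self.group_name, self.channel_name)

    def message_1(self, event):
        # Handles {"type": "message.1"} events sent via group_send().
        self.send(text_data="message.1")
```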
This test passes with Python 3.10.5 on Debian 11 Bullseye (inside a Docker container running on macOS 11.7 Big Sur with Docker Desktop Engine 20.10.23) with the following dependencies:
- channels-redis 3.4.1
- aioredis 1.3.1
- hiredis 2.2.2
- django 3.2.18
- channels 3.0.4
- asgiref 3.3.4
- twisted 22.10.0
This test fails on step 7 with the following dependencies; the event never makes it to the other window via the websocket:
- channels-redis 4.1.0
- redis 4.5.4
- (No aioredis)
- (No hiredis. No particular reason hiredis was removed. Maybe I'll add it again after this issue is resolved.)
- django 4.1.8
- channels 4.0.0
- asgiref 3.6.0
- twisted 22.10.0
Also, when the test fails:
- There is no error at any level of the call stack, at least as deep as `redis.asyncio.client.Redis.execute_command()`.
- I've observed `CancelledError` being caught, logged, and re-raised by my own temporary code (a hypothetical version of which is sketched after this list). `CancelledError` is expected when a websocket disconnects, so that's not surprising.
- Doing any of the following makes the test pass:
  - Adding a 5 second sleep before step 6.
  - Disabling the redirect from login (in step 4), and instead loading a blank page after login.
  - Commenting out `asyncio.shield()` in `redis.asyncio.client.Redis.execute_command()`.
  - (Installing the other package versions mentioned above, instead.)
- The following do not make the test pass:
  - Increasing the wait in step 7 to 20 seconds.
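For reference, the temporary logging code was roughly of this shape. This is a minimal sketch; `logged_receive` and the `logger` setup are placeholders, not the project's actual code:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def logged_receive(channel_layer, channel_name):
    # Catch, log, and re-raise CancelledError so the cancellation is
    # observable but still propagates normally.
    try:
        return await channel_layer.receive(channel_name)
    except asyncio.CancelledError:
        logger.debug("receive() cancelled for %s", channel_name)
        raise
```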
What changed
Here is what I think caused this change in behavior:
- Prior to 4.5.4, the redis client did not use `asyncio.shield()` around the low level call that sends the `BZPOPMIN` command to the Redis server. `channels_redis` executes `BZPOPMIN` in `RedisChannelLayer._brpop_with_clean()`.
  - Refresher on `BZPOPMIN`: it blocks until it pops something or the specified time interval has elapsed. `channels_redis` specifies 5 seconds by default.
- I think that `channels_redis` assumes that it can abort a `BZPOPMIN` command using `task.cancel()`, but due to the `asyncio.shield()` that is now in the redis client, I think what happens is that `task.cancel()` cancels the code that handles a successful `BZPOPMIN`, and then the Redis server continues running the `BZPOPMIN` command. If the stray command returns a non-empty result after the task is cancelled, that result gets dropped and is never delivered.
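To illustrate the `asyncio.shield()` semantics involved, here is a generic, self-contained asyncio sketch (not redis-py's actual code); `slow_pop()` merely stands in for the blocking `BZPOPMIN` round trip:

```python
import asyncio


async def slow_pop():
    # Stands in for the shielded BZPOPMIN round trip to the Redis server.
    await asyncio.sleep(2)
    return "message"


async def receiver():
    # Cancelling the receiver task raises CancelledError at this await,
    # but the shielded inner coroutine keeps running to completion.
    return await asyncio.shield(slow_pop())


async def main():
    task = asyncio.create_task(receiver())
    await asyncio.sleep(0.1)
    task.cancel()  # analogous to channels_redis cancelling its receive task
    try:
        await task
    except asyncio.CancelledError:
        print("receiver cancelled; the shielded slow_pop() is still running")
    # slow_pop() finishes here, but nothing consumes its result, which is
    # the shape of the dropped-event problem described above.
    await asyncio.sleep(2.5)


asyncio.run(main())
```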
Here is why I think that:
- There is backup logic in the code near the `BZPOPMIN` call, but it doesn't add a backup to Redis until after `BZPOPMIN` returns, which means that when a task is canceled while `BZPOPMIN` is in progress, the event doesn't get backed up. (A simplified sketch of this ordering follows this list.)
- When I add `time.sleep(5)` in my tests before `group_send()`, the test passes, which seems to indicate that a stray `BZPOPMIN` consumes the event when there is no `sleep`, and times out without consuming the event when there is a `sleep`.
- When I replace `await asyncio.shield(...)` with `await ...` in the redis client, the test passes.
- When I modify `channels_redis` to import an old version of `aioredis` instead of `redis.asyncio.client`, and add `shield()` in aioredis, then the test fails.
  - I have not attempted to repro this recently and I'm not fully confident it's still true.
- As mentioned above, when none of the websocket clients created during the project test ever disconnect (and thus never cancel during `BZPOPMIN`), the test passes.
- When I add a lot of print statements to channels_redis, the redis client, etc., watch that while running the tests, and watch the output of `redis-cli monitor`, what I see is consistent with `BZPOPMIN` plus `shield()` being the problem, and doesn't suggest any other explanation as far as I've been able to tell.
  - (I said "print statements" but technically I'm appending to a log file.)
How to fix: I don't know yet
I'm not sure yet what the best solution would be. Some ideas:
- It seems like the existing backup functionality could perhaps be extended.
- I read in a channels spec somewhere that messages should be delivered at most once. Assuming that's relevant here, any fix will have to guarantee that a backup can't be put back into a queue while an old event is still being processed, which could happen if an old event takes a long time to process, or if events are popped from backups after only a short time.
- Maybe the Redis client actually shouldn't use `shield()`. Or maybe it should at least not run `BZPOPMIN` inside `shield()`.
- I noticed that there are existing channels_redis tests similar to mine that pass a different `prefix` in each `new_channel()` call, rather than the same prefix in every call. Maybe `channels_redis` should always require a unique prefix in every call to `new_channel()`. I assume this would reduce performance and increase computing resource usage.
Possible workaround
Passing a `prefix` to `new_channel()` almost looks like an option, but:
- For the project I'm working on, which uses `channels.generic.websocket.WebsocketConsumer`, passing a prefix to `new_channel()` doesn't seem to be configurable: `new_channel()` is called without arguments in `channels.consumer.AsyncConsumer.__call__()`. So `__call__()` could be redefined, in a subclass in this project, to pass a prefix to `new_channel()`. Or `new_channel()` could be redefined in a subclass of `RedisChannelLayer` (probably better, because `new_channel()` is a lot simpler than `__call__()`; a sketch follows this list). So it can be done, but there's room for improvement.
- I'm not sure specifying a prefix is the right thing to do for a subclass of `WebsocketConsumer`. I don't know what all the implications are of doing that. It seems like it would at least probably reduce performance and increase computing resource usage.
Other thoughts
I wonder what all this means for the `channels_redis` receive lock. If a stray `BZPOPMIN` is still running, and another task tries to get the receive lock, then I think it will acquire the lock and start a new `BZPOPMIN` while the old one is still running, which seems to be something that `channels_redis` is trying to avoid. But depending on what the fix is for dropped events due to stray `BZPOPMIN`s, maybe this won't matter. For example, maybe it will be fine if a stray `BZPOPMIN` is still running for a couple of seconds after a new `BZPOPMIN` starts.