Skip to content

Commit 5014be0

Browse files
authored
Add an explicit HTTP input closed event (#94)
1 parent 122a117 commit 5014be0

File tree

2 files changed

+15
-11
lines changed

2 files changed

+15
-11
lines changed

python/restate/server_context.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -318,14 +318,7 @@ async def leave(self):
318318
# {'type': 'http.request', 'body': b'', 'more_body': True}
319319
# {'type': 'http.request', 'body': b'', 'more_body': False}
320320
# {'type': 'http.disconnect'}
321-
while True:
322-
event = await self.receive()
323-
if event is None:
324-
break
325-
if event.get('type') == 'http.disconnect':
326-
break
327-
if event.get('type') == 'http.request' and event.get('more_body', False) is False:
328-
break
321+
await self.receive.block_until_http_input_closed()
329322
# finally, we close our side
330323
# it is important to do it, after the other side has closed his side,
331324
# because some asgi servers (like hypercorn) will remove the stream

python/restate/server_types.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,19 @@ class ReceiveChannel:
9898

9999
def __init__(self, receive: Receive) -> None:
100100
self._queue = asyncio.Queue[Union[ASGIReceiveEvent, RestateEvent]]()
101+
self._http_input_closed = asyncio.Event()
102+
self._disconnected = asyncio.Event()
101103

102104
async def loop():
103105
"""Receive loop."""
104-
while True:
106+
while not self._disconnected.is_set():
105107
event = await receive()
108+
if event.get('type') == 'http.request' and not event.get('more_body', False):
109+
self._http_input_closed.set()
110+
elif event.get('type') == 'http.disconnect':
111+
self._http_input_closed.set()
112+
self._disconnected.set()
106113
await self._queue.put(event)
107-
if event.get('type') == 'http.disconnect':
108-
break
109114

110115
self._task = asyncio.create_task(loop())
111116

@@ -115,12 +120,18 @@ async def __call__(self) -> ASGIReceiveEvent | RestateEvent:
115120
self._queue.task_done()
116121
return what
117122

123+
async def block_until_http_input_closed(self) -> None:
124+
"""Wait until the HTTP input is closed"""
125+
await self._http_input_closed.wait()
126+
118127
async def enqueue_restate_event(self, what: RestateEvent):
119128
"""Add a message."""
120129
await self._queue.put(what)
121130

122131
async def close(self):
123132
"""Close the channel."""
133+
self._http_input_closed.set()
134+
self._disconnected.set()
124135
if self._task.done():
125136
return
126137
self._task.cancel()

0 commit comments

Comments
 (0)