Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 94 additions & 40 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,14 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._paused = True
super().__init__(loop, sock, protocol, waiter, extra, server)

self._reschedule_on_resume = False
self._loop.call_soon(self._loop_reading)
self._paused = False
self._pending_data = None
self._pending_nbytes = None
self._current_buffer = None

def _protocol_use_buffer(self):
return (self._loop_reading_cb == self._loop_reading__get_buffer)

def set_protocol(self, protocol):
if isinstance(protocol, protocols.BufferedProtocol):
Expand All @@ -188,17 +193,10 @@ def pause_reading(self):
return
self._paused = True

if self._read_fut is not None and not self._read_fut.done():
# TODO: This is an ugly hack to cancel the current read future
# *and* avoid potential race conditions, as read cancellation
# goes through `future.cancel()` and `loop.call_soon()`.
# We then use this special attribute in the reader callback to
# exit *immediately* without doing any cleanup/rescheduling.
self._read_fut.__asyncio_cancelled_on_pause__ = True

self._read_fut.cancel()
self._read_fut = None
self._reschedule_on_resume = True
# bpo-33694: Cancelling an overlapped WSARecv() might loose data
# with the current proactor implementation: don't cancel it to prevent
# data loss. Instead, put received data into _pending_data and stop
# rescheduling new overlapped WSARecv().

if self._loop.get_debug():
logger.debug("%r pauses reading", self)
Expand All @@ -207,9 +205,24 @@ def resume_reading(self):
if self._closing or not self._paused:
return
self._paused = False
if self._reschedule_on_resume:
self._loop.call_soon(self._loop_reading, self._read_fut)
self._reschedule_on_resume = False

data = self._pending_data
self._pending_data = None
if data is not None:
# don't call it right now since protocol.data_received() can
# call pause_reading()
self._loop.call_soon(self._data_received, data)

nbytes = self._pending_nbytes
self._pending_nbytes = None
if nbytes is not None:
# don't call it right now since protocol.buffer_updated() can
# call pause_reading()
self._loop.call_soon(self._nbytes_received, nbytes)

if self._read_fut is None:
self._loop_reading(None)

if self._loop.get_debug():
logger.debug("%r resumes reading", self)

Expand All @@ -230,15 +243,25 @@ def _loop_reading__on_eof(self):
def _loop_reading(self, fut=None):
self._loop_reading_cb(fut)

def _data_received(self, data):
assert isinstance(data, bytes)
if not self._protocol_use_buffer():
if data:
self._protocol.data_received(data)
else:
self._loop_reading__on_eof()
else:
# FIXME handle error on get_buffer() :-(
buf = self._protocol.get_buffer(-1)
nbytes = len(data)
buf[nbytes:] = data
self._loop.call_soon(self._nbytes_received, nbytes)

def _loop_reading__data_received(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return

data = None
try:
if fut is not None:
Expand All @@ -257,12 +280,24 @@ def _loop_reading__data_received(self, fut):
data = None
return

if data is not None:
if self._paused:
assert self._pending_data is None
self._pending_data = data
else:
self._data_received(data)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this code because it was easier for me, to be able to implement the switch from data_received to buffer_updated: "if self._protocol_use_buffer(): self._loop_reading__get_buffer(None)" below. I'm not sure if moving this code changes the semantics? It shouldn't, but 3 proactor tests are failing.


if data == b'':
# we got end-of-file so no need to reschedule a new read
return

# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
if self._protocol_use_buffer():
self._loop_reading__get_buffer(None)
return

if not self._paused:
# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
Expand All @@ -277,22 +312,38 @@ def _loop_reading__data_received(self, fut):
if not self._closing:
raise
else:
self._read_fut.add_done_callback(self._loop_reading__data_received)
if not self._paused:
self._read_fut.add_done_callback(self._loop_reading__data_received)

def _nbytes_received(self, nbytes):
try:
if self._protocol_use_buffer():
self._current_buffer = None
if nbytes == 0:
# we got end-of-file so no need to reschedule a new read
self._loop_reading__on_eof()
else:
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return True
else:
data = bytes(memoryview(self._current_buffer)[:nbytes])
self._data_received(data)

return False
finally:
if data:
self._protocol.data_received(data)
elif data == b'':
self._loop_reading__on_eof()
self._current_buffer = None

def _loop_reading__get_buffer(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return

nbytes = None
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
Expand Down Expand Up @@ -320,19 +371,21 @@ def _loop_reading__get_buffer(self, fut):
raise

if nbytes is not None:
if nbytes == 0:
# we got end-of-file so no need to reschedule a new read
self._loop_reading__on_eof()
if self._paused:
assert self._pending_nbytes is None
self._pending_nbytes = nbytes
else:
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
if self._nbytes_received(nbytes):
return

if not self._protocol_use_buffer():
self._loop_reading__data_received(None)
return

if self._paused:
# don't schedule a new read if paused
return

if self._closing or nbytes == 0:
# since close() has been called we ignore any read data
return
Expand All @@ -348,6 +401,7 @@ def _loop_reading__get_buffer(self, fut):

try:
# schedule a new read
self._current_buffer = buf
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
self._read_fut.add_done_callback(self._loop_reading__get_buffer)
except ConnectionAbortedError as exc:
Expand Down