Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 67 additions & 116 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,14 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,

def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
self._loop_reading_cb = None
self._pending_data = None
self._paused = True
super().__init__(loop, sock, protocol, waiter, extra, server)

self._reschedule_on_resume = False
self._loop.call_soon(self._loop_reading)
self._paused = False

def set_protocol(self, protocol):
if isinstance(protocol, protocols.BufferedProtocol):
self._loop_reading_cb = self._loop_reading__get_buffer
else:
self._loop_reading_cb = self._loop_reading__data_received

super().set_protocol(protocol)

if self.is_reading():
Expand All @@ -188,32 +182,39 @@ def pause_reading(self):
return
self._paused = True

if self._read_fut is not None and not self._read_fut.done():
# TODO: This is an ugly hack to cancel the current read future
# *and* avoid potential race conditions, as read cancellation
# goes through `future.cancel()` and `loop.call_soon()`.
# We then use this special attribute in the reader callback to
# exit *immediately* without doing any cleanup/rescheduling.
self._read_fut.__asyncio_cancelled_on_pause__ = True

self._read_fut.cancel()
self._read_fut = None
self._reschedule_on_resume = True
# bpo-33694: Don't cancel self._read_fut because cancelling an
# overlapped WSASend() loss silently data with the current proactor
# implementation.
#
# If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend()
# completed (even if HasOverlappedIoCompleted() returns 0), but
# Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND
# error. Once the overlapped is ignored, the IOCP loop will ignores the
# completion I/O event and so not read the result of the overlapped
# WSARecv().

if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if self._closing or not self._paused:
return

self._paused = False
if self._reschedule_on_resume:
if self._read_fut is None:
self._loop.call_soon(self._loop_reading, self._read_fut)
self._reschedule_on_resume = False

data = self._pending_data
self._pending_data = None
if data is not None:
# Call the protocol methode after calling _loop_reading(),
# since the protocol can decide to pause reading again.
self._loop.call_soon(self._data_received, data)

if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def _loop_reading__on_eof(self):
def _eof_received(self):
if self._loop.get_debug():
logger.debug("%r received EOF", self)

Expand All @@ -227,18 +228,45 @@ def _loop_reading__on_eof(self):
if not keep_open:
self.close()

def _loop_reading(self, fut=None):
self._loop_reading_cb(fut)

def _loop_reading__data_received(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
def _data_received(self, data):
if self._paused:
# don't call any protocol method while reading is paused
assert self._pending_data is None
self._pending_data = data
return

if self._paused:
self._reschedule_on_resume = True
if not data:
self._eof_received()
return

if isinstance(self._protocol, protocols.BufferedProtocol):
nbytes = len(data)
if nbytes:
try:
buf = self._protocol.get_buffer(-1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should pass nbytes to get_buffer(). Also, get_buffer() can return a smaller buffer than requested. So you better should use sslproto._feed_data_to_bufferred_proto helper to make sure that all of the received data is passed to the BufferedProtocol.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you better should use sslproto._feed_data_to_bufferred_proto helper

Ok, done.

if not len(buf):
raise RuntimeError('get_buffer() returned '
'an empty buffer')
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return

# Copy data into the buffer
buf[:nbytes] = data

try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return
else:
self._protocol.data_received(data)

def _loop_reading(self, fut=None):
data = None
try:
if fut is not None:
Expand All @@ -261,8 +289,12 @@ def _loop_reading__data_received(self, fut):
# we got end-of-file so no need to reschedule a new read
return

# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
# bpo-33694: buffer_updated() has currently no fast path because
# of a data loss issue with WSASend() cancellation.

if not self._paused:
# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
Expand All @@ -277,92 +309,11 @@ def _loop_reading__data_received(self, fut):
if not self._closing:
raise
else:
self._read_fut.add_done_callback(self._loop_reading__data_received)
if not self._paused:
self._read_fut.add_done_callback(self._loop_reading)
finally:
if data:
self._protocol.data_received(data)
elif data == b'':
self._loop_reading__on_eof()

def _loop_reading__get_buffer(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return

nbytes = None
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
self._closing)
self._read_fut = None
try:
if fut.done():
nbytes = fut.result()
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(
exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise

if nbytes is not None:
if nbytes == 0:
# we got end-of-file so no need to reschedule a new read
self._loop_reading__on_eof()
else:
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return

if self._closing or nbytes == 0:
# since close() has been called we ignore any read data
return

try:
buf = self._protocol.get_buffer(-1)
if not len(buf):
raise RuntimeError('get_buffer() returned an empty buffer')
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return

try:
# schedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
self._read_fut.add_done_callback(self._loop_reading__get_buffer)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise
if data is not None:
self._data_received(data)


class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
Expand Down
4 changes: 4 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,8 @@ def test_dont_pause_writing(self):
self.assertFalse(self.protocol.pause_writing.called)


@unittest.skip('FIXME: bpo-33694: these tests are too close '
'to the implementation and should be refactored or removed')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.

class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):

def setUp(self):
Expand Down Expand Up @@ -551,6 +553,8 @@ def test_proto_type_switch(self):
self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
buf_proto.buffer_updated.assert_called_with(4)

@unittest.skip('FIXME: bpo-33694: this test is too close to the '
'implementation and should be refactored or removed')
def test_proto_buf_switch(self):
tr = self.socket_transport()
test_utils.run_briefly(self.loop)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
asyncio: Fix a race condition causing dataloss on
pause_reading()/resume_reading() when using the ProactorEventLoop.