Skip to content

bpo-33694: Fix race condition on proactor recv() #7498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 53 additions & 125 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,13 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,

def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
self._loop_reading_cb = None
self._pending_data = None
self._paused = True
super().__init__(loop, sock, protocol, waiter, extra, server)

self._reschedule_on_resume = False
self._loop.call_soon(self._loop_reading)
self._paused = False

def set_protocol(self, protocol):
if isinstance(protocol, protocols.BufferedProtocol):
self._loop_reading_cb = self._loop_reading__get_buffer
else:
self._loop_reading_cb = self._loop_reading__data_received

super().set_protocol(protocol)

if self.is_reading():
# reset reading callback / buffers / self._read_fut
self.pause_reading()
self.resume_reading()

def is_reading(self):
return not self._paused and not self._closing

Expand All @@ -188,32 +174,39 @@ def pause_reading(self):
return
self._paused = True

if self._read_fut is not None and not self._read_fut.done():
# TODO: This is an ugly hack to cancel the current read future
# *and* avoid potential race conditions, as read cancellation
# goes through `future.cancel()` and `loop.call_soon()`.
# We then use this special attribute in the reader callback to
# exit *immediately* without doing any cleanup/rescheduling.
self._read_fut.__asyncio_cancelled_on_pause__ = True

self._read_fut.cancel()
self._read_fut = None
self._reschedule_on_resume = True
# bpo-33694: Don't cancel self._read_fut because cancelling an
# overlapped WSASend() loss silently data with the current proactor
# implementation.
#
# If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend()
# completed (even if HasOverlappedIoCompleted() returns 0), but
# Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND
# error. Once the overlapped is ignored, the IOCP loop will ignores the
# completion I/O event and so not read the result of the overlapped
# WSARecv().

if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if self._closing or not self._paused:
return

self._paused = False
if self._reschedule_on_resume:
self._loop.call_soon(self._loop_reading, self._read_fut)
self._reschedule_on_resume = False
if self._read_fut is None:
self._loop.call_soon(self._loop_reading, None)

data = self._pending_data
self._pending_data = None
if data is not None:
# Call the protocol methode after calling _loop_reading(),
# since the protocol can decide to pause reading again.
self._loop.call_soon(self._data_received, data)

if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def _loop_reading__on_eof(self):
def _eof_received(self):
if self._loop.get_debug():
logger.debug("%r received EOF", self)

Expand All @@ -227,18 +220,30 @@ def _loop_reading__on_eof(self):
if not keep_open:
self.close()

def _loop_reading(self, fut=None):
self._loop_reading_cb(fut)

def _loop_reading__data_received(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
def _data_received(self, data):
if self._paused:
# Don't call any protocol method while reading is paused.
# The protocol will be called on resume_reading().
assert self._pending_data is None
self._pending_data = data
return

if self._paused:
self._reschedule_on_resume = True
if not data:
self._eof_received()
return

if isinstance(self._protocol, protocols.BufferedProtocol):
try:
protocols._feed_data_to_bufferred_proto(self._protocol, data)
except Exception as exc:
self._fatal_error(exc,
'Fatal error: protocol.buffer_updated() '
'call failed.')
return
else:
self._protocol.data_received(data)

def _loop_reading(self, fut=None):
data = None
try:
if fut is not None:
Expand All @@ -261,8 +266,12 @@ def _loop_reading__data_received(self, fut):
# we got end-of-file so no need to reschedule a new read
return

# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
# bpo-33694: buffer_updated() has currently no fast path because of
# a data loss issue caused by overlapped WSASend() cancellation.

if not self._paused:
# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
Expand All @@ -277,92 +286,11 @@ def _loop_reading__data_received(self, fut):
if not self._closing:
raise
else:
self._read_fut.add_done_callback(self._loop_reading__data_received)
if not self._paused:
self._read_fut.add_done_callback(self._loop_reading)
finally:
if data:
self._protocol.data_received(data)
elif data == b'':
self._loop_reading__on_eof()

def _loop_reading__get_buffer(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return

nbytes = None
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
self._closing)
self._read_fut = None
try:
if fut.done():
nbytes = fut.result()
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(
exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise

if nbytes is not None:
if nbytes == 0:
# we got end-of-file so no need to reschedule a new read
self._loop_reading__on_eof()
else:
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return

if self._closing or nbytes == 0:
# since close() has been called we ignore any read data
return

try:
buf = self._protocol.get_buffer(-1)
if not len(buf):
raise RuntimeError('get_buffer() returned an empty buffer')
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return

try:
# schedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
self._read_fut.add_done_callback(self._loop_reading__get_buffer)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise
if data is not None:
self._data_received(data)


class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
Expand Down
19 changes: 19 additions & 0 deletions Lib/asyncio/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,22 @@ def pipe_connection_lost(self, fd, exc):

def process_exited(self):
"""Called when subprocess has exited."""


def _feed_data_to_bufferred_proto(proto, data):
data_len = len(data)
while data_len:
buf = proto.get_buffer(data_len)
buf_len = len(buf)
if not buf_len:
raise RuntimeError('get_buffer() returned an empty buffer')

if buf_len >= data_len:
buf[:data_len] = data
proto.buffer_updated(data_len)
return
else:
buf[:buf_len] = data[:buf_len]
proto.buffer_updated(buf_len)
data = data[buf_len:]
data_len = len(data)
21 changes: 1 addition & 20 deletions Lib/asyncio/sslproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ def data_received(self, data):
if chunk:
try:
if self._app_protocol_is_buffer:
_feed_data_to_bufferred_proto(
protocols._feed_data_to_bufferred_proto(
self._app_protocol, chunk)
else:
self._app_protocol.data_received(chunk)
Expand Down Expand Up @@ -721,22 +721,3 @@ def _abort(self):
self._transport.abort()
finally:
self._finalize()


def _feed_data_to_bufferred_proto(proto, data):
data_len = len(data)
while data_len:
buf = proto.get_buffer(data_len)
buf_len = len(buf)
if not buf_len:
raise RuntimeError('get_buffer() returned an empty buffer')

if buf_len >= data_len:
buf[:data_len] = data
proto.buffer_updated(data_len)
return
else:
buf[:buf_len] = data[:buf_len]
proto.buffer_updated(buf_len)
data = data[buf_len:]
data_len = len(data)
4 changes: 4 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,8 @@ def test_dont_pause_writing(self):
self.assertFalse(self.protocol.pause_writing.called)


@unittest.skip('FIXME: bpo-33694: these tests are too close '
'to the implementation and should be refactored or removed')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.

class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):

def setUp(self):
Expand Down Expand Up @@ -551,6 +553,8 @@ def test_proto_type_switch(self):
self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
buf_proto.buffer_updated.assert_called_with(4)

@unittest.skip('FIXME: bpo-33694: this test is too close to the '
'implementation and should be refactored or removed')
def test_proto_buf_switch(self):
tr = self.socket_transport()
test_utils.run_briefly(self.loop)
Expand Down
13 changes: 7 additions & 6 deletions Lib/test/test_asyncio/test_sslproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import asyncio
from asyncio import log
from asyncio import protocols
from asyncio import sslproto
from asyncio import tasks
from test.test_asyncio import utils as test_utils
Expand Down Expand Up @@ -189,28 +190,28 @@ def buffer_updated(self, nsize):

for usemv in [False, True]:
proto = Proto(1, usemv)
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
protocols._feed_data_to_bufferred_proto(proto, b'12345')
self.assertEqual(proto.data, b'12345')

proto = Proto(2, usemv)
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
protocols._feed_data_to_bufferred_proto(proto, b'12345')
self.assertEqual(proto.data, b'12345')

proto = Proto(2, usemv)
sslproto._feed_data_to_bufferred_proto(proto, b'1234')
protocols._feed_data_to_bufferred_proto(proto, b'1234')
self.assertEqual(proto.data, b'1234')

proto = Proto(4, usemv)
sslproto._feed_data_to_bufferred_proto(proto, b'1234')
protocols._feed_data_to_bufferred_proto(proto, b'1234')
self.assertEqual(proto.data, b'1234')

proto = Proto(100, usemv)
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
protocols._feed_data_to_bufferred_proto(proto, b'12345')
self.assertEqual(proto.data, b'12345')

proto = Proto(0, usemv)
with self.assertRaisesRegex(RuntimeError, 'empty buffer'):
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
protocols._feed_data_to_bufferred_proto(proto, b'12345')

def test_start_tls_client_reg_proto_1(self):
HELLO_MSG = b'1' * self.PAYLOAD_SIZE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
asyncio: Fix a race condition causing data loss on
pause_reading()/resume_reading() when using the ProactorEventLoop.