diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 337ed0fb204751..e7551e2dced142 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -163,9 +163,14 @@ def __init__(self, loop, sock, protocol, waiter=None, self._paused = True super().__init__(loop, sock, protocol, waiter, extra, server) - self._reschedule_on_resume = False self._loop.call_soon(self._loop_reading) self._paused = False + self._pending_data = None + self._pending_nbytes = None + self._current_buffer = None + + def _protocol_use_buffer(self): + return (self._loop_reading_cb == self._loop_reading__get_buffer) def set_protocol(self, protocol): if isinstance(protocol, protocols.BufferedProtocol): @@ -188,17 +193,10 @@ def pause_reading(self): return self._paused = True - if self._read_fut is not None and not self._read_fut.done(): - # TODO: This is an ugly hack to cancel the current read future - # *and* avoid potential race conditions, as read cancellation - # goes through `future.cancel()` and `loop.call_soon()`. - # We then use this special attribute in the reader callback to - # exit *immediately* without doing any cleanup/rescheduling. - self._read_fut.__asyncio_cancelled_on_pause__ = True - - self._read_fut.cancel() - self._read_fut = None - self._reschedule_on_resume = True + # bpo-33694: Cancelling an overlapped WSARecv() might loose data + # with the current proactor implementation: don't cancel it to prevent + # data loss. Instead, put received data into _pending_data and stop + # rescheduling new overlapped WSARecv(). if self._loop.get_debug(): logger.debug("%r pauses reading", self) @@ -207,9 +205,24 @@ def resume_reading(self): if self._closing or not self._paused: return self._paused = False - if self._reschedule_on_resume: - self._loop.call_soon(self._loop_reading, self._read_fut) - self._reschedule_on_resume = False + + data = self._pending_data + self._pending_data = None + if data is not None: + # don't call it right now since protocol.data_received() can + # call pause_reading() + self._loop.call_soon(self._data_received, data) + + nbytes = self._pending_nbytes + self._pending_nbytes = None + if nbytes is not None: + # don't call it right now since protocol.buffer_updated() can + # call pause_reading() + self._loop.call_soon(self._nbytes_received, nbytes) + + if self._read_fut is None: + self._loop_reading(None) + if self._loop.get_debug(): logger.debug("%r resumes reading", self) @@ -230,15 +243,25 @@ def _loop_reading__on_eof(self): def _loop_reading(self, fut=None): self._loop_reading_cb(fut) + def _data_received(self, data): + assert isinstance(data, bytes) + if not self._protocol_use_buffer(): + if data: + self._protocol.data_received(data) + else: + self._loop_reading__on_eof() + else: + # FIXME handle error on get_buffer() :-( + buf = self._protocol.get_buffer(-1) + nbytes = len(data) + buf[nbytes:] = data + self._loop.call_soon(self._nbytes_received, nbytes) + def _loop_reading__data_received(self, fut): if (fut is not None and getattr(fut, '__asyncio_cancelled_on_pause__', False)): return - if self._paused: - self._reschedule_on_resume = True - return - data = None try: if fut is not None: @@ -257,12 +280,24 @@ def _loop_reading__data_received(self, fut): data = None return + if data is not None: + if self._paused: + assert self._pending_data is None + self._pending_data = data + else: + self._data_received(data) + if data == b'': # we got end-of-file so no need to reschedule a new read return - # reschedule a new read - self._read_fut = self._loop._proactor.recv(self._sock, 32768) + if self._protocol_use_buffer(): + self._loop_reading__get_buffer(None) + return + + if not self._paused: + # reschedule a new read + self._read_fut = self._loop._proactor.recv(self._sock, 32768) except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') @@ -277,22 +312,38 @@ def _loop_reading__data_received(self, fut): if not self._closing: raise else: - self._read_fut.add_done_callback(self._loop_reading__data_received) + if not self._paused: + self._read_fut.add_done_callback(self._loop_reading__data_received) + + def _nbytes_received(self, nbytes): + try: + if self._protocol_use_buffer(): + self._current_buffer = None + if nbytes == 0: + # we got end-of-file so no need to reschedule a new read + self._loop_reading__on_eof() + else: + try: + self._protocol.buffer_updated(nbytes) + except Exception as exc: + self._fatal_error( + exc, + 'Fatal error: ' + 'protocol.buffer_updated() call failed.') + return True + else: + data = bytes(memoryview(self._current_buffer)[:nbytes]) + self._data_received(data) + + return False finally: - if data: - self._protocol.data_received(data) - elif data == b'': - self._loop_reading__on_eof() + self._current_buffer = None def _loop_reading__get_buffer(self, fut): if (fut is not None and getattr(fut, '__asyncio_cancelled_on_pause__', False)): return - if self._paused: - self._reschedule_on_resume = True - return - nbytes = None if fut is not None: assert self._read_fut is fut or (self._read_fut is None and @@ -320,19 +371,21 @@ def _loop_reading__get_buffer(self, fut): raise if nbytes is not None: - if nbytes == 0: - # we got end-of-file so no need to reschedule a new read - self._loop_reading__on_eof() + if self._paused: + assert self._pending_nbytes is None + self._pending_nbytes = nbytes else: - try: - self._protocol.buffer_updated(nbytes) - except Exception as exc: - self._fatal_error( - exc, - 'Fatal error: ' - 'protocol.buffer_updated() call failed.') + if self._nbytes_received(nbytes): return + if not self._protocol_use_buffer(): + self._loop_reading__data_received(None) + return + + if self._paused: + # don't schedule a new read if paused + return + if self._closing or nbytes == 0: # since close() has been called we ignore any read data return @@ -348,6 +401,7 @@ def _loop_reading__get_buffer(self, fut): try: # schedule a new read + self._current_buffer = buf self._read_fut = self._loop._proactor.recv_into(self._sock, buf) self._read_fut.add_done_callback(self._loop_reading__get_buffer) except ConnectionAbortedError as exc: