From 2de496a825d2f50026c1f6909a1fcb02227f60c4 Mon Sep 17 00:00:00 2001 From: Bago Amirbekian Date: Fri, 25 Mar 2022 10:24:40 -0700 Subject: [PATCH] Make OutStream._buffer thread safe --- ipykernel/iostream.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 50f86d243..87536f801 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -376,7 +376,8 @@ def __init__( self._flush_pending = False self._subprocess_flush_pending = False self._io_loop = pub_thread.io_loop - self._new_buffer() + self._buffer_lock = threading.RLock() + self._buffer = StringIO() self.echo = None self._isatty = bool(isatty) @@ -528,7 +529,8 @@ def write(self, string: str) -> int: is_child = (not self._is_master_process()) # only touch the buffer in the IO thread to avoid races - self.pub_thread.schedule(lambda: self._buffer.write(string)) + with self._buffer_lock: + self._buffer.write(string) if is_child: # mp.Pool cannot be trusted to flush promptly (or ever), # and this helps. @@ -553,17 +555,15 @@ def writable(self): return True def _flush_buffer(self): - """clear the current buffer and return the current buffer data. - - This should only be called in the IO thread. - """ - data = '' - if self._buffer is not None: - buf = self._buffer - self._new_buffer() - data = buf.getvalue() - buf.close() + """clear the current buffer and return the current buffer data.""" + buf = self._rotate_buffer() + data = buf.getvalue() + buf.close() return data - def _new_buffer(self): - self._buffer = StringIO() + def _rotate_buffer(self): + """Returns the current buffer and replaces it with an empty buffer.""" + with self._buffer_lock: + old_buffer = self._buffer + self._buffer = StringIO() + return old_buffer