Skip to content

Commit 42fbea6

Browse files
committed
WIP: consume queue on immediate shutdown
1 parent ee8d4df commit 42fbea6

File tree

2 files changed

+202
-282
lines changed

2 files changed

+202
-282
lines changed

Lib/multiprocessing/queues.py

Lines changed: 35 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626

2727
from .util import debug, info, Finalize, register_after_fork, is_exiting
2828

29-
_queue_alive = 0
30-
_queue_shutdown = 1
31-
_queue_shutdown_immediate = 2
32-
3329
#
3430
# Queue type using a pipe, buffer and thread
3531
#
@@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
5248
# For use by concurrent.futures
5349
self._ignore_epipe = False
5450
self._reset()
55-
self._shutdown_state = ctx.Value('i', _queue_alive)
51+
self._is_shutdown = ctx.Value('B', False)
5652

5753
if sys.platform != 'win32':
5854
register_after_fork(self, Queue._after_fork)
@@ -61,12 +57,12 @@ def __getstate__(self):
6157
context.assert_spawning(self)
6258
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
6359
self._rlock, self._wlock, self._sem, self._opid,
64-
self._shutdown_state)
60+
self._is_shutdown)
6561

6662
def __setstate__(self, state):
6763
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
6864
self._rlock, self._wlock, self._sem, self._opid,
69-
self._shutdown_state) = state
65+
self._is_shutdown) = state
7066
self._reset()
7167

7268
def _after_fork(self):
@@ -88,32 +84,19 @@ def _reset(self, after_fork=False):
8884
self._recv_bytes = self._reader.recv_bytes
8985
self._poll = self._reader.poll
9086

91-
def _is_alive(self):
92-
return self._shutdown_state.value == _queue_alive
93-
94-
def _is_shutdown(self):
95-
return self._shutdown_state.value == _queue_shutdown
96-
97-
def _is_shutdown_immediate(self):
98-
return self._shutdown_state.value == _queue_shutdown_immediate
99-
100-
def _set_shutdown(self):
101-
self._shutdown_state.value = _queue_shutdown
102-
103-
def _set_shutdown_immediate(self):
104-
self._shutdown_state.value = _queue_shutdown_immediate
105-
10687
def put(self, obj, block=True, timeout=None):
10788
if self._closed:
10889
raise ValueError(f"Queue {self!r} is closed")
109-
if not self._is_alive():
90+
if self._is_shutdown.value:
11091
raise ShutDown
11192
if not self._sem.acquire(block, timeout):
112-
if not self._is_alive():
93+
if self._is_shutdown.value:
11394
raise ShutDown
11495
raise Full
11596

11697
with self._notempty:
98+
if self._is_shutdown.value:
99+
raise ShutDown
117100
if self._thread is None:
118101
self._start_thread()
119102
self._buffer.append(obj)
@@ -124,36 +107,29 @@ def get(self, block=True, timeout=None):
124107
raise ValueError(f"Queue {self!r} is closed")
125108
if block and timeout is None:
126109
with self._rlock:
127-
# checks shutdown state
128-
if (self._is_shutdown_immediate()
129-
or (self._is_shutdown() and self.empty())):
110+
if self._is_shutdown.value and self.empty():
130111
raise ShutDown
131112
res = self._recv_bytes()
132113
self._sem.release()
133114
else:
134115
if block:
135116
deadline = time.monotonic() + timeout
136117
if not self._rlock.acquire(block, timeout):
137-
if (self._is_shutdown_immediate()
138-
or (self._is_shutdown() and self.empty())):
118+
if self._is_shutdown.value and self.empty():
139119
raise ShutDown
140120
raise Empty
141121
try:
142122
if block:
143123
timeout = deadline - time.monotonic()
144124
if not self._poll(timeout):
145-
if not self._is_alive():
125+
if self._is_shutdown.value:
146126
raise ShutDown
147127
raise Empty
148128
elif not self._poll():
149-
if not self._is_alive():
129+
if self._is_shutdown.value:
150130
raise ShutDown
151131
raise Empty
152132

153-
# here queue is not empty
154-
if self._is_shutdown_immediate():
155-
raise ShutDown
156-
# here shutdown state queue is alive or shutdown
157133
res = self._recv_bytes()
158134
self._sem.release()
159135
finally:
@@ -178,18 +154,24 @@ def get_nowait(self):
178154
def put_nowait(self, obj):
179155
return self.put(obj, False)
180156

157+
def _clear(self):
158+
with self._rlock:
159+
while self._poll():
160+
self._recv_bytes()
161+
181162
def shutdown(self, immediate=False):
182163
if self._closed:
183164
raise ValueError(f"Queue {self!r} is closed")
184-
with self._shutdown_state.get_lock():
185-
if self._is_shutdown_immediate():
186-
return
165+
with self._is_shutdown.get_lock():
166+
self._is_shutdown.value = True
187167
if immediate:
188-
self._set_shutdown_immediate()
189-
with self._notempty:
190-
self._notempty.notify_all()
191-
else:
192-
self._set_shutdown()
168+
self._clear()
169+
# TODO: unblock all getters to check empty (then shutdown)
170+
for _ in range(self._maxsize):
171+
try:
172+
self._sem.release()
173+
except ValueError:
174+
break
193175

194176
def close(self):
195177
self._closed = True
@@ -384,14 +366,16 @@ def __setstate__(self, state):
384366
def put(self, obj, block=True, timeout=None):
385367
if self._closed:
386368
raise ValueError(f"Queue {self!r} is closed")
387-
if not self._is_alive():
369+
if self._is_shutdown.value:
388370
raise ShutDown
389371
if not self._sem.acquire(block, timeout):
390-
if not self._is_alive():
372+
if self._is_shutdown.value:
391373
raise ShutDown
392374
raise Full
393375

394376
with self._notempty, self._cond:
377+
if self._is_shutdown.value:
378+
raise ShutDown
395379
if self._thread is None:
396380
self._start_thread()
397381
self._buffer.append(obj)
@@ -400,27 +384,22 @@ def put(self, obj, block=True, timeout=None):
400384

401385
def task_done(self):
402386
with self._cond:
403-
if self._is_shutdown_immediate():
404-
raise ShutDown
405387
if not self._unfinished_tasks.acquire(False):
406388
raise ValueError('task_done() called too many times')
407389
if self._unfinished_tasks._semlock._is_zero():
408390
self._cond.notify_all()
409391

410392
def join(self):
411393
with self._cond:
412-
if self._is_shutdown_immediate():
413-
raise ShutDown
414394
if not self._unfinished_tasks._semlock._is_zero():
415395
self._cond.wait()
416-
if self._is_shutdown_immediate():
417-
raise ShutDown
418396

419-
def shutdown(self, immediate=False):
420-
with self._cond:
421-
is_alive = self._is_alive()
422-
super().shutdown(immediate)
423-
if is_alive:
397+
def _clear(self):
398+
with self._rlock:
399+
while self._poll():
400+
self._recv_bytes()
401+
self._unfinished_tasks.acquire(block=False)
402+
with self._cond:
424403
self._cond.notify_all()
425404

426405
#

0 commit comments

Comments
 (0)