Skip to content

Commit 16e9c09

Browse files
committed
Add threading queue shutdown
* Include docs
1 parent f508800 commit 16e9c09

File tree

7 files changed

+296
-2
lines changed

7 files changed

+296
-2
lines changed

Doc/library/queue.rst

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions:
9393
on a :class:`Queue` object which is full.
9494

9595

96+
.. exception:: ShutDown
97+
98+
Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on
99+
a :class:`Queue` object which has been shut down.
100+
101+
.. versionadded:: 3.12
102+
103+
96104
.. _queueobjects:
97105

98106
Queue Objects
@@ -135,6 +143,8 @@ provide the public methods described below.
135143
immediately available, else raise the :exc:`Full` exception (*timeout* is
136144
ignored in that case).
137145

146+
Raises :exc:`ShutDown` if the queue has been shut down.
147+
138148

139149
.. method:: Queue.put_nowait(item)
140150

@@ -155,6 +165,9 @@ provide the public methods described below.
155165
an uninterruptible wait on an underlying lock. This means that no exceptions
156166
can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`.
157167

168+
Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if
169+
the queue has been shut down immediately.
170+
158171

159172
.. method:: Queue.get_nowait()
160173

@@ -177,6 +190,8 @@ fully processed by daemon consumer threads.
177190
Raises a :exc:`ValueError` if called more times than there were items placed in
178191
the queue.
179192

193+
Raises :exc:`ShutDown` if the queue has been shut down immediately.
194+
180195

181196
.. method:: Queue.join()
182197

@@ -187,6 +202,8 @@ fully processed by daemon consumer threads.
187202
indicate that the item was retrieved and all work on it is complete. When the
188203
count of unfinished tasks drops to zero, :meth:`join` unblocks.
189204

205+
Raises :exc:`ShutDown` if the queue has been shut down immediately.
206+
190207

191208
Example of how to wait for enqueued tasks to be completed::
192209

@@ -214,6 +231,25 @@ Example of how to wait for enqueued tasks to be completed::
214231
print('All work completed')
215232

216233

234+
Terminating queues
235+
^^^^^^^^^^^^^^^^^^
236+
237+
:class:`Queue` objects can be made to prevent further interaction by shutting
238+
them down.
239+
240+
.. method:: Queue.shutdown(immediate=False)
241+
242+
Shut-down the queue, making queue gets and puts raise :exc:`ShutDown`.
243+
244+
By default, gets will only raise once the queue is empty. Set
245+
*immediate* to true to make gets raise immediately instead.
246+
247+
All blocked callers of put() will be unblocked, and also get()
248+
and join() if *immediate* is true.
249+
250+
.. versionadded:: 3.12
251+
252+
217253
SimpleQueue Objects
218254
-------------------
219255

Lib/asyncio/queues.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
1+
__all__ = (
2+
'Queue',
3+
'PriorityQueue',
4+
'LifoQueue',
5+
'QueueFull',
6+
'QueueEmpty',
7+
'QueueShutDown',
8+
)
29

310
import collections
411
import heapq
@@ -18,6 +25,16 @@ class QueueFull(Exception):
1825
pass
1926

2027

28+
class QueueShutDown(Exception):
29+
"""Raised when putting on to or getting from a shut-down Queue."""
30+
pass
31+
32+
33+
_queue_alive = "alive"
34+
_queue_shutdown = "shutdown"
35+
_queue_shutdown_immediate = "shutdown-immediate"
36+
37+
2138
class Queue(mixins._LoopBoundMixin):
2239
"""A queue, useful for coordinating producer and consumer coroutines.
2340
@@ -41,6 +58,7 @@ def __init__(self, maxsize=0):
4158
self._finished = locks.Event()
4259
self._finished.set()
4360
self._init(maxsize)
61+
self.shutdown_state = _queue_alive
4462

4563
# These three are overridable in subclasses.
4664

@@ -113,6 +131,8 @@ async def put(self, item):
113131
Put an item into the queue. If the queue is full, wait until a free
114132
slot is available before adding item.
115133
"""
134+
if self.shutdown_state != _queue_alive:
135+
raise QueueShutDown
116136
while self.full():
117137
putter = self._get_loop().create_future()
118138
self._putters.append(putter)
@@ -132,13 +152,17 @@ async def put(self, item):
132152
# the call. Wake up the next in line.
133153
self._wakeup_next(self._putters)
134154
raise
155+
if self.shutdown_state != _queue_alive:
156+
raise QueueShutDown
135157
return self.put_nowait(item)
136158

137159
def put_nowait(self, item):
138160
"""Put an item into the queue without blocking.
139161
140162
If no free slot is immediately available, raise QueueFull.
141163
"""
164+
if self.shutdown_state != _queue_alive:
165+
raise QueueShutDown
142166
if self.full():
143167
raise QueueFull
144168
self._put(item)
@@ -151,7 +175,11 @@ async def get(self):
151175
152176
If queue is empty, wait until an item is available.
153177
"""
178+
if self.shutdown_state == _queue_shutdown_immediate:
179+
raise QueueShutDown
154180
while self.empty():
181+
if self.shutdown_state != _queue_alive:
182+
raise QueueShutDown
155183
getter = self._get_loop().create_future()
156184
self._getters.append(getter)
157185
try:
@@ -170,6 +198,8 @@ async def get(self):
170198
# the call. Wake up the next in line.
171199
self._wakeup_next(self._getters)
172200
raise
201+
if self.shutdown_state == _queue_shutdown_immediate:
202+
raise QueueShutDown
173203
return self.get_nowait()
174204

175205
def get_nowait(self):
@@ -178,7 +208,11 @@ def get_nowait(self):
178208
Return an item if one is immediately available, else raise QueueEmpty.
179209
"""
180210
if self.empty():
211+
if self.shutdown_state != _queue_alive:
212+
raise QueueShutDown
181213
raise QueueEmpty
214+
elif self.shutdown_state == _queue_shutdown_immediate:
215+
raise QueueShutDown
182216
item = self._get()
183217
self._wakeup_next(self._putters)
184218
return item
@@ -214,6 +248,27 @@ async def join(self):
214248
if self._unfinished_tasks > 0:
215249
await self._finished.wait()
216250

251+
def shutdown(self, immediate=False):
252+
"""Shut-down the queue, making queue gets and puts raise.
253+
254+
By default, gets will only raise once the queue is empty. Set
255+
'immediate' to True to make gets raise immediately instead.
256+
257+
All blocked callers of put() will be unblocked, and also get()
258+
and join() if 'immediate'. The QueueShutDown exception is raised.
259+
"""
260+
if immediate:
261+
self.shutdown_state = _queue_shutdown_immediate
262+
while self._getters:
263+
getter = self._getters.popleft()
264+
if not getter.done():
265+
getter.set_result(None)
266+
else:
267+
self.shutdown_state = _queue_shutdown
268+
while self._putters:
269+
putter = self._putters.popleft()
270+
if not putter.done():
271+
putter.set_result(None)
217272

218273
class PriorityQueue(Queue):
219274
"""A subclass of Queue; retrieves entries in priority order (lowest first).

Lib/multiprocessing/queues.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
import types
1818
import weakref
1919
import errno
20+
import ctypes
2021

21-
from queue import Empty, Full
22+
from queue import Empty, Full, ShutDown
2223

2324
import _multiprocessing
2425

@@ -28,6 +29,10 @@
2829

2930
from .util import debug, info, Finalize, register_after_fork, is_exiting
3031

32+
_queue_alive = 0
33+
_queue_shutdown = 1
34+
_queue_shutdown_immediate = 2
35+
3136
#
3237
# Queue type using a pipe, buffer and thread
3338
#
@@ -50,6 +55,9 @@ def __init__(self, maxsize=0, *, ctx):
5055
# For use by concurrent.futures
5156
self._ignore_epipe = False
5257
self._reset()
58+
self._shutdown_state = context._default_context.Value(
59+
ctypes.c_uint8, lock=self._rlock
60+
)
5361

5462
if sys.platform != 'win32':
5563
register_after_fork(self, Queue._after_fork)
@@ -86,20 +94,28 @@ def _reset(self, after_fork=False):
8694
def put(self, obj, block=True, timeout=None):
8795
if self._closed:
8896
raise ValueError(f"Queue {self!r} is closed")
97+
if self._shutdown_state.value != _queue_alive:
98+
raise ShutDown
8999
if not self._sem.acquire(block, timeout):
90100
raise Full
91101

92102
with self._notempty:
103+
if self._shutdown_state.value != _queue_alive:
104+
raise ShutDown
93105
if self._thread is None:
94106
self._start_thread()
95107
self._buffer.append(obj)
96108
self._notempty.notify()
97109

98110
def get(self, block=True, timeout=None):
111+
if self._shutdown_state.value == _queue_shutdown_immediate:
112+
raise ShutDown
99113
if self._closed:
100114
raise ValueError(f"Queue {self!r} is closed")
101115
if block and timeout is None:
102116
with self._rlock:
117+
if self._shutdown_state.value != _queue_alive:
118+
raise ShutDown
103119
res = self._recv_bytes()
104120
self._sem.release()
105121
else:
@@ -111,13 +127,19 @@ def get(self, block=True, timeout=None):
111127
if block:
112128
timeout = deadline - time.monotonic()
113129
if not self._poll(timeout):
130+
if self._shutdown_state.value != _queue_alive:
131+
raise ShutDown
114132
raise Empty
133+
if self._shutdown_state.value != _queue_alive :
134+
raise ShutDown
115135
elif not self._poll():
116136
raise Empty
117137
res = self._recv_bytes()
118138
self._sem.release()
119139
finally:
120140
self._rlock.release()
141+
if self._shutdown_state.value == _queue_shutdown:
142+
raise ShutDown
121143
# unserialize the data after having released the lock
122144
return _ForkingPickler.loads(res)
123145

@@ -329,6 +351,8 @@ def task_done(self):
329351

330352
def join(self):
331353
with self._cond:
354+
if self._shutdown_state.value == _queue_shutdown_immediate:
355+
return
332356
if not self._unfinished_tasks._semlock._is_zero():
333357
self._cond.wait()
334358

0 commit comments

Comments
 (0)