Skip to content

Yvesdup queue shutdown changes #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def _format(self):
result += f' _putters[{len(self._putters)}]'
if self._unfinished_tasks:
result += f' tasks={self._unfinished_tasks}'
if self._shutdown_state is not _QueueState.alive:
result += f' shutdown={self._shutdown_state.value}'
return result

def qsize(self):
Expand Down Expand Up @@ -133,7 +135,7 @@ async def put(self, item):
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
"""
if self._shutdown_state != _QueueState.alive:
if self._shutdown_state is not _QueueState.alive:
raise QueueShutDown
while self.full():
putter = self._get_loop().create_future()
Expand All @@ -154,7 +156,7 @@ async def put(self, item):
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
raise
if self._shutdown_state != _QueueState.alive:
if self._shutdown_state is not _QueueState.alive:
raise QueueShutDown
return self.put_nowait(item)

Expand All @@ -163,7 +165,7 @@ def put_nowait(self, item):

If no free slot is immediately available, raise QueueFull.
"""
if self._shutdown_state != _QueueState.alive:
if self._shutdown_state is not _QueueState.alive:
raise QueueShutDown
if self.full():
raise QueueFull
Expand All @@ -177,10 +179,10 @@ async def get(self):

If queue is empty, wait until an item is available.
"""
if self._shutdown_state == _QueueState.shutdown_immediate:
if self._shutdown_state is _QueueState.shutdown_immediate:
raise QueueShutDown
while self.empty():
if self._shutdown_state != _QueueState.alive:
if self._shutdown_state is not _QueueState.alive:
raise QueueShutDown
getter = self._get_loop().create_future()
self._getters.append(getter)
Expand All @@ -200,7 +202,7 @@ async def get(self):
# the call. Wake up the next in line.
self._wakeup_next(self._getters)
raise
if self._shutdown_state == _QueueState.shutdown_immediate:
if self._shutdown_state is _QueueState.shutdown_immediate:
raise QueueShutDown
return self.get_nowait()

Expand All @@ -210,10 +212,10 @@ def get_nowait(self):
Return an item if one is immediately available, else raise QueueEmpty.
"""
if self.empty():
if self._shutdown_state != _QueueState.alive:
if self._shutdown_state is not _QueueState.alive:
raise QueueShutDown
raise QueueEmpty
elif self._shutdown_state == _QueueState.shutdown_immediate:
elif self._shutdown_state is _QueueState.shutdown_immediate:
raise QueueShutDown
item = self._get()
self._wakeup_next(self._putters)
Expand All @@ -233,6 +235,8 @@ def task_done(self):
Raises ValueError if called more times than there were items placed in
the queue.
"""
if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE:
raise QueueShutDown
if self._unfinished_tasks <= 0:
raise ValueError('task_done() called too many times')
self._unfinished_tasks -= 1
Expand All @@ -247,8 +251,12 @@ async def join(self):
indicate that the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
"""
if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE:
raise QueueShutDown
if self._unfinished_tasks > 0:
await self._finished.wait()
if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE:
raise QueueShutDown

def shutdown(self, immediate=False):
"""Shut-down the queue, making queue gets and puts raise.
Expand All @@ -259,20 +267,24 @@ def shutdown(self, immediate=False):
All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'. The QueueShutDown exception is raised.
"""
if self._shutdown_state is _QueueState.shutdown_immediate:
return
# here _shutdown_state is ALIVE or SHUTDOWN
if immediate:
self._shutdown_state = _QueueState.shutdown_immediate
while self._getters:
getter = self._getters.popleft()
if not getter.done():
getter.set_result(None)
else:
# Release 'blocked' tasks/coros via `.join()`
self._finished.set()
elif self._shutdown_state is _QueueState.alive: # here
self._shutdown_state = _QueueState.shutdown
while self._putters:
putter = self._putters.popleft()
if not putter.done():
putter.set_result(None)
# Release 'joined' tasks/coros
self._finished.set()


class PriorityQueue(Queue):
"""A subclass of Queue; retrieves entries in priority order (lowest first).
Expand Down
50 changes: 37 additions & 13 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self, maxsize=0, *, ctx):
self._ignore_epipe = False
self._reset()
self._shutdown_state = context._default_context.Value(
ctypes.c_uint8, lock=self._rlock
ctypes.c_uint8, _queue_alive, lock=True
)

if sys.platform != 'win32':
Expand All @@ -65,11 +65,13 @@ def __init__(self, maxsize=0, *, ctx):
def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state) = state
self._reset()

def _after_fork(self):
Expand Down Expand Up @@ -100,21 +102,19 @@ def put(self, obj, block=True, timeout=None):
raise Full

with self._notempty:
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

def get(self, block=True, timeout=None):
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
if block and timeout is None:
with self._rlock:
if self._shutdown_state.value != _queue_alive:
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
res = self._recv_bytes()
self._sem.release()
Expand All @@ -127,18 +127,18 @@ def get(self, block=True, timeout=None):
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
if self._shutdown_state.value != _queue_alive:
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
raise Empty
if self._shutdown_state.value != _queue_alive :
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
if self._shutdown_state.value == _queue_shutdown:
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)
Expand All @@ -159,6 +159,17 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def shutdown(self, immediate=True):
with self._shutdown_state.get_lock():
if self._shutdown_state.value == _queue_shutdown_immediate:
return
if immediate:
self._shutdown_state.value = _queue_shutdown_immediate
with self._notempty:
self._notempty.notify_all() # cf from @EpicWink
else:
self._shutdown_state.value = _queue_shutdown

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -340,6 +351,8 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if not self._sem.acquire(block, timeout):
raise Full

Expand All @@ -352,17 +365,28 @@ def put(self, obj, block=True, timeout=None):

def task_done(self):
with self._cond:
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if self._shutdown_state.value == _queue_shutdown_immediate:
return
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown

def shutdown(self, immediate=True):
initial_shutdown = self._shutdown_state.value
super().shutdown(immediate)
if initial_shutdown == _queue_alive:
with self._cond:
self._cond.notify_all() # here to check YD

#
# Simplified Queue type -- really just a locked pipe
Expand Down
39 changes: 22 additions & 17 deletions Lib/queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'''A multi-producer, multi-consumer queue.'''

import enum
import threading
import types
from collections import deque
Expand Down Expand Up @@ -29,9 +30,10 @@ class ShutDown(Exception):
'''Raised when put/get with shut-down queue.'''


_queue_alive = "alive"
_queue_shutdown = "shutdown"
_queue_shutdown_immediate = "shutdown-immediate"
class _QueueState(enum.Enum):
ALIVE = "alive"
SHUTDOWN = "shutdown"
SHUTDOWN_IMMEDIATE = "shutdown-immediate"


class Queue:
Expand Down Expand Up @@ -64,7 +66,7 @@ def __init__(self, maxsize=0):
self.unfinished_tasks = 0

# Queue shut-down state
self.shutdown_state = _queue_alive
self.shutdown_state = _QueueState.ALIVE

def task_done(self):
'''Indicate that a formerly enqueued task is complete.
Expand Down Expand Up @@ -99,7 +101,7 @@ def join(self):
'''
with self.all_tasks_done:
while self.unfinished_tasks:
if self.shutdown_state == _queue_shutdown_immediate:
if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE:
return
self.all_tasks_done.wait()

Expand Down Expand Up @@ -144,7 +146,7 @@ def put(self, item, block=True, timeout=None):
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
with self.not_full:
if self.maxsize > 0:
Expand All @@ -154,7 +156,7 @@ def put(self, item, block=True, timeout=None):
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
Expand All @@ -165,7 +167,7 @@ def put(self, item, block=True, timeout=None):
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
self._put(item)
self.unfinished_tasks += 1
Expand All @@ -182,35 +184,35 @@ def get(self, block=True, timeout=None):
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
if self.shutdown_state == _queue_shutdown_immediate:
if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE:
raise ShutDown
with self.not_empty:
if not block:
if not self._qsize():
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
raise Empty
elif timeout is None:
while not self._qsize():
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
self.not_empty.wait()
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self.shutdown_state != _queue_alive:
if self.shutdown_state is not _QueueState.ALIVE:
raise ShutDown
if self.shutdown_state == _queue_shutdown_immediate:
if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE:
raise ShutDown
item = self._get()
self.not_full.notify()
Expand Down Expand Up @@ -242,16 +244,19 @@ def shutdown(self, immediate=False):
and join() if 'immediate'. The ShutDown exception is raised.
'''
with self.mutex:
if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE:
return

if immediate:
self.shutdown_state = _queue_shutdown_immediate
self.shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE
self.not_empty.notify_all()
# set self.unfinished_tasks to 0
# to break the loop in 'self.join()'
# when quits from `wait()`
self.unfinished_tasks = 0
self.all_tasks_done.notify_all()
else:
self.shutdown_state = _queue_shutdown
self.shutdown_state = _QueueState.SHUTDOWN
self.not_full.notify_all()

# Override these methods to implement other queue organizations
Expand Down
Loading