From bc7a64068b43ec75b2372f851f34cadd0d212a76 Mon Sep 17 00:00:00 2001 From: Matt Page Date: Tue, 16 Jan 2024 16:09:36 -0800 Subject: [PATCH 01/10] Use ParkingLot to manage waiting threads --- Modules/_queuemodule.c | 179 ++++++++++++++++++++++++----------------- 1 file changed, 103 insertions(+), 76 deletions(-) diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index 8fca3cdd0deb18..53aa5efbedcfd7 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -3,8 +3,9 @@ #endif #include "Python.h" -#include "pycore_ceval.h" // _PyEval_MakePendingCalls() +#include "pycore_ceval.h" // Py_MakePendingCalls() #include "pycore_moduleobject.h" // _PyModule_GetState() +#include "pycore_parking_lot.h" #include "pycore_time.h" // _PyTime_t #include @@ -151,7 +152,9 @@ RingBuf_Get(RingBuf *buf) return item; } -// Returns 0 on success or -1 if the buffer failed to grow +// Returns 0 on success or -1 if the buffer failed to grow. +// +// Steals a reference to item. static int RingBuf_Put(RingBuf *buf, PyObject *item) { @@ -164,7 +167,7 @@ RingBuf_Put(RingBuf *buf, PyObject *item) return -1; } } - buf->items[buf->put_idx] = Py_NewRef(item); + buf->items[buf->put_idx] = item; buf->put_idx = (buf->put_idx + 1) % buf->items_cap; buf->num_items++; return 0; @@ -184,9 +187,13 @@ RingBuf_IsEmpty(RingBuf *buf) typedef struct { PyObject_HEAD - PyThread_type_lock lock; - int locked; + + // Are there threads waiting for items + uint8_t threads_waiting; + + // Items in the queue RingBuf buf; + PyObject *weakreflist; } simplequeueobject; @@ -209,12 +216,6 @@ simplequeue_dealloc(simplequeueobject *self) PyTypeObject *tp = Py_TYPE(self); PyObject_GC_UnTrack(self); - if (self->lock != NULL) { - /* Unlock the lock so it's safe to free it */ - if (self->locked > 0) - PyThread_release_lock(self->lock); - PyThread_free_lock(self->lock); - } (void)simplequeue_clear(self); if (self->weakreflist != NULL) PyObject_ClearWeakRefs((PyObject *) self); @@ -249,12 +250,6 @@ simplequeue_new_impl(PyTypeObject *type) self = (simplequeueobject *) type->tp_alloc(type, 0); if (self != NULL) { self->weakreflist = NULL; - self->lock = PyThread_allocate_lock(); - if (self->lock == NULL) { - Py_DECREF(self); - PyErr_SetString(PyExc_MemoryError, "can't allocate lock"); - return NULL; - } if (RingBuf_Init(&self->buf) < 0) { Py_DECREF(self); return NULL; @@ -263,6 +258,26 @@ simplequeue_new_impl(PyTypeObject *type) return (PyObject *) self; } +typedef struct { + int handed_off; + simplequeueobject *queue; + PyObject *item; +} HandoffData; + +static void +maybe_unparked_thread(HandoffData *data, PyObject **item, int has_more_waiters) +{ + if (item == NULL) { + // Didn't unpark a thread + data->handed_off = 0; + } + else { + // Successfully unparked a thread + *item = data->item; + data->handed_off = 1; + } + data->queue->threads_waiting = has_more_waiters; +} /*[clinic input] _queue.SimpleQueue.put @@ -282,15 +297,21 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, int block, PyObject *timeout) /*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/ { - /* BEGIN GIL-protected critical section */ - if (RingBuf_Put(&self->buf, item) < 0) - return NULL; - if (self->locked) { - /* A get() may be waiting, wake it up */ - self->locked = 0; - PyThread_release_lock(self->lock); + HandoffData data = { + .handed_off = 0, + .item = Py_NewRef(item), + .queue = self, + }; + if (self->threads_waiting) { + // Try to hand the item off directly if there are threads waiting + _PyParkingLot_Unpark(&self->threads_waiting, + (_Py_unpark_fn_t *)maybe_unparked_thread, &data); + } + if (!data.handed_off) { + if (RingBuf_Put(&self->buf, item) < 0) { + return NULL; + } } - /* END GIL-protected critical section */ Py_RETURN_NONE; } @@ -312,6 +333,15 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item) return _queue_SimpleQueue_put_impl(self, item, 0, Py_None); } +static PyObject * +empty_error(PyTypeObject *cls) +{ + PyObject *module = PyType_GetModule(cls); + simplequeue_state *state = simplequeue_get_state(module); + PyErr_SetNone(state->EmptyError); + return NULL; +} + /*[clinic input] _queue.SimpleQueue.get @@ -338,20 +368,12 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, /*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/ { _PyTime_t endtime = 0; - _PyTime_t timeout; - PyObject *item; - PyLockStatus r; - PY_TIMEOUT_T microseconds; - PyThreadState *tstate = PyThreadState_Get(); // XXX Use PyThread_ParseTimeoutArg(). - if (block == 0) { - /* Non-blocking */ - microseconds = 0; - } - else if (timeout_obj != Py_None) { + if (block != 0 && timeout_obj != Py_None) { /* With timeout */ + _PyTime_t timeout; if (_PyTime_FromSecondsObject(&timeout, timeout_obj, _PyTime_ROUND_CEILING) < 0) { return NULL; @@ -361,8 +383,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, "'timeout' must be a non-negative number"); return NULL; } - microseconds = _PyTime_AsMicroseconds(timeout, - _PyTime_ROUND_CEILING); + PY_TIMEOUT_T microseconds = + _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING); if (microseconds > PY_TIMEOUT_MAX) { PyErr_SetString(PyExc_OverflowError, "timeout value is too large"); @@ -370,53 +392,58 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, } endtime = _PyDeadline_Init(timeout); } - else { - /* Infinitely blocking */ - microseconds = -1; - } - /* put() signals the queue to be non-empty by releasing the lock. - * So we simply try to acquire the lock in a loop, until the condition - * (queue non-empty) becomes true. - */ - while (RingBuf_IsEmpty(&self->buf)) { - /* First a simple non-blocking try without releasing the GIL */ - r = PyThread_acquire_lock_timed(self->lock, 0, 0); - if (r == PY_LOCK_FAILURE && microseconds != 0) { - Py_BEGIN_ALLOW_THREADS - r = PyThread_acquire_lock_timed(self->lock, microseconds, 1); - Py_END_ALLOW_THREADS + for (;;) { + if (!RingBuf_IsEmpty(&self->buf)) { + return RingBuf_Get(&self->buf); } - if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) { - return NULL; - } - if (r == PY_LOCK_FAILURE) { - PyObject *module = PyType_GetModule(cls); - simplequeue_state *state = simplequeue_get_state(module); - /* Timed out */ - PyErr_SetNone(state->EmptyError); - return NULL; + if (!block) { + return empty_error(cls); } - self->locked = 1; - /* Adjust timeout for next iteration (if any) */ - if (microseconds > 0) { - timeout = _PyDeadline_Get(endtime); - microseconds = _PyTime_AsMicroseconds(timeout, - _PyTime_ROUND_CEILING); + int64_t timeout_ns = -1; + if (endtime != 0) { + timeout_ns = _PyDeadline_Get(endtime); + if (timeout_ns < 0) { + return empty_error(cls); + } } - } - /* BEGIN GIL-protected critical section */ - item = RingBuf_Get(&self->buf); - if (self->locked) { - PyThread_release_lock(self->lock); - self->locked = 0; + uint8_t waiting = 1; + self->threads_waiting = waiting; + + PyObject *item = NULL; + int st = _PyParkingLot_Park(&self->threads_waiting, &waiting, + sizeof(uint8_t), timeout_ns, &item, + /* detach */ 1); + switch (st) { + case Py_PARK_OK: { + assert(item != NULL); + return item; + } + case Py_PARK_TIMEOUT: { + return empty_error(cls); + } + case Py_PARK_INTR: { + // Interrupted + if (Py_MakePendingCalls() < 0) { + return NULL; + } + break; + } + case Py_PARK_AGAIN: { + // This should be impossible with the current implementation of + // PyParkingLot, but would be possible if critical sections / + // the GIL were released before the thread was added to the + // internal thread queue in the parking lot. + break; + } + default: { + Py_UNREACHABLE(); + } + } } - /* END GIL-protected critical section */ - - return item; } /*[clinic input] From 65c0b9099eb1bdda4c84b4e27492e2628aa00107 Mon Sep 17 00:00:00 2001 From: Matt Page Date: Tue, 16 Jan 2024 16:39:20 -0800 Subject: [PATCH 02/10] Make SimpleQueue thread-safe with the GIL disabled Methods on SimpleQueue are protected with the per-object lock. --- Modules/_queuemodule.c | 18 ++++++++++++------ Modules/clinic/_queuemodule.c.h | 24 +++++++++++++++++++++--- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index 53aa5efbedcfd7..6d496581dc1385 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -280,6 +280,7 @@ maybe_unparked_thread(HandoffData *data, PyObject **item, int has_more_waiters) } /*[clinic input] +@critical_section _queue.SimpleQueue.put item: object block: bool = True @@ -295,7 +296,7 @@ never blocks. They are provided for compatibility with the Queue class. static PyObject * _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, int block, PyObject *timeout) -/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/ +/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/ { HandoffData data = { .handed_off = 0, @@ -316,6 +317,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, } /*[clinic input] +@critical_section _queue.SimpleQueue.put_nowait item: object @@ -328,7 +330,7 @@ for compatibility with the Queue class. static PyObject * _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item) -/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/ +/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/ { return _queue_SimpleQueue_put_impl(self, item, 0, Py_None); } @@ -343,6 +345,7 @@ empty_error(PyTypeObject *cls) } /*[clinic input] +@critical_section _queue.SimpleQueue.get cls: defining_class @@ -365,7 +368,7 @@ in that case). static PyObject * _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, int block, PyObject *timeout_obj) -/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/ +/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/ { _PyTime_t endtime = 0; @@ -447,6 +450,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, } /*[clinic input] +@critical_section _queue.SimpleQueue.get_nowait cls: defining_class @@ -461,12 +465,13 @@ raise the Empty exception. static PyObject * _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self, PyTypeObject *cls) -/*[clinic end generated code: output=620c58e2750f8b8a input=842f732bf04216d3]*/ +/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/ { return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None); } /*[clinic input] +@critical_section _queue.SimpleQueue.empty -> bool Return True if the queue is empty, False otherwise (not reliable!). @@ -474,12 +479,13 @@ Return True if the queue is empty, False otherwise (not reliable!). static int _queue_SimpleQueue_empty_impl(simplequeueobject *self) -/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/ +/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/ { return RingBuf_IsEmpty(&self->buf); } /*[clinic input] +@critical_section _queue.SimpleQueue.qsize -> Py_ssize_t Return the approximate size of the queue (not reliable!). @@ -487,7 +493,7 @@ Return the approximate size of the queue (not reliable!). static Py_ssize_t _queue_SimpleQueue_qsize_impl(simplequeueobject *self) -/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/ +/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/ { return RingBuf_Len(&self->buf); } diff --git a/Modules/clinic/_queuemodule.c.h b/Modules/clinic/_queuemodule.c.h index 8e2a430835e35f..b3b6b8e96c135e 100644 --- a/Modules/clinic/_queuemodule.c.h +++ b/Modules/clinic/_queuemodule.c.h @@ -6,6 +6,7 @@ preserve # include "pycore_gc.h" // PyGC_Head # include "pycore_runtime.h" // _Py_ID() #endif +#include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION() #include "pycore_modsupport.h" // _PyArg_NoKeywords() PyDoc_STRVAR(simplequeue_new__doc__, @@ -107,7 +108,9 @@ _queue_SimpleQueue_put(simplequeueobject *self, PyObject *const *args, Py_ssize_ } timeout = args[2]; skip_optional_pos: + Py_BEGIN_CRITICAL_SECTION(self); return_value = _queue_SimpleQueue_put_impl(self, item, block, timeout); + Py_END_CRITICAL_SECTION(); exit: return return_value; @@ -165,7 +168,9 @@ _queue_SimpleQueue_put_nowait(simplequeueobject *self, PyObject *const *args, Py goto exit; } item = args[0]; + Py_BEGIN_CRITICAL_SECTION(self); return_value = _queue_SimpleQueue_put_nowait_impl(self, item); + Py_END_CRITICAL_SECTION(); exit: return return_value; @@ -244,7 +249,9 @@ _queue_SimpleQueue_get(simplequeueobject *self, PyTypeObject *cls, PyObject *con } timeout_obj = args[1]; skip_optional_pos: + Py_BEGIN_CRITICAL_SECTION(self); return_value = _queue_SimpleQueue_get_impl(self, cls, block, timeout_obj); + Py_END_CRITICAL_SECTION(); exit: return return_value; @@ -269,11 +276,18 @@ _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self, static PyObject * _queue_SimpleQueue_get_nowait(simplequeueobject *self, PyTypeObject *cls, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) { + PyObject *return_value = NULL; + if (nargs) { PyErr_SetString(PyExc_TypeError, "get_nowait() takes no arguments"); - return NULL; + goto exit; } - return _queue_SimpleQueue_get_nowait_impl(self, cls); + Py_BEGIN_CRITICAL_SECTION(self); + return_value = _queue_SimpleQueue_get_nowait_impl(self, cls); + Py_END_CRITICAL_SECTION(); + +exit: + return return_value; } PyDoc_STRVAR(_queue_SimpleQueue_empty__doc__, @@ -294,7 +308,9 @@ _queue_SimpleQueue_empty(simplequeueobject *self, PyObject *Py_UNUSED(ignored)) PyObject *return_value = NULL; int _return_value; + Py_BEGIN_CRITICAL_SECTION(self); _return_value = _queue_SimpleQueue_empty_impl(self); + Py_END_CRITICAL_SECTION(); if ((_return_value == -1) && PyErr_Occurred()) { goto exit; } @@ -322,7 +338,9 @@ _queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored)) PyObject *return_value = NULL; Py_ssize_t _return_value; + Py_BEGIN_CRITICAL_SECTION(self); _return_value = _queue_SimpleQueue_qsize_impl(self); + Py_END_CRITICAL_SECTION(); if ((_return_value == -1) && PyErr_Occurred()) { goto exit; } @@ -331,4 +349,4 @@ _queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored)) exit: return return_value; } -/*[clinic end generated code: output=457310b20cb61cf8 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=242950edc8f7dfd7 input=a9049054013a1b77]*/ From d999dd627137688563ac2fbaf3bc61b37a9f1331 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Wed, 17 Jan 2024 00:52:58 +0000 Subject: [PATCH 03/10] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst new file mode 100644 index 00000000000000..d59fd8d6747505 --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst @@ -0,0 +1 @@ +Make ``queue.SimpleQueue`` thread safe when the GIL is disabled. Existing semantics and behavior should be preserved, but the implementation is changed to ensure thread-safety when the GIL is disabled. From 511f59f6cd7f6678dee7b48f3002c52a0c27d2e1 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Wed, 17 Jan 2024 09:49:40 +0100 Subject: [PATCH 04/10] Update Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst --- .../2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst index d59fd8d6747505..a45c7d5bbb8b72 100644 --- a/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst +++ b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst @@ -1 +1 @@ -Make ``queue.SimpleQueue`` thread safe when the GIL is disabled. Existing semantics and behavior should be preserved, but the implementation is changed to ensure thread-safety when the GIL is disabled. +Make :class:`queue.SimpleQueue` thread safe when the GIL is disabled. Existing semantics and behavior should be preserved, but the implementation is changed to ensure thread-safety when the GIL is disabled. From ad5217acce56d28cdf9ae63c7becad597cbbce31 Mon Sep 17 00:00:00 2001 From: Matt Page Date: Wed, 17 Jan 2024 16:22:20 -0800 Subject: [PATCH 05/10] Remove unnecessary overflow check We're not using `PyThread_acquire_lock_timed`. --- Modules/_queuemodule.c | 7 ------- 1 file changed, 7 deletions(-) diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index 6d496581dc1385..76f7e042decc83 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -386,13 +386,6 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, "'timeout' must be a non-negative number"); return NULL; } - PY_TIMEOUT_T microseconds = - _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING); - if (microseconds > PY_TIMEOUT_MAX) { - PyErr_SetString(PyExc_OverflowError, - "timeout value is too large"); - return NULL; - } endtime = _PyDeadline_Init(timeout); } From d3b55477a3380d4b4ae0d65318c9cac744534366 Mon Sep 17 00:00:00 2001 From: Matt Page Date: Wed, 17 Jan 2024 16:26:16 -0800 Subject: [PATCH 06/10] Use a clearer name for field indicating whether or not threads are waiting --- Modules/_queuemodule.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index 76f7e042decc83..f866927e20e2bb 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -189,7 +189,7 @@ typedef struct { PyObject_HEAD // Are there threads waiting for items - uint8_t threads_waiting; + bool has_threads_waiting; // Items in the queue RingBuf buf; @@ -276,7 +276,7 @@ maybe_unparked_thread(HandoffData *data, PyObject **item, int has_more_waiters) *item = data->item; data->handed_off = 1; } - data->queue->threads_waiting = has_more_waiters; + data->queue->has_threads_waiting = has_more_waiters; } /*[clinic input] @@ -303,9 +303,9 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, .item = Py_NewRef(item), .queue = self, }; - if (self->threads_waiting) { + if (self->has_threads_waiting) { // Try to hand the item off directly if there are threads waiting - _PyParkingLot_Unpark(&self->threads_waiting, + _PyParkingLot_Unpark(&self->has_threads_waiting, (_Py_unpark_fn_t *)maybe_unparked_thread, &data); } if (!data.handed_off) { @@ -407,11 +407,11 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, } uint8_t waiting = 1; - self->threads_waiting = waiting; + self->has_threads_waiting = waiting; PyObject *item = NULL; - int st = _PyParkingLot_Park(&self->threads_waiting, &waiting, - sizeof(uint8_t), timeout_ns, &item, + int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting, + sizeof(bool), timeout_ns, &item, /* detach */ 1); switch (st) { case Py_PARK_OK: { From 6fef770cd5cb207cf24c4228a278bb417e19c364 Mon Sep 17 00:00:00 2001 From: Matt Page Date: Fri, 19 Jan 2024 15:56:38 -0800 Subject: [PATCH 07/10] Fix type --- Modules/_queuemodule.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index f866927e20e2bb..b0d1fca59c715d 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -406,7 +406,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, } } - uint8_t waiting = 1; + bool waiting = 1; self->has_threads_waiting = waiting; PyObject *item = NULL; From 98dfcf66b4990f95ac6161f2cc4bacc30377a77b Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Tue, 23 Jan 2024 11:01:16 +0100 Subject: [PATCH 08/10] Apply suggestions from code review --- .../2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst | 2 +- Modules/_queuemodule.c | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst index a45c7d5bbb8b72..6a39fd2f60ab81 100644 --- a/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst +++ b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst @@ -1 +1 @@ -Make :class:`queue.SimpleQueue` thread safe when the GIL is disabled. Existing semantics and behavior should be preserved, but the implementation is changed to ensure thread-safety when the GIL is disabled. +Make :class:`queue.SimpleQueue` thread safe when the GIL is disabled. diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index b0d1fca59c715d..bd126f0ce27c3b 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -258,6 +258,7 @@ simplequeue_new_impl(PyTypeObject *type) return (PyObject *) self; } + typedef struct { int handed_off; simplequeueobject *queue; @@ -339,6 +340,7 @@ static PyObject * empty_error(PyTypeObject *cls) { PyObject *module = PyType_GetModule(cls); + assert(module != NULL); simplequeue_state *state = simplequeue_get_state(module); PyErr_SetNone(state->EmptyError); return NULL; @@ -374,7 +376,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, // XXX Use PyThread_ParseTimeoutArg(). - if (block != 0 && timeout_obj != Py_None) { + if (block != 0 && !Py_IsNone) { /* With timeout */ _PyTime_t timeout; if (_PyTime_FromSecondsObject(&timeout, From 60fd0ec54624a8642073285de1b37e62f2c0455d Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Tue, 23 Jan 2024 12:05:34 +0100 Subject: [PATCH 09/10] Fix suggestion --- Modules/_queuemodule.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index bd126f0ce27c3b..022c72e82dfd52 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -376,7 +376,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, // XXX Use PyThread_ParseTimeoutArg(). - if (block != 0 && !Py_IsNone) { + if (block != 0 && !Py_IsNone(timeout_obj)) { /* With timeout */ _PyTime_t timeout; if (_PyTime_FromSecondsObject(&timeout, From 80200f8337e77b1778e87c44eaffea6eb2517e41 Mon Sep 17 00:00:00 2001 From: Matt Page Date: Tue, 23 Jan 2024 10:12:00 -0800 Subject: [PATCH 10/10] Address review comments Rename parking lot callback to better reflect what it does. More precise type for whether handoff occurred. --- Modules/_queuemodule.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index 022c72e82dfd52..18b24855c52ad6 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -260,22 +260,22 @@ simplequeue_new_impl(PyTypeObject *type) } typedef struct { - int handed_off; + bool handed_off; simplequeueobject *queue; PyObject *item; } HandoffData; static void -maybe_unparked_thread(HandoffData *data, PyObject **item, int has_more_waiters) +maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters) { if (item == NULL) { - // Didn't unpark a thread - data->handed_off = 0; + // No threads were waiting + data->handed_off = false; } else { - // Successfully unparked a thread + // There was at least one waiting thread, hand off the item *item = data->item; - data->handed_off = 1; + data->handed_off = true; } data->queue->has_threads_waiting = has_more_waiters; } @@ -307,7 +307,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, if (self->has_threads_waiting) { // Try to hand the item off directly if there are threads waiting _PyParkingLot_Unpark(&self->has_threads_waiting, - (_Py_unpark_fn_t *)maybe_unparked_thread, &data); + (_Py_unpark_fn_t *)maybe_handoff_item, &data); } if (!data.handed_off) { if (RingBuf_Put(&self->buf, item) < 0) {