Branch data Line data Source code
1 : :
2 : : /* interpreters module */
3 : : /* low-level access to interpreter primitives */
4 : : #ifndef Py_BUILD_CORE_BUILTIN
5 : : # define Py_BUILD_CORE_MODULE 1
6 : : #endif
7 : :
8 : : #include "Python.h"
9 : : #include "pycore_pystate.h" // _PyThreadState_GET()
10 : : #include "pycore_interpreteridobject.h"
11 : :
12 : :
13 : : #define MODULE_NAME "_xxinterpchannels"
14 : :
15 : :
16 : : static PyInterpreterState *
17 : 0 : _get_current_interp(void)
18 : : {
19 : : // PyInterpreterState_Get() aborts if lookup fails, so don't need
20 : : // to check the result for NULL.
21 : 0 : return PyInterpreterState_Get();
22 : : }
23 : :
24 : : static PyObject *
25 : 0 : _get_current_module(void)
26 : : {
27 : 0 : PyObject *name = PyUnicode_FromString(MODULE_NAME);
28 [ # # ]: 0 : if (name == NULL) {
29 : 0 : return NULL;
30 : : }
31 : 0 : PyObject *mod = PyImport_GetModule(name);
32 : 0 : Py_DECREF(name);
33 [ # # ]: 0 : if (mod == NULL) {
34 : 0 : return NULL;
35 : : }
36 : : assert(mod != Py_None);
37 : 0 : return mod;
38 : : }
39 : :
40 : : static PyObject *
41 : 0 : get_module_from_owned_type(PyTypeObject *cls)
42 : : {
43 : : assert(cls != NULL);
44 : 0 : return _get_current_module();
45 : : // XXX Use the more efficient API now that we use heap types:
46 : : //return PyType_GetModule(cls);
47 : : }
48 : :
49 : : static struct PyModuleDef moduledef;
50 : :
51 : : static PyObject *
52 : 0 : get_module_from_type(PyTypeObject *cls)
53 : : {
54 : : assert(cls != NULL);
55 : 0 : return _get_current_module();
56 : : // XXX Use the more efficient API now that we use heap types:
57 : : //return PyType_GetModuleByDef(cls, &moduledef);
58 : : }
59 : :
60 : : static PyObject *
61 : 5 : add_new_exception(PyObject *mod, const char *name, PyObject *base)
62 : : {
63 : : assert(!PyObject_HasAttrString(mod, name));
64 : 5 : PyObject *exctype = PyErr_NewException(name, base, NULL);
65 [ - + ]: 5 : if (exctype == NULL) {
66 : 0 : return NULL;
67 : : }
68 : 5 : int res = PyModule_AddType(mod, (PyTypeObject *)exctype);
69 [ - + ]: 5 : if (res < 0) {
70 : 0 : Py_DECREF(exctype);
71 : 0 : return NULL;
72 : : }
73 : 5 : return exctype;
74 : : }
75 : :
76 : : #define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
77 : : add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
78 : :
79 : : static PyTypeObject *
80 : 1 : add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
81 : : {
82 : 1 : PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
83 : : NULL, mod, spec, NULL);
84 [ - + ]: 1 : if (cls == NULL) {
85 : 0 : return NULL;
86 : : }
87 [ - + ]: 1 : if (PyModule_AddType(mod, cls) < 0) {
88 : 0 : Py_DECREF(cls);
89 : 0 : return NULL;
90 : : }
91 [ + - ]: 1 : if (shared != NULL) {
92 [ - + ]: 1 : if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
93 : 0 : Py_DECREF(cls);
94 : 0 : return NULL;
95 : : }
96 : : }
97 : 1 : return cls;
98 : : }
99 : :
100 : : static int
101 : 0 : _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
102 : : {
103 : : PyObject *exc;
104 [ # # ]: 0 : if (ignoreexc) {
105 : 0 : exc = PyErr_GetRaisedException();
106 : : }
107 : 0 : int res = _PyCrossInterpreterData_Release(data);
108 [ # # ]: 0 : if (res < 0) {
109 : : // XXX Fix this!
110 : : /* The owning interpreter is already destroyed.
111 : : * Ideally, this shouldn't ever happen. When an interpreter is
112 : : * about to be destroyed, we should clear out all of its objects
113 : : * from every channel associated with that interpreter.
114 : : * For now we hack around that to resolve refleaks, by decref'ing
115 : : * the released object here, even if its the wrong interpreter.
116 : : * The owning interpreter has already been destroyed
117 : : * so we should be okay, especially since the currently
118 : : * shareable types are all very basic, with no GC.
119 : : * That said, it becomes much messier once interpreters
120 : : * no longer share a GIL, so this needs to be fixed before then. */
121 : 0 : _PyCrossInterpreterData_Clear(NULL, data);
122 [ # # ]: 0 : if (ignoreexc) {
123 : : // XXX Emit a warning?
124 : 0 : PyErr_Clear();
125 : : }
126 : : }
127 [ # # ]: 0 : if (ignoreexc) {
128 : 0 : PyErr_SetRaisedException(exc);
129 : : }
130 : 0 : return res;
131 : : }
132 : :
133 : :
134 : : /* module state *************************************************************/
135 : :
136 : : typedef struct {
137 : : /* heap types */
138 : : PyTypeObject *ChannelIDType;
139 : :
140 : : /* exceptions */
141 : : PyObject *ChannelError;
142 : : PyObject *ChannelNotFoundError;
143 : : PyObject *ChannelClosedError;
144 : : PyObject *ChannelEmptyError;
145 : : PyObject *ChannelNotEmptyError;
146 : : } module_state;
147 : :
148 : : static inline module_state *
149 : 10 : get_module_state(PyObject *mod)
150 : : {
151 : : assert(mod != NULL);
152 : 10 : module_state *state = PyModule_GetState(mod);
153 : : assert(state != NULL);
154 : 10 : return state;
155 : : }
156 : :
157 : : static int
158 : 6 : traverse_module_state(module_state *state, visitproc visit, void *arg)
159 : : {
160 : : /* heap types */
161 [ + - - + ]: 6 : Py_VISIT(state->ChannelIDType);
162 : :
163 : : /* exceptions */
164 [ + - - + ]: 6 : Py_VISIT(state->ChannelError);
165 [ + - - + ]: 6 : Py_VISIT(state->ChannelNotFoundError);
166 [ + - - + ]: 6 : Py_VISIT(state->ChannelClosedError);
167 [ + - - + ]: 6 : Py_VISIT(state->ChannelEmptyError);
168 [ + - - + ]: 6 : Py_VISIT(state->ChannelNotEmptyError);
169 : :
170 : 6 : return 0;
171 : : }
172 : :
173 : : static int
174 : 2 : clear_module_state(module_state *state)
175 : : {
176 : : /* heap types */
177 [ + + ]: 2 : if (state->ChannelIDType != NULL) {
178 : 1 : (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
179 : : }
180 [ + + ]: 2 : Py_CLEAR(state->ChannelIDType);
181 : :
182 : : /* exceptions */
183 [ + + ]: 2 : Py_CLEAR(state->ChannelError);
184 [ + + ]: 2 : Py_CLEAR(state->ChannelNotFoundError);
185 [ + + ]: 2 : Py_CLEAR(state->ChannelClosedError);
186 [ + + ]: 2 : Py_CLEAR(state->ChannelEmptyError);
187 [ + + ]: 2 : Py_CLEAR(state->ChannelNotEmptyError);
188 : :
189 : 2 : return 0;
190 : : }
191 : :
192 : :
193 : : /* channel-specific code ****************************************************/
194 : :
195 : : #define CHANNEL_SEND 1
196 : : #define CHANNEL_BOTH 0
197 : : #define CHANNEL_RECV -1
198 : :
199 : : /* channel errors */
200 : :
201 : : #define ERR_CHANNEL_NOT_FOUND -2
202 : : #define ERR_CHANNEL_CLOSED -3
203 : : #define ERR_CHANNEL_INTERP_CLOSED -4
204 : : #define ERR_CHANNEL_EMPTY -5
205 : : #define ERR_CHANNEL_NOT_EMPTY -6
206 : : #define ERR_CHANNEL_MUTEX_INIT -7
207 : : #define ERR_CHANNELS_MUTEX_INIT -8
208 : : #define ERR_NO_NEXT_CHANNEL_ID -9
209 : :
210 : : static int
211 : 1 : exceptions_init(PyObject *mod)
212 : : {
213 : 1 : module_state *state = get_module_state(mod);
214 [ - + ]: 1 : if (state == NULL) {
215 : 0 : return -1;
216 : : }
217 : :
218 : : #define ADD(NAME, BASE) \
219 : : do { \
220 : : assert(state->NAME == NULL); \
221 : : state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
222 : : if (state->NAME == NULL) { \
223 : : return -1; \
224 : : } \
225 : : } while (0)
226 : :
227 : : // A channel-related operation failed.
228 [ - + ]: 1 : ADD(ChannelError, PyExc_RuntimeError);
229 : : // An operation tried to use a channel that doesn't exist.
230 [ - + ]: 1 : ADD(ChannelNotFoundError, state->ChannelError);
231 : : // An operation tried to use a closed channel.
232 [ - + ]: 1 : ADD(ChannelClosedError, state->ChannelError);
233 : : // An operation tried to pop from an empty channel.
234 [ - + ]: 1 : ADD(ChannelEmptyError, state->ChannelError);
235 : : // An operation tried to close a non-empty channel.
236 [ - + ]: 1 : ADD(ChannelNotEmptyError, state->ChannelError);
237 : : #undef ADD
238 : :
239 : 1 : return 0;
240 : : }
241 : :
242 : : static int
243 : 0 : handle_channel_error(int err, PyObject *mod, int64_t cid)
244 : : {
245 [ # # ]: 0 : if (err == 0) {
246 : : assert(!PyErr_Occurred());
247 : 0 : return 0;
248 : : }
249 : : assert(err < 0);
250 : 0 : module_state *state = get_module_state(mod);
251 : : assert(state != NULL);
252 [ # # ]: 0 : if (err == ERR_CHANNEL_NOT_FOUND) {
253 : 0 : PyErr_Format(state->ChannelNotFoundError,
254 : : "channel %" PRId64 " not found", cid);
255 : : }
256 [ # # ]: 0 : else if (err == ERR_CHANNEL_CLOSED) {
257 : 0 : PyErr_Format(state->ChannelClosedError,
258 : : "channel %" PRId64 " is closed", cid);
259 : : }
260 [ # # ]: 0 : else if (err == ERR_CHANNEL_INTERP_CLOSED) {
261 : 0 : PyErr_Format(state->ChannelClosedError,
262 : : "channel %" PRId64 " is already closed", cid);
263 : : }
264 [ # # ]: 0 : else if (err == ERR_CHANNEL_EMPTY) {
265 : 0 : PyErr_Format(state->ChannelEmptyError,
266 : : "channel %" PRId64 " is empty", cid);
267 : : }
268 [ # # ]: 0 : else if (err == ERR_CHANNEL_NOT_EMPTY) {
269 : 0 : PyErr_Format(state->ChannelNotEmptyError,
270 : : "channel %" PRId64 " may not be closed "
271 : : "if not empty (try force=True)",
272 : : cid);
273 : : }
274 [ # # ]: 0 : else if (err == ERR_CHANNEL_MUTEX_INIT) {
275 : 0 : PyErr_SetString(state->ChannelError,
276 : : "can't initialize mutex for new channel");
277 : : }
278 [ # # ]: 0 : else if (err == ERR_CHANNELS_MUTEX_INIT) {
279 : 0 : PyErr_SetString(state->ChannelError,
280 : : "can't initialize mutex for channel management");
281 : : }
282 [ # # ]: 0 : else if (err == ERR_NO_NEXT_CHANNEL_ID) {
283 : 0 : PyErr_SetString(state->ChannelError,
284 : : "failed to get a channel ID");
285 : : }
286 : : else {
287 : : assert(PyErr_Occurred());
288 : : }
289 : 0 : return 1;
290 : : }
291 : :
292 : : /* the channel queue */
293 : :
294 : : struct _channelitem;
295 : :
296 : : typedef struct _channelitem {
297 : : _PyCrossInterpreterData *data;
298 : : struct _channelitem *next;
299 : : } _channelitem;
300 : :
301 : : static _channelitem *
302 : 0 : _channelitem_new(void)
303 : : {
304 : 0 : _channelitem *item = PyMem_NEW(_channelitem, 1);
305 [ # # ]: 0 : if (item == NULL) {
306 : 0 : PyErr_NoMemory();
307 : 0 : return NULL;
308 : : }
309 : 0 : item->data = NULL;
310 : 0 : item->next = NULL;
311 : 0 : return item;
312 : : }
313 : :
314 : : static void
315 : 0 : _channelitem_clear(_channelitem *item)
316 : : {
317 [ # # ]: 0 : if (item->data != NULL) {
318 : 0 : (void)_release_xid_data(item->data, 1);
319 : 0 : PyMem_Free(item->data);
320 : 0 : item->data = NULL;
321 : : }
322 : 0 : item->next = NULL;
323 : 0 : }
324 : :
325 : : static void
326 : 0 : _channelitem_free(_channelitem *item)
327 : : {
328 : 0 : _channelitem_clear(item);
329 : 0 : PyMem_Free(item);
330 : 0 : }
331 : :
332 : : static void
333 : 0 : _channelitem_free_all(_channelitem *item)
334 : : {
335 [ # # ]: 0 : while (item != NULL) {
336 : 0 : _channelitem *last = item;
337 : 0 : item = item->next;
338 : 0 : _channelitem_free(last);
339 : : }
340 : 0 : }
341 : :
342 : : static _PyCrossInterpreterData *
343 : 0 : _channelitem_popped(_channelitem *item)
344 : : {
345 : 0 : _PyCrossInterpreterData *data = item->data;
346 : 0 : item->data = NULL;
347 : 0 : _channelitem_free(item);
348 : 0 : return data;
349 : : }
350 : :
351 : : typedef struct _channelqueue {
352 : : int64_t count;
353 : : _channelitem *first;
354 : : _channelitem *last;
355 : : } _channelqueue;
356 : :
357 : : static _channelqueue *
358 : 0 : _channelqueue_new(void)
359 : : {
360 : 0 : _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
361 [ # # ]: 0 : if (queue == NULL) {
362 : 0 : PyErr_NoMemory();
363 : 0 : return NULL;
364 : : }
365 : 0 : queue->count = 0;
366 : 0 : queue->first = NULL;
367 : 0 : queue->last = NULL;
368 : 0 : return queue;
369 : : }
370 : :
371 : : static void
372 : 0 : _channelqueue_clear(_channelqueue *queue)
373 : : {
374 : 0 : _channelitem_free_all(queue->first);
375 : 0 : queue->count = 0;
376 : 0 : queue->first = NULL;
377 : 0 : queue->last = NULL;
378 : 0 : }
379 : :
380 : : static void
381 : 0 : _channelqueue_free(_channelqueue *queue)
382 : : {
383 : 0 : _channelqueue_clear(queue);
384 : 0 : PyMem_Free(queue);
385 : 0 : }
386 : :
387 : : static int
388 : 0 : _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
389 : : {
390 : 0 : _channelitem *item = _channelitem_new();
391 [ # # ]: 0 : if (item == NULL) {
392 : 0 : return -1;
393 : : }
394 : 0 : item->data = data;
395 : :
396 : 0 : queue->count += 1;
397 [ # # ]: 0 : if (queue->first == NULL) {
398 : 0 : queue->first = item;
399 : : }
400 : : else {
401 : 0 : queue->last->next = item;
402 : : }
403 : 0 : queue->last = item;
404 : 0 : return 0;
405 : : }
406 : :
407 : : static _PyCrossInterpreterData *
408 : 0 : _channelqueue_get(_channelqueue *queue)
409 : : {
410 : 0 : _channelitem *item = queue->first;
411 [ # # ]: 0 : if (item == NULL) {
412 : 0 : return NULL;
413 : : }
414 : 0 : queue->first = item->next;
415 [ # # ]: 0 : if (queue->last == item) {
416 : 0 : queue->last = NULL;
417 : : }
418 : 0 : queue->count -= 1;
419 : :
420 : 0 : return _channelitem_popped(item);
421 : : }
422 : :
423 : : /* channel-interpreter associations */
424 : :
425 : : struct _channelend;
426 : :
427 : : typedef struct _channelend {
428 : : struct _channelend *next;
429 : : int64_t interp;
430 : : int open;
431 : : } _channelend;
432 : :
433 : : static _channelend *
434 : 0 : _channelend_new(int64_t interp)
435 : : {
436 : 0 : _channelend *end = PyMem_NEW(_channelend, 1);
437 [ # # ]: 0 : if (end == NULL) {
438 : 0 : PyErr_NoMemory();
439 : 0 : return NULL;
440 : : }
441 : 0 : end->next = NULL;
442 : 0 : end->interp = interp;
443 : 0 : end->open = 1;
444 : 0 : return end;
445 : : }
446 : :
447 : : static void
448 : 0 : _channelend_free(_channelend *end)
449 : : {
450 : 0 : PyMem_Free(end);
451 : 0 : }
452 : :
453 : : static void
454 : 0 : _channelend_free_all(_channelend *end)
455 : : {
456 [ # # ]: 0 : while (end != NULL) {
457 : 0 : _channelend *last = end;
458 : 0 : end = end->next;
459 : 0 : _channelend_free(last);
460 : : }
461 : 0 : }
462 : :
463 : : static _channelend *
464 : 0 : _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
465 : : {
466 : 0 : _channelend *prev = NULL;
467 : 0 : _channelend *end = first;
468 [ # # ]: 0 : while (end != NULL) {
469 [ # # ]: 0 : if (end->interp == interp) {
470 : 0 : break;
471 : : }
472 : 0 : prev = end;
473 : 0 : end = end->next;
474 : : }
475 [ # # ]: 0 : if (pprev != NULL) {
476 : 0 : *pprev = prev;
477 : : }
478 : 0 : return end;
479 : : }
480 : :
481 : : typedef struct _channelassociations {
482 : : // Note that the list entries are never removed for interpreter
483 : : // for which the channel is closed. This should not be a problem in
484 : : // practice. Also, a channel isn't automatically closed when an
485 : : // interpreter is destroyed.
486 : : int64_t numsendopen;
487 : : int64_t numrecvopen;
488 : : _channelend *send;
489 : : _channelend *recv;
490 : : } _channelends;
491 : :
492 : : static _channelends *
493 : 0 : _channelends_new(void)
494 : : {
495 : 0 : _channelends *ends = PyMem_NEW(_channelends, 1);
496 [ # # ]: 0 : if (ends== NULL) {
497 : 0 : return NULL;
498 : : }
499 : 0 : ends->numsendopen = 0;
500 : 0 : ends->numrecvopen = 0;
501 : 0 : ends->send = NULL;
502 : 0 : ends->recv = NULL;
503 : 0 : return ends;
504 : : }
505 : :
506 : : static void
507 : 0 : _channelends_clear(_channelends *ends)
508 : : {
509 : 0 : _channelend_free_all(ends->send);
510 : 0 : ends->send = NULL;
511 : 0 : ends->numsendopen = 0;
512 : :
513 : 0 : _channelend_free_all(ends->recv);
514 : 0 : ends->recv = NULL;
515 : 0 : ends->numrecvopen = 0;
516 : 0 : }
517 : :
518 : : static void
519 : 0 : _channelends_free(_channelends *ends)
520 : : {
521 : 0 : _channelends_clear(ends);
522 : 0 : PyMem_Free(ends);
523 : 0 : }
524 : :
525 : : static _channelend *
526 : 0 : _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
527 : : int send)
528 : : {
529 : 0 : _channelend *end = _channelend_new(interp);
530 [ # # ]: 0 : if (end == NULL) {
531 : 0 : return NULL;
532 : : }
533 : :
534 [ # # ]: 0 : if (prev == NULL) {
535 [ # # ]: 0 : if (send) {
536 : 0 : ends->send = end;
537 : : }
538 : : else {
539 : 0 : ends->recv = end;
540 : : }
541 : : }
542 : : else {
543 : 0 : prev->next = end;
544 : : }
545 [ # # ]: 0 : if (send) {
546 : 0 : ends->numsendopen += 1;
547 : : }
548 : : else {
549 : 0 : ends->numrecvopen += 1;
550 : : }
551 : 0 : return end;
552 : : }
553 : :
554 : : static int
555 : 0 : _channelends_associate(_channelends *ends, int64_t interp, int send)
556 : : {
557 : : _channelend *prev;
558 [ # # ]: 0 : _channelend *end = _channelend_find(send ? ends->send : ends->recv,
559 : : interp, &prev);
560 [ # # ]: 0 : if (end != NULL) {
561 [ # # ]: 0 : if (!end->open) {
562 : 0 : return ERR_CHANNEL_CLOSED;
563 : : }
564 : : // already associated
565 : 0 : return 0;
566 : : }
567 [ # # ]: 0 : if (_channelends_add(ends, prev, interp, send) == NULL) {
568 : 0 : return -1;
569 : : }
570 : 0 : return 0;
571 : : }
572 : :
573 : : static int
574 : 0 : _channelends_is_open(_channelends *ends)
575 : : {
576 [ # # # # ]: 0 : if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
577 : 0 : return 1;
578 : : }
579 [ # # # # ]: 0 : if (ends->send == NULL && ends->recv == NULL) {
580 : 0 : return 1;
581 : : }
582 : 0 : return 0;
583 : : }
584 : :
585 : : static void
586 : 0 : _channelends_close_end(_channelends *ends, _channelend *end, int send)
587 : : {
588 : 0 : end->open = 0;
589 [ # # ]: 0 : if (send) {
590 : 0 : ends->numsendopen -= 1;
591 : : }
592 : : else {
593 : 0 : ends->numrecvopen -= 1;
594 : : }
595 : 0 : }
596 : :
597 : : static int
598 : 0 : _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
599 : : {
600 : : _channelend *prev;
601 : : _channelend *end;
602 [ # # ]: 0 : if (which >= 0) { // send/both
603 : 0 : end = _channelend_find(ends->send, interp, &prev);
604 [ # # ]: 0 : if (end == NULL) {
605 : : // never associated so add it
606 : 0 : end = _channelends_add(ends, prev, interp, 1);
607 [ # # ]: 0 : if (end == NULL) {
608 : 0 : return -1;
609 : : }
610 : : }
611 : 0 : _channelends_close_end(ends, end, 1);
612 : : }
613 [ # # ]: 0 : if (which <= 0) { // recv/both
614 : 0 : end = _channelend_find(ends->recv, interp, &prev);
615 [ # # ]: 0 : if (end == NULL) {
616 : : // never associated so add it
617 : 0 : end = _channelends_add(ends, prev, interp, 0);
618 [ # # ]: 0 : if (end == NULL) {
619 : 0 : return -1;
620 : : }
621 : : }
622 : 0 : _channelends_close_end(ends, end, 0);
623 : : }
624 : 0 : return 0;
625 : : }
626 : :
627 : : static void
628 : 0 : _channelends_close_all(_channelends *ends, int which, int force)
629 : : {
630 : : // XXX Handle the ends.
631 : : // XXX Handle force is True.
632 : :
633 : : // Ensure all the "send"-associated interpreters are closed.
634 : : _channelend *end;
635 [ # # ]: 0 : for (end = ends->send; end != NULL; end = end->next) {
636 : 0 : _channelends_close_end(ends, end, 1);
637 : : }
638 : :
639 : : // Ensure all the "recv"-associated interpreters are closed.
640 [ # # ]: 0 : for (end = ends->recv; end != NULL; end = end->next) {
641 : 0 : _channelends_close_end(ends, end, 0);
642 : : }
643 : 0 : }
644 : :
645 : : /* channels */
646 : :
647 : : struct _channel;
648 : : struct _channel_closing;
649 : : static void _channel_clear_closing(struct _channel *);
650 : : static void _channel_finish_closing(struct _channel *);
651 : :
652 : : typedef struct _channel {
653 : : PyThread_type_lock mutex;
654 : : _channelqueue *queue;
655 : : _channelends *ends;
656 : : int open;
657 : : struct _channel_closing *closing;
658 : : } _PyChannelState;
659 : :
660 : : static _PyChannelState *
661 : 0 : _channel_new(PyThread_type_lock mutex)
662 : : {
663 : 0 : _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
664 [ # # ]: 0 : if (chan == NULL) {
665 : 0 : return NULL;
666 : : }
667 : 0 : chan->mutex = mutex;
668 : 0 : chan->queue = _channelqueue_new();
669 [ # # ]: 0 : if (chan->queue == NULL) {
670 : 0 : PyMem_Free(chan);
671 : 0 : return NULL;
672 : : }
673 : 0 : chan->ends = _channelends_new();
674 [ # # ]: 0 : if (chan->ends == NULL) {
675 : 0 : _channelqueue_free(chan->queue);
676 : 0 : PyMem_Free(chan);
677 : 0 : return NULL;
678 : : }
679 : 0 : chan->open = 1;
680 : 0 : chan->closing = NULL;
681 : 0 : return chan;
682 : : }
683 : :
684 : : static void
685 : 0 : _channel_free(_PyChannelState *chan)
686 : : {
687 : 0 : _channel_clear_closing(chan);
688 : 0 : PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
689 : 0 : _channelqueue_free(chan->queue);
690 : 0 : _channelends_free(chan->ends);
691 : 0 : PyThread_release_lock(chan->mutex);
692 : :
693 : 0 : PyThread_free_lock(chan->mutex);
694 : 0 : PyMem_Free(chan);
695 : 0 : }
696 : :
697 : : static int
698 : 0 : _channel_add(_PyChannelState *chan, int64_t interp,
699 : : _PyCrossInterpreterData *data)
700 : : {
701 : 0 : int res = -1;
702 : 0 : PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
703 : :
704 [ # # ]: 0 : if (!chan->open) {
705 : 0 : res = ERR_CHANNEL_CLOSED;
706 : 0 : goto done;
707 : : }
708 [ # # ]: 0 : if (_channelends_associate(chan->ends, interp, 1) != 0) {
709 : 0 : res = ERR_CHANNEL_INTERP_CLOSED;
710 : 0 : goto done;
711 : : }
712 : :
713 [ # # ]: 0 : if (_channelqueue_put(chan->queue, data) != 0) {
714 : 0 : goto done;
715 : : }
716 : :
717 : 0 : res = 0;
718 : 0 : done:
719 : 0 : PyThread_release_lock(chan->mutex);
720 : 0 : return res;
721 : : }
722 : :
723 : : static int
724 : 0 : _channel_next(_PyChannelState *chan, int64_t interp,
725 : : _PyCrossInterpreterData **res)
726 : : {
727 : 0 : int err = 0;
728 : 0 : PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
729 : :
730 [ # # ]: 0 : if (!chan->open) {
731 : 0 : err = ERR_CHANNEL_CLOSED;
732 : 0 : goto done;
733 : : }
734 [ # # ]: 0 : if (_channelends_associate(chan->ends, interp, 0) != 0) {
735 : 0 : err = ERR_CHANNEL_INTERP_CLOSED;
736 : 0 : goto done;
737 : : }
738 : :
739 : 0 : _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
740 [ # # # # : 0 : if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
# # ]
741 : 0 : chan->open = 0;
742 : : }
743 : 0 : *res = data;
744 : :
745 : 0 : done:
746 : 0 : PyThread_release_lock(chan->mutex);
747 [ # # ]: 0 : if (chan->queue->count == 0) {
748 : 0 : _channel_finish_closing(chan);
749 : : }
750 : 0 : return err;
751 : : }
752 : :
753 : : static int
754 : 0 : _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
755 : : {
756 : 0 : PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
757 : :
758 : 0 : int res = -1;
759 [ # # ]: 0 : if (!chan->open) {
760 : 0 : res = ERR_CHANNEL_CLOSED;
761 : 0 : goto done;
762 : : }
763 : :
764 [ # # ]: 0 : if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
765 : 0 : goto done;
766 : : }
767 : 0 : chan->open = _channelends_is_open(chan->ends);
768 : :
769 : 0 : res = 0;
770 : 0 : done:
771 : 0 : PyThread_release_lock(chan->mutex);
772 : 0 : return res;
773 : : }
774 : :
775 : : static int
776 : 0 : _channel_close_all(_PyChannelState *chan, int end, int force)
777 : : {
778 : 0 : int res = -1;
779 : 0 : PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
780 : :
781 [ # # ]: 0 : if (!chan->open) {
782 : 0 : res = ERR_CHANNEL_CLOSED;
783 : 0 : goto done;
784 : : }
785 : :
786 [ # # # # ]: 0 : if (!force && chan->queue->count > 0) {
787 : 0 : res = ERR_CHANNEL_NOT_EMPTY;
788 : 0 : goto done;
789 : : }
790 : :
791 : 0 : chan->open = 0;
792 : :
793 : : // We *could* also just leave these in place, since we've marked
794 : : // the channel as closed already.
795 : 0 : _channelends_close_all(chan->ends, end, force);
796 : :
797 : 0 : res = 0;
798 : 0 : done:
799 : 0 : PyThread_release_lock(chan->mutex);
800 : 0 : return res;
801 : : }
802 : :
803 : : /* the set of channels */
804 : :
805 : : struct _channelref;
806 : :
807 : : typedef struct _channelref {
808 : : int64_t id;
809 : : _PyChannelState *chan;
810 : : struct _channelref *next;
811 : : Py_ssize_t objcount;
812 : : } _channelref;
813 : :
814 : : static _channelref *
815 : 0 : _channelref_new(int64_t id, _PyChannelState *chan)
816 : : {
817 : 0 : _channelref *ref = PyMem_NEW(_channelref, 1);
818 [ # # ]: 0 : if (ref == NULL) {
819 : 0 : return NULL;
820 : : }
821 : 0 : ref->id = id;
822 : 0 : ref->chan = chan;
823 : 0 : ref->next = NULL;
824 : 0 : ref->objcount = 0;
825 : 0 : return ref;
826 : : }
827 : :
828 : : //static void
829 : : //_channelref_clear(_channelref *ref)
830 : : //{
831 : : // ref->id = -1;
832 : : // ref->chan = NULL;
833 : : // ref->next = NULL;
834 : : // ref->objcount = 0;
835 : : //}
836 : :
837 : : static void
838 : 0 : _channelref_free(_channelref *ref)
839 : : {
840 [ # # ]: 0 : if (ref->chan != NULL) {
841 : 0 : _channel_clear_closing(ref->chan);
842 : : }
843 : : //_channelref_clear(ref);
844 : 0 : PyMem_Free(ref);
845 : 0 : }
846 : :
847 : : static _channelref *
848 : 0 : _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
849 : : {
850 : 0 : _channelref *prev = NULL;
851 : 0 : _channelref *ref = first;
852 [ # # ]: 0 : while (ref != NULL) {
853 [ # # ]: 0 : if (ref->id == id) {
854 : 0 : break;
855 : : }
856 : 0 : prev = ref;
857 : 0 : ref = ref->next;
858 : : }
859 [ # # ]: 0 : if (pprev != NULL) {
860 : 0 : *pprev = prev;
861 : : }
862 : 0 : return ref;
863 : : }
864 : :
865 : : typedef struct _channels {
866 : : PyThread_type_lock mutex;
867 : : _channelref *head;
868 : : int64_t numopen;
869 : : int64_t next_id;
870 : : } _channels;
871 : :
872 : : static void
873 : 1 : _channels_init(_channels *channels, PyThread_type_lock mutex)
874 : : {
875 : 1 : channels->mutex = mutex;
876 : 1 : channels->head = NULL;
877 : 1 : channels->numopen = 0;
878 : 1 : channels->next_id = 0;
879 : 1 : }
880 : :
881 : : static void
882 : 1 : _channels_fini(_channels *channels)
883 : : {
884 : : assert(channels->numopen == 0);
885 : : assert(channels->head == NULL);
886 [ + - ]: 1 : if (channels->mutex != NULL) {
887 : 1 : PyThread_free_lock(channels->mutex);
888 : 1 : channels->mutex = NULL;
889 : : }
890 : 1 : }
891 : :
892 : : static int64_t
893 : 0 : _channels_next_id(_channels *channels) // needs lock
894 : : {
895 : 0 : int64_t id = channels->next_id;
896 [ # # ]: 0 : if (id < 0) {
897 : : /* overflow */
898 : 0 : return -1;
899 : : }
900 : 0 : channels->next_id += 1;
901 : 0 : return id;
902 : : }
903 : :
904 : : static int
905 : 0 : _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
906 : : _PyChannelState **res)
907 : : {
908 : 0 : int err = -1;
909 : 0 : _PyChannelState *chan = NULL;
910 : 0 : PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
911 [ # # ]: 0 : if (pmutex != NULL) {
912 : 0 : *pmutex = NULL;
913 : : }
914 : :
915 : 0 : _channelref *ref = _channelref_find(channels->head, id, NULL);
916 [ # # ]: 0 : if (ref == NULL) {
917 : 0 : err = ERR_CHANNEL_NOT_FOUND;
918 : 0 : goto done;
919 : : }
920 [ # # # # ]: 0 : if (ref->chan == NULL || !ref->chan->open) {
921 : 0 : err = ERR_CHANNEL_CLOSED;
922 : 0 : goto done;
923 : : }
924 : :
925 [ # # ]: 0 : if (pmutex != NULL) {
926 : : // The mutex will be closed by the caller.
927 : 0 : *pmutex = channels->mutex;
928 : : }
929 : :
930 : 0 : chan = ref->chan;
931 : 0 : err = 0;
932 : :
933 : 0 : done:
934 [ # # # # ]: 0 : if (pmutex == NULL || *pmutex == NULL) {
935 : 0 : PyThread_release_lock(channels->mutex);
936 : : }
937 : 0 : *res = chan;
938 : 0 : return err;
939 : : }
940 : :
941 : : static int64_t
942 : 0 : _channels_add(_channels *channels, _PyChannelState *chan)
943 : : {
944 : 0 : int64_t cid = -1;
945 : 0 : PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
946 : :
947 : : // Create a new ref.
948 : 0 : int64_t id = _channels_next_id(channels);
949 [ # # ]: 0 : if (id < 0) {
950 : 0 : cid = ERR_NO_NEXT_CHANNEL_ID;
951 : 0 : goto done;
952 : : }
953 : 0 : _channelref *ref = _channelref_new(id, chan);
954 [ # # ]: 0 : if (ref == NULL) {
955 : 0 : goto done;
956 : : }
957 : :
958 : : // Add it to the list.
959 : : // We assume that the channel is a new one (not already in the list).
960 : 0 : ref->next = channels->head;
961 : 0 : channels->head = ref;
962 : 0 : channels->numopen += 1;
963 : :
964 : 0 : cid = id;
965 : 0 : done:
966 : 0 : PyThread_release_lock(channels->mutex);
967 : 0 : return cid;
968 : : }
969 : :
970 : : /* forward */
971 : : static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
972 : :
973 : : static int
974 : 0 : _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
975 : : int end, int force)
976 : : {
977 : 0 : int res = -1;
978 : 0 : PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
979 [ # # ]: 0 : if (pchan != NULL) {
980 : 0 : *pchan = NULL;
981 : : }
982 : :
983 : 0 : _channelref *ref = _channelref_find(channels->head, cid, NULL);
984 [ # # ]: 0 : if (ref == NULL) {
985 : 0 : res = ERR_CHANNEL_NOT_FOUND;
986 : 0 : goto done;
987 : : }
988 : :
989 [ # # ]: 0 : if (ref->chan == NULL) {
990 : 0 : res = ERR_CHANNEL_CLOSED;
991 : 0 : goto done;
992 : : }
993 [ # # # # : 0 : else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
# # ]
994 : 0 : res = ERR_CHANNEL_CLOSED;
995 : 0 : goto done;
996 : : }
997 : : else {
998 : 0 : int err = _channel_close_all(ref->chan, end, force);
999 [ # # ]: 0 : if (err != 0) {
1000 [ # # # # ]: 0 : if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
1001 [ # # ]: 0 : if (ref->chan->closing != NULL) {
1002 : 0 : res = ERR_CHANNEL_CLOSED;
1003 : 0 : goto done;
1004 : : }
1005 : : // Mark the channel as closing and return. The channel
1006 : : // will be cleaned up in _channel_next().
1007 : 0 : PyErr_Clear();
1008 : 0 : int err = _channel_set_closing(ref, channels->mutex);
1009 [ # # ]: 0 : if (err != 0) {
1010 : 0 : res = err;
1011 : 0 : goto done;
1012 : : }
1013 [ # # ]: 0 : if (pchan != NULL) {
1014 : 0 : *pchan = ref->chan;
1015 : : }
1016 : 0 : res = 0;
1017 : : }
1018 : : else {
1019 : 0 : res = err;
1020 : : }
1021 : 0 : goto done;
1022 : : }
1023 [ # # ]: 0 : if (pchan != NULL) {
1024 : 0 : *pchan = ref->chan;
1025 : : }
1026 : : else {
1027 : 0 : _channel_free(ref->chan);
1028 : : }
1029 : 0 : ref->chan = NULL;
1030 : : }
1031 : :
1032 : 0 : res = 0;
1033 : 0 : done:
1034 : 0 : PyThread_release_lock(channels->mutex);
1035 : 0 : return res;
1036 : : }
1037 : :
1038 : : static void
1039 : 0 : _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1040 : : _PyChannelState **pchan)
1041 : : {
1042 [ # # ]: 0 : if (ref == channels->head) {
1043 : 0 : channels->head = ref->next;
1044 : : }
1045 : : else {
1046 : 0 : prev->next = ref->next;
1047 : : }
1048 : 0 : channels->numopen -= 1;
1049 : :
1050 [ # # ]: 0 : if (pchan != NULL) {
1051 : 0 : *pchan = ref->chan;
1052 : : }
1053 : 0 : _channelref_free(ref);
1054 : 0 : }
1055 : :
1056 : : static int
1057 : 0 : _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1058 : : {
1059 : 0 : int res = -1;
1060 : 0 : PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1061 : :
1062 [ # # ]: 0 : if (pchan != NULL) {
1063 : 0 : *pchan = NULL;
1064 : : }
1065 : :
1066 : 0 : _channelref *prev = NULL;
1067 : 0 : _channelref *ref = _channelref_find(channels->head, id, &prev);
1068 [ # # ]: 0 : if (ref == NULL) {
1069 : 0 : res = ERR_CHANNEL_NOT_FOUND;
1070 : 0 : goto done;
1071 : : }
1072 : :
1073 : 0 : _channels_remove_ref(channels, ref, prev, pchan);
1074 : :
1075 : 0 : res = 0;
1076 : 0 : done:
1077 : 0 : PyThread_release_lock(channels->mutex);
1078 : 0 : return res;
1079 : : }
1080 : :
1081 : : static int
1082 : 0 : _channels_add_id_object(_channels *channels, int64_t id)
1083 : : {
1084 : 0 : int res = -1;
1085 : 0 : PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1086 : :
1087 : 0 : _channelref *ref = _channelref_find(channels->head, id, NULL);
1088 [ # # ]: 0 : if (ref == NULL) {
1089 : 0 : res = ERR_CHANNEL_NOT_FOUND;
1090 : 0 : goto done;
1091 : : }
1092 : 0 : ref->objcount += 1;
1093 : :
1094 : 0 : res = 0;
1095 : 0 : done:
1096 : 0 : PyThread_release_lock(channels->mutex);
1097 : 0 : return res;
1098 : : }
1099 : :
1100 : : static void
1101 : 0 : _channels_drop_id_object(_channels *channels, int64_t id)
1102 : : {
1103 : 0 : PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1104 : :
1105 : 0 : _channelref *prev = NULL;
1106 : 0 : _channelref *ref = _channelref_find(channels->head, id, &prev);
1107 [ # # ]: 0 : if (ref == NULL) {
1108 : : // Already destroyed.
1109 : 0 : goto done;
1110 : : }
1111 : 0 : ref->objcount -= 1;
1112 : :
1113 : : // Destroy if no longer used.
1114 [ # # ]: 0 : if (ref->objcount == 0) {
1115 : 0 : _PyChannelState *chan = NULL;
1116 : 0 : _channels_remove_ref(channels, ref, prev, &chan);
1117 [ # # ]: 0 : if (chan != NULL) {
1118 : 0 : _channel_free(chan);
1119 : : }
1120 : : }
1121 : :
1122 : 0 : done:
1123 : 0 : PyThread_release_lock(channels->mutex);
1124 : 0 : }
1125 : :
1126 : : static int64_t *
1127 : 0 : _channels_list_all(_channels *channels, int64_t *count)
1128 : : {
1129 : 0 : int64_t *cids = NULL;
1130 : 0 : PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1131 [ # # ]: 0 : int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1132 [ # # ]: 0 : if (ids == NULL) {
1133 : 0 : goto done;
1134 : : }
1135 : 0 : _channelref *ref = channels->head;
1136 [ # # ]: 0 : for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1137 : 0 : ids[i] = ref->id;
1138 : : }
1139 : 0 : *count = channels->numopen;
1140 : :
1141 : 0 : cids = ids;
1142 : 0 : done:
1143 : 0 : PyThread_release_lock(channels->mutex);
1144 : 0 : return cids;
1145 : : }
1146 : :
1147 : : /* support for closing non-empty channels */
1148 : :
1149 : : struct _channel_closing {
1150 : : struct _channelref *ref;
1151 : : };
1152 : :
1153 : : static int
1154 : 0 : _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1155 : 0 : struct _channel *chan = ref->chan;
1156 [ # # ]: 0 : if (chan == NULL) {
1157 : : // already closed
1158 : 0 : return 0;
1159 : : }
1160 : 0 : int res = -1;
1161 : 0 : PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1162 [ # # ]: 0 : if (chan->closing != NULL) {
1163 : 0 : res = ERR_CHANNEL_CLOSED;
1164 : 0 : goto done;
1165 : : }
1166 : 0 : chan->closing = PyMem_NEW(struct _channel_closing, 1);
1167 [ # # ]: 0 : if (chan->closing == NULL) {
1168 : 0 : goto done;
1169 : : }
1170 : 0 : chan->closing->ref = ref;
1171 : :
1172 : 0 : res = 0;
1173 : 0 : done:
1174 : 0 : PyThread_release_lock(chan->mutex);
1175 : 0 : return res;
1176 : : }
1177 : :
1178 : : static void
1179 : 0 : _channel_clear_closing(struct _channel *chan) {
1180 : 0 : PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1181 [ # # ]: 0 : if (chan->closing != NULL) {
1182 : 0 : PyMem_Free(chan->closing);
1183 : 0 : chan->closing = NULL;
1184 : : }
1185 : 0 : PyThread_release_lock(chan->mutex);
1186 : 0 : }
1187 : :
1188 : : static void
1189 : 0 : _channel_finish_closing(struct _channel *chan) {
1190 : 0 : struct _channel_closing *closing = chan->closing;
1191 [ # # ]: 0 : if (closing == NULL) {
1192 : 0 : return;
1193 : : }
1194 : 0 : _channelref *ref = closing->ref;
1195 : 0 : _channel_clear_closing(chan);
1196 : : // Do the things that would have been done in _channels_close().
1197 : 0 : ref->chan = NULL;
1198 : 0 : _channel_free(chan);
1199 : : }
1200 : :
1201 : : /* "high"-level channel-related functions */
1202 : :
1203 : : static int64_t
1204 : 0 : _channel_create(_channels *channels)
1205 : : {
1206 : 0 : PyThread_type_lock mutex = PyThread_allocate_lock();
1207 [ # # ]: 0 : if (mutex == NULL) {
1208 : 0 : return ERR_CHANNEL_MUTEX_INIT;
1209 : : }
1210 : 0 : _PyChannelState *chan = _channel_new(mutex);
1211 [ # # ]: 0 : if (chan == NULL) {
1212 : 0 : PyThread_free_lock(mutex);
1213 : 0 : return -1;
1214 : : }
1215 : 0 : int64_t id = _channels_add(channels, chan);
1216 [ # # ]: 0 : if (id < 0) {
1217 : 0 : _channel_free(chan);
1218 : : }
1219 : 0 : return id;
1220 : : }
1221 : :
1222 : : static int
1223 : 0 : _channel_destroy(_channels *channels, int64_t id)
1224 : : {
1225 : 0 : _PyChannelState *chan = NULL;
1226 : 0 : int err = _channels_remove(channels, id, &chan);
1227 [ # # ]: 0 : if (err != 0) {
1228 : 0 : return err;
1229 : : }
1230 [ # # ]: 0 : if (chan != NULL) {
1231 : 0 : _channel_free(chan);
1232 : : }
1233 : 0 : return 0;
1234 : : }
1235 : :
1236 : : static int
1237 : 0 : _channel_send(_channels *channels, int64_t id, PyObject *obj)
1238 : : {
1239 : 0 : PyInterpreterState *interp = _get_current_interp();
1240 [ # # ]: 0 : if (interp == NULL) {
1241 : 0 : return -1;
1242 : : }
1243 : :
1244 : : // Look up the channel.
1245 : 0 : PyThread_type_lock mutex = NULL;
1246 : 0 : _PyChannelState *chan = NULL;
1247 : 0 : int err = _channels_lookup(channels, id, &mutex, &chan);
1248 [ # # ]: 0 : if (err != 0) {
1249 : 0 : return err;
1250 : : }
1251 : : assert(chan != NULL);
1252 : : // Past this point we are responsible for releasing the mutex.
1253 : :
1254 [ # # ]: 0 : if (chan->closing != NULL) {
1255 : 0 : PyThread_release_lock(mutex);
1256 : 0 : return ERR_CHANNEL_CLOSED;
1257 : : }
1258 : :
1259 : : // Convert the object to cross-interpreter data.
1260 : 0 : _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
1261 [ # # ]: 0 : if (data == NULL) {
1262 : 0 : PyThread_release_lock(mutex);
1263 : 0 : return -1;
1264 : : }
1265 [ # # ]: 0 : if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1266 : 0 : PyThread_release_lock(mutex);
1267 : 0 : PyMem_Free(data);
1268 : 0 : return -1;
1269 : : }
1270 : :
1271 : : // Add the data to the channel.
1272 : 0 : int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1273 : 0 : PyThread_release_lock(mutex);
1274 [ # # ]: 0 : if (res != 0) {
1275 : : // We may chain an exception here:
1276 : 0 : (void)_release_xid_data(data, 0);
1277 : 0 : PyMem_Free(data);
1278 : 0 : return res;
1279 : : }
1280 : :
1281 : 0 : return 0;
1282 : : }
1283 : :
1284 : : static int
1285 : 0 : _channel_recv(_channels *channels, int64_t id, PyObject **res)
1286 : : {
1287 : : int err;
1288 : 0 : *res = NULL;
1289 : :
1290 : 0 : PyInterpreterState *interp = _get_current_interp();
1291 [ # # ]: 0 : if (interp == NULL) {
1292 : : // XXX Is this always an error?
1293 [ # # ]: 0 : if (PyErr_Occurred()) {
1294 : 0 : return -1;
1295 : : }
1296 : 0 : return 0;
1297 : : }
1298 : :
1299 : : // Look up the channel.
1300 : 0 : PyThread_type_lock mutex = NULL;
1301 : 0 : _PyChannelState *chan = NULL;
1302 : 0 : err = _channels_lookup(channels, id, &mutex, &chan);
1303 [ # # ]: 0 : if (err != 0) {
1304 : 0 : return err;
1305 : : }
1306 : : assert(chan != NULL);
1307 : : // Past this point we are responsible for releasing the mutex.
1308 : :
1309 : : // Pop off the next item from the channel.
1310 : 0 : _PyCrossInterpreterData *data = NULL;
1311 : 0 : err = _channel_next(chan, PyInterpreterState_GetID(interp), &data);
1312 : 0 : PyThread_release_lock(mutex);
1313 [ # # ]: 0 : if (err != 0) {
1314 : 0 : return err;
1315 : : }
1316 [ # # ]: 0 : else if (data == NULL) {
1317 : : assert(!PyErr_Occurred());
1318 : 0 : return 0;
1319 : : }
1320 : :
1321 : : // Convert the data back to an object.
1322 : 0 : PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1323 [ # # ]: 0 : if (obj == NULL) {
1324 : : assert(PyErr_Occurred());
1325 : 0 : (void)_release_xid_data(data, 1);
1326 : 0 : PyMem_Free(data);
1327 : 0 : return -1;
1328 : : }
1329 : 0 : int release_res = _release_xid_data(data, 0);
1330 : 0 : PyMem_Free(data);
1331 [ # # ]: 0 : if (release_res < 0) {
1332 : : // The source interpreter has been destroyed already.
1333 : : assert(PyErr_Occurred());
1334 : 0 : Py_DECREF(obj);
1335 : 0 : return -1;
1336 : : }
1337 : :
1338 : 0 : *res = obj;
1339 : 0 : return 0;
1340 : : }
1341 : :
1342 : : static int
1343 : 0 : _channel_drop(_channels *channels, int64_t id, int send, int recv)
1344 : : {
1345 : 0 : PyInterpreterState *interp = _get_current_interp();
1346 [ # # ]: 0 : if (interp == NULL) {
1347 : 0 : return -1;
1348 : : }
1349 : :
1350 : : // Look up the channel.
1351 : 0 : PyThread_type_lock mutex = NULL;
1352 : 0 : _PyChannelState *chan = NULL;
1353 : 0 : int err = _channels_lookup(channels, id, &mutex, &chan);
1354 [ # # ]: 0 : if (err != 0) {
1355 : 0 : return err;
1356 : : }
1357 : : // Past this point we are responsible for releasing the mutex.
1358 : :
1359 : : // Close one or both of the two ends.
1360 : 0 : int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1361 : 0 : PyThread_release_lock(mutex);
1362 : 0 : return res;
1363 : : }
1364 : :
1365 : : static int
1366 : 0 : _channel_close(_channels *channels, int64_t id, int end, int force)
1367 : : {
1368 : 0 : return _channels_close(channels, id, NULL, end, force);
1369 : : }
1370 : :
1371 : : static int
1372 : 0 : _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
1373 : : int send)
1374 : : {
1375 : 0 : _PyChannelState *chan = NULL;
1376 : 0 : int err = _channels_lookup(channels, cid, NULL, &chan);
1377 [ # # ]: 0 : if (err != 0) {
1378 : 0 : return err;
1379 : : }
1380 [ # # # # ]: 0 : else if (send && chan->closing != NULL) {
1381 : 0 : return ERR_CHANNEL_CLOSED;
1382 : : }
1383 : :
1384 [ # # ]: 0 : _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
1385 : : interp, NULL);
1386 : :
1387 [ # # # # ]: 0 : return (end != NULL && end->open);
1388 : : }
1389 : :
1390 : : /* ChannelID class */
1391 : :
1392 : : typedef struct channelid {
1393 : : PyObject_HEAD
1394 : : int64_t id;
1395 : : int end;
1396 : : int resolve;
1397 : : _channels *channels;
1398 : : } channelid;
1399 : :
1400 : : struct channel_id_converter_data {
1401 : : PyObject *module;
1402 : : int64_t cid;
1403 : : };
1404 : :
1405 : : static int
1406 : 0 : channel_id_converter(PyObject *arg, void *ptr)
1407 : : {
1408 : : int64_t cid;
1409 : 0 : struct channel_id_converter_data *data = ptr;
1410 : 0 : module_state *state = get_module_state(data->module);
1411 : : assert(state != NULL);
1412 [ # # ]: 0 : if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
1413 : 0 : cid = ((channelid *)arg)->id;
1414 : : }
1415 [ # # ]: 0 : else if (PyIndex_Check(arg)) {
1416 : 0 : cid = PyLong_AsLongLong(arg);
1417 [ # # # # ]: 0 : if (cid == -1 && PyErr_Occurred()) {
1418 : 0 : return 0;
1419 : : }
1420 [ # # ]: 0 : if (cid < 0) {
1421 : 0 : PyErr_Format(PyExc_ValueError,
1422 : : "channel ID must be a non-negative int, got %R", arg);
1423 : 0 : return 0;
1424 : : }
1425 : : }
1426 : : else {
1427 : 0 : PyErr_Format(PyExc_TypeError,
1428 : : "channel ID must be an int, got %.100s",
1429 : 0 : Py_TYPE(arg)->tp_name);
1430 : 0 : return 0;
1431 : : }
1432 : 0 : data->cid = cid;
1433 : 0 : return 1;
1434 : : }
1435 : :
1436 : : static int
1437 : 0 : newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1438 : : int force, int resolve, channelid **res)
1439 : : {
1440 : 0 : *res = NULL;
1441 : :
1442 : 0 : channelid *self = PyObject_New(channelid, cls);
1443 [ # # ]: 0 : if (self == NULL) {
1444 : 0 : return -1;
1445 : : }
1446 : 0 : self->id = cid;
1447 : 0 : self->end = end;
1448 : 0 : self->resolve = resolve;
1449 : 0 : self->channels = channels;
1450 : :
1451 : 0 : int err = _channels_add_id_object(channels, cid);
1452 [ # # ]: 0 : if (err != 0) {
1453 [ # # # # ]: 0 : if (force && err == ERR_CHANNEL_NOT_FOUND) {
1454 : : assert(!PyErr_Occurred());
1455 : : }
1456 : : else {
1457 : 0 : Py_DECREF((PyObject *)self);
1458 : 0 : return err;
1459 : : }
1460 : : }
1461 : :
1462 : 0 : *res = self;
1463 : 0 : return 0;
1464 : : }
1465 : :
1466 : : static _channels * _global_channels(void);
1467 : :
1468 : : static PyObject *
1469 : 0 : _channelid_new(PyObject *mod, PyTypeObject *cls,
1470 : : PyObject *args, PyObject *kwds)
1471 : : {
1472 : : static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1473 : : int64_t cid;
1474 : 0 : struct channel_id_converter_data cid_data = {
1475 : : .module = mod,
1476 : : };
1477 : 0 : int send = -1;
1478 : 0 : int recv = -1;
1479 : 0 : int force = 0;
1480 : 0 : int resolve = 0;
1481 [ # # ]: 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds,
1482 : : "O&|$pppp:ChannelID.__new__", kwlist,
1483 : : channel_id_converter, &cid_data,
1484 : : &send, &recv, &force, &resolve)) {
1485 : 0 : return NULL;
1486 : : }
1487 : 0 : cid = cid_data.cid;
1488 : :
1489 : : // Handle "send" and "recv".
1490 [ # # # # ]: 0 : if (send == 0 && recv == 0) {
1491 : 0 : PyErr_SetString(PyExc_ValueError,
1492 : : "'send' and 'recv' cannot both be False");
1493 : 0 : return NULL;
1494 : : }
1495 : :
1496 : 0 : int end = 0;
1497 [ # # ]: 0 : if (send == 1) {
1498 [ # # # # ]: 0 : if (recv == 0 || recv == -1) {
1499 : 0 : end = CHANNEL_SEND;
1500 : : }
1501 : : }
1502 [ # # ]: 0 : else if (recv == 1) {
1503 : 0 : end = CHANNEL_RECV;
1504 : : }
1505 : :
1506 : 0 : PyObject *id = NULL;
1507 : 0 : int err = newchannelid(cls, cid, end, _global_channels(),
1508 : : force, resolve,
1509 : : (channelid **)&id);
1510 [ # # ]: 0 : if (handle_channel_error(err, mod, cid)) {
1511 : : assert(id == NULL);
1512 : 0 : return NULL;
1513 : : }
1514 : : assert(id != NULL);
1515 : 0 : return id;
1516 : : }
1517 : :
1518 : : static void
1519 : 0 : channelid_dealloc(PyObject *self)
1520 : : {
1521 : 0 : int64_t cid = ((channelid *)self)->id;
1522 : 0 : _channels *channels = ((channelid *)self)->channels;
1523 : :
1524 : 0 : PyTypeObject *tp = Py_TYPE(self);
1525 : 0 : tp->tp_free(self);
1526 : : /* "Instances of heap-allocated types hold a reference to their type."
1527 : : * See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
1528 : : * See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
1529 : : */
1530 : : // XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
1531 : : // like we do for _abc._abc_data?
1532 : 0 : Py_DECREF(tp);
1533 : :
1534 : 0 : _channels_drop_id_object(channels, cid);
1535 : 0 : }
1536 : :
1537 : : static PyObject *
1538 : 0 : channelid_repr(PyObject *self)
1539 : : {
1540 : 0 : PyTypeObject *type = Py_TYPE(self);
1541 : 0 : const char *name = _PyType_Name(type);
1542 : :
1543 : 0 : channelid *cid = (channelid *)self;
1544 : : const char *fmt;
1545 [ # # ]: 0 : if (cid->end == CHANNEL_SEND) {
1546 : 0 : fmt = "%s(%" PRId64 ", send=True)";
1547 : : }
1548 [ # # ]: 0 : else if (cid->end == CHANNEL_RECV) {
1549 : 0 : fmt = "%s(%" PRId64 ", recv=True)";
1550 : : }
1551 : : else {
1552 : 0 : fmt = "%s(%" PRId64 ")";
1553 : : }
1554 : 0 : return PyUnicode_FromFormat(fmt, name, cid->id);
1555 : : }
1556 : :
1557 : : static PyObject *
1558 : 0 : channelid_str(PyObject *self)
1559 : : {
1560 : 0 : channelid *cid = (channelid *)self;
1561 : 0 : return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1562 : : }
1563 : :
1564 : : static PyObject *
1565 : 0 : channelid_int(PyObject *self)
1566 : : {
1567 : 0 : channelid *cid = (channelid *)self;
1568 : 0 : return PyLong_FromLongLong(cid->id);
1569 : : }
1570 : :
1571 : : static Py_hash_t
1572 : 0 : channelid_hash(PyObject *self)
1573 : : {
1574 : 0 : channelid *cid = (channelid *)self;
1575 : 0 : PyObject *id = PyLong_FromLongLong(cid->id);
1576 [ # # ]: 0 : if (id == NULL) {
1577 : 0 : return -1;
1578 : : }
1579 : 0 : Py_hash_t hash = PyObject_Hash(id);
1580 : 0 : Py_DECREF(id);
1581 : 0 : return hash;
1582 : : }
1583 : :
1584 : : static PyObject *
1585 : 0 : channelid_richcompare(PyObject *self, PyObject *other, int op)
1586 : : {
1587 : 0 : PyObject *res = NULL;
1588 [ # # # # ]: 0 : if (op != Py_EQ && op != Py_NE) {
1589 : 0 : Py_RETURN_NOTIMPLEMENTED;
1590 : : }
1591 : :
1592 : 0 : PyObject *mod = get_module_from_type(Py_TYPE(self));
1593 [ # # ]: 0 : if (mod == NULL) {
1594 : 0 : return NULL;
1595 : : }
1596 : 0 : module_state *state = get_module_state(mod);
1597 [ # # ]: 0 : if (state == NULL) {
1598 : 0 : goto done;
1599 : : }
1600 : :
1601 [ # # ]: 0 : if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
1602 : 0 : res = Py_NewRef(Py_NotImplemented);
1603 : 0 : goto done;
1604 : : }
1605 : :
1606 : 0 : channelid *cid = (channelid *)self;
1607 : : int equal;
1608 [ # # ]: 0 : if (PyObject_TypeCheck(other, state->ChannelIDType)) {
1609 : 0 : channelid *othercid = (channelid *)other;
1610 [ # # # # ]: 0 : equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1611 : : }
1612 [ # # ]: 0 : else if (PyLong_Check(other)) {
1613 : : /* Fast path */
1614 : : int overflow;
1615 : 0 : long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1616 [ # # # # ]: 0 : if (othercid == -1 && PyErr_Occurred()) {
1617 : 0 : goto done;
1618 : : }
1619 [ # # # # : 0 : equal = !overflow && (othercid >= 0) && (cid->id == othercid);
# # ]
1620 : : }
1621 [ # # ]: 0 : else if (PyNumber_Check(other)) {
1622 : 0 : PyObject *pyid = PyLong_FromLongLong(cid->id);
1623 [ # # ]: 0 : if (pyid == NULL) {
1624 : 0 : goto done;
1625 : : }
1626 : 0 : res = PyObject_RichCompare(pyid, other, op);
1627 : 0 : Py_DECREF(pyid);
1628 : 0 : goto done;
1629 : : }
1630 : : else {
1631 : 0 : res = Py_NewRef(Py_NotImplemented);
1632 : 0 : goto done;
1633 : : }
1634 : :
1635 [ # # # # : 0 : if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
# # # # ]
1636 : 0 : res = Py_NewRef(Py_True);
1637 : : }
1638 : : else {
1639 : 0 : res = Py_NewRef(Py_False);
1640 : : }
1641 : :
1642 : 0 : done:
1643 : 0 : Py_DECREF(mod);
1644 : 0 : return res;
1645 : : }
1646 : :
1647 : : static PyObject *
1648 : 0 : _channel_from_cid(PyObject *cid, int end)
1649 : : {
1650 : 0 : PyObject *highlevel = PyImport_ImportModule("interpreters");
1651 [ # # ]: 0 : if (highlevel == NULL) {
1652 : 0 : PyErr_Clear();
1653 : 0 : highlevel = PyImport_ImportModule("test.support.interpreters");
1654 [ # # ]: 0 : if (highlevel == NULL) {
1655 : 0 : return NULL;
1656 : : }
1657 : : }
1658 [ # # ]: 0 : const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1659 : : "SendChannel";
1660 : 0 : PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1661 : 0 : Py_DECREF(highlevel);
1662 [ # # ]: 0 : if (cls == NULL) {
1663 : 0 : return NULL;
1664 : : }
1665 : 0 : PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1666 : 0 : Py_DECREF(cls);
1667 [ # # ]: 0 : if (chan == NULL) {
1668 : 0 : return NULL;
1669 : : }
1670 : 0 : return chan;
1671 : : }
1672 : :
1673 : : struct _channelid_xid {
1674 : : int64_t id;
1675 : : int end;
1676 : : int resolve;
1677 : : };
1678 : :
1679 : : static PyObject *
1680 : 0 : _channelid_from_xid(_PyCrossInterpreterData *data)
1681 : : {
1682 : 0 : struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1683 : :
1684 : : // It might not be imported yet, so we can't use _get_current_module().
1685 : 0 : PyObject *mod = PyImport_ImportModule(MODULE_NAME);
1686 [ # # ]: 0 : if (mod == NULL) {
1687 : 0 : return NULL;
1688 : : }
1689 : : assert(mod != Py_None);
1690 : 0 : module_state *state = get_module_state(mod);
1691 [ # # ]: 0 : if (state == NULL) {
1692 : 0 : return NULL;
1693 : : }
1694 : :
1695 : : // Note that we do not preserve the "resolve" flag.
1696 : 0 : PyObject *cid = NULL;
1697 : 0 : int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
1698 : : _global_channels(), 0, 0,
1699 : : (channelid **)&cid);
1700 [ # # ]: 0 : if (err != 0) {
1701 : : assert(cid == NULL);
1702 : 0 : (void)handle_channel_error(err, mod, xid->id);
1703 : 0 : goto done;
1704 : : }
1705 : : assert(cid != NULL);
1706 [ # # ]: 0 : if (xid->end == 0) {
1707 : 0 : goto done;
1708 : : }
1709 [ # # ]: 0 : if (!xid->resolve) {
1710 : 0 : goto done;
1711 : : }
1712 : :
1713 : : /* Try returning a high-level channel end but fall back to the ID. */
1714 : 0 : PyObject *chan = _channel_from_cid(cid, xid->end);
1715 [ # # ]: 0 : if (chan == NULL) {
1716 : 0 : PyErr_Clear();
1717 : 0 : goto done;
1718 : : }
1719 : 0 : Py_DECREF(cid);
1720 : 0 : cid = chan;
1721 : :
1722 : 0 : done:
1723 : 0 : Py_DECREF(mod);
1724 : 0 : return cid;
1725 : : }
1726 : :
1727 : : static int
1728 : 0 : _channelid_shared(PyThreadState *tstate, PyObject *obj,
1729 : : _PyCrossInterpreterData *data)
1730 : : {
1731 [ # # ]: 0 : if (_PyCrossInterpreterData_InitWithSize(
1732 : : data, tstate->interp, sizeof(struct _channelid_xid), obj,
1733 : : _channelid_from_xid
1734 : : ) < 0)
1735 : : {
1736 : 0 : return -1;
1737 : : }
1738 : 0 : struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1739 : 0 : xid->id = ((channelid *)obj)->id;
1740 : 0 : xid->end = ((channelid *)obj)->end;
1741 : 0 : xid->resolve = ((channelid *)obj)->resolve;
1742 : 0 : return 0;
1743 : : }
1744 : :
1745 : : static PyObject *
1746 : 0 : channelid_end(PyObject *self, void *end)
1747 : : {
1748 : 0 : int force = 1;
1749 : 0 : channelid *cid = (channelid *)self;
1750 [ # # ]: 0 : if (end != NULL) {
1751 : 0 : PyObject *id = NULL;
1752 : 0 : int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1753 : : cid->channels, force, cid->resolve,
1754 : : (channelid **)&id);
1755 [ # # ]: 0 : if (err != 0) {
1756 : : assert(id == NULL);
1757 : 0 : PyObject *mod = get_module_from_type(Py_TYPE(self));
1758 [ # # ]: 0 : if (mod == NULL) {
1759 : 0 : return NULL;
1760 : : }
1761 : 0 : (void)handle_channel_error(err, mod, cid->id);
1762 : 0 : Py_DECREF(mod);
1763 : 0 : return NULL;
1764 : : }
1765 : : assert(id != NULL);
1766 : 0 : return id;
1767 : : }
1768 : :
1769 [ # # ]: 0 : if (cid->end == CHANNEL_SEND) {
1770 : 0 : return PyUnicode_InternFromString("send");
1771 : : }
1772 [ # # ]: 0 : if (cid->end == CHANNEL_RECV) {
1773 : 0 : return PyUnicode_InternFromString("recv");
1774 : : }
1775 : 0 : return PyUnicode_InternFromString("both");
1776 : : }
1777 : :
1778 : : static int _channelid_end_send = CHANNEL_SEND;
1779 : : static int _channelid_end_recv = CHANNEL_RECV;
1780 : :
1781 : : static PyGetSetDef channelid_getsets[] = {
1782 : : {"end", (getter)channelid_end, NULL,
1783 : : PyDoc_STR("'send', 'recv', or 'both'")},
1784 : : {"send", (getter)channelid_end, NULL,
1785 : : PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
1786 : : {"recv", (getter)channelid_end, NULL,
1787 : : PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
1788 : : {NULL}
1789 : : };
1790 : :
1791 : : PyDoc_STRVAR(channelid_doc,
1792 : : "A channel ID identifies a channel and may be used as an int.");
1793 : :
1794 : : static PyType_Slot ChannelIDType_slots[] = {
1795 : : {Py_tp_dealloc, (destructor)channelid_dealloc},
1796 : : {Py_tp_doc, (void *)channelid_doc},
1797 : : {Py_tp_repr, (reprfunc)channelid_repr},
1798 : : {Py_tp_str, (reprfunc)channelid_str},
1799 : : {Py_tp_hash, channelid_hash},
1800 : : {Py_tp_richcompare, channelid_richcompare},
1801 : : {Py_tp_getset, channelid_getsets},
1802 : : // number slots
1803 : : {Py_nb_int, (unaryfunc)channelid_int},
1804 : : {Py_nb_index, (unaryfunc)channelid_int},
1805 : : {0, NULL},
1806 : : };
1807 : :
1808 : : static PyType_Spec ChannelIDType_spec = {
1809 : : .name = MODULE_NAME ".ChannelID",
1810 : : .basicsize = sizeof(channelid),
1811 : : .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
1812 : : Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
1813 : : .slots = ChannelIDType_slots,
1814 : : };
1815 : :
1816 : :
1817 : : /* module level code ********************************************************/
1818 : :
1819 : : /* globals is the process-global state for the module. It holds all
1820 : : the data that we need to share between interpreters, so it cannot
1821 : : hold PyObject values. */
1822 : : static struct globals {
1823 : : int module_count;
1824 : : _channels channels;
1825 : : } _globals = {0};
1826 : :
1827 : : static int
1828 : 1 : _globals_init(void)
1829 : : {
1830 : : // XXX This isn't thread-safe.
1831 : 1 : _globals.module_count++;
1832 [ - + ]: 1 : if (_globals.module_count > 1) {
1833 : : // Already initialized.
1834 : 0 : return 0;
1835 : : }
1836 : :
1837 : : assert(_globals.channels.mutex == NULL);
1838 : 1 : PyThread_type_lock mutex = PyThread_allocate_lock();
1839 [ - + ]: 1 : if (mutex == NULL) {
1840 : 0 : return ERR_CHANNELS_MUTEX_INIT;
1841 : : }
1842 : 1 : _channels_init(&_globals.channels, mutex);
1843 : 1 : return 0;
1844 : : }
1845 : :
1846 : : static void
1847 : 1 : _globals_fini(void)
1848 : : {
1849 : : // XXX This isn't thread-safe.
1850 : 1 : _globals.module_count--;
1851 [ - + ]: 1 : if (_globals.module_count > 0) {
1852 : 0 : return;
1853 : : }
1854 : :
1855 : 1 : _channels_fini(&_globals.channels);
1856 : : }
1857 : :
1858 : : static _channels *
1859 : 0 : _global_channels(void) {
1860 : 0 : return &_globals.channels;
1861 : : }
1862 : :
1863 : :
1864 : : static PyObject *
1865 : 0 : channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
1866 : : {
1867 : 0 : int64_t cid = _channel_create(&_globals.channels);
1868 [ # # ]: 0 : if (cid < 0) {
1869 : 0 : (void)handle_channel_error(-1, self, cid);
1870 : 0 : return NULL;
1871 : : }
1872 : 0 : module_state *state = get_module_state(self);
1873 [ # # ]: 0 : if (state == NULL) {
1874 : 0 : return NULL;
1875 : : }
1876 : 0 : PyObject *id = NULL;
1877 : 0 : int err = newchannelid(state->ChannelIDType, cid, 0,
1878 : : &_globals.channels, 0, 0,
1879 : : (channelid **)&id);
1880 [ # # ]: 0 : if (handle_channel_error(err, self, cid)) {
1881 : : assert(id == NULL);
1882 : 0 : err = _channel_destroy(&_globals.channels, cid);
1883 : 0 : if (handle_channel_error(err, self, cid)) {
1884 : : // XXX issue a warning?
1885 : : }
1886 : 0 : return NULL;
1887 : : }
1888 : : assert(id != NULL);
1889 : : assert(((channelid *)id)->channels != NULL);
1890 : 0 : return id;
1891 : : }
1892 : :
1893 : : PyDoc_STRVAR(channel_create_doc,
1894 : : "channel_create() -> cid\n\
1895 : : \n\
1896 : : Create a new cross-interpreter channel and return a unique generated ID.");
1897 : :
1898 : : static PyObject *
1899 : 0 : channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
1900 : : {
1901 : : static char *kwlist[] = {"cid", NULL};
1902 : : int64_t cid;
1903 : 0 : struct channel_id_converter_data cid_data = {
1904 : : .module = self,
1905 : : };
1906 [ # # ]: 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
1907 : : channel_id_converter, &cid_data)) {
1908 : 0 : return NULL;
1909 : : }
1910 : 0 : cid = cid_data.cid;
1911 : :
1912 : 0 : int err = _channel_destroy(&_globals.channels, cid);
1913 [ # # ]: 0 : if (handle_channel_error(err, self, cid)) {
1914 : 0 : return NULL;
1915 : : }
1916 : 0 : Py_RETURN_NONE;
1917 : : }
1918 : :
1919 : : PyDoc_STRVAR(channel_destroy_doc,
1920 : : "channel_destroy(cid)\n\
1921 : : \n\
1922 : : Close and finalize the channel. Afterward attempts to use the channel\n\
1923 : : will behave as though it never existed.");
1924 : :
1925 : : static PyObject *
1926 : 0 : channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
1927 : : {
1928 : 0 : int64_t count = 0;
1929 : 0 : int64_t *cids = _channels_list_all(&_globals.channels, &count);
1930 [ # # ]: 0 : if (cids == NULL) {
1931 [ # # ]: 0 : if (count == 0) {
1932 : 0 : return PyList_New(0);
1933 : : }
1934 : 0 : return NULL;
1935 : : }
1936 : 0 : PyObject *ids = PyList_New((Py_ssize_t)count);
1937 [ # # ]: 0 : if (ids == NULL) {
1938 : 0 : goto finally;
1939 : : }
1940 : 0 : module_state *state = get_module_state(self);
1941 [ # # ]: 0 : if (state == NULL) {
1942 : 0 : Py_DECREF(ids);
1943 : 0 : ids = NULL;
1944 : 0 : goto finally;
1945 : : }
1946 : 0 : int64_t *cur = cids;
1947 [ # # ]: 0 : for (int64_t i=0; i < count; cur++, i++) {
1948 : 0 : PyObject *id = NULL;
1949 : 0 : int err = newchannelid(state->ChannelIDType, *cur, 0,
1950 : : &_globals.channels, 0, 0,
1951 : : (channelid **)&id);
1952 [ # # ]: 0 : if (handle_channel_error(err, self, *cur)) {
1953 : : assert(id == NULL);
1954 : 0 : Py_SETREF(ids, NULL);
1955 : 0 : break;
1956 : : }
1957 : : assert(id != NULL);
1958 : 0 : PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
1959 : : }
1960 : :
1961 : 0 : finally:
1962 : 0 : PyMem_Free(cids);
1963 : 0 : return ids;
1964 : : }
1965 : :
1966 : : PyDoc_STRVAR(channel_list_all_doc,
1967 : : "channel_list_all() -> [cid]\n\
1968 : : \n\
1969 : : Return the list of all IDs for active channels.");
1970 : :
1971 : : static PyObject *
1972 : 0 : channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
1973 : : {
1974 : : static char *kwlist[] = {"cid", "send", NULL};
1975 : : int64_t cid; /* Channel ID */
1976 : 0 : struct channel_id_converter_data cid_data = {
1977 : : .module = self,
1978 : : };
1979 : 0 : int send = 0; /* Send or receive end? */
1980 : : int64_t id;
1981 : : PyObject *ids, *id_obj;
1982 : : PyInterpreterState *interp;
1983 : :
1984 [ # # ]: 0 : if (!PyArg_ParseTupleAndKeywords(
1985 : : args, kwds, "O&$p:channel_list_interpreters",
1986 : : kwlist, channel_id_converter, &cid_data, &send)) {
1987 : 0 : return NULL;
1988 : : }
1989 : 0 : cid = cid_data.cid;
1990 : :
1991 : 0 : ids = PyList_New(0);
1992 [ # # ]: 0 : if (ids == NULL) {
1993 : 0 : goto except;
1994 : : }
1995 : :
1996 : 0 : interp = PyInterpreterState_Head();
1997 [ # # ]: 0 : while (interp != NULL) {
1998 : 0 : id = PyInterpreterState_GetID(interp);
1999 : : assert(id >= 0);
2000 : 0 : int res = _channel_is_associated(&_globals.channels, cid, id, send);
2001 [ # # ]: 0 : if (res < 0) {
2002 : 0 : (void)handle_channel_error(res, self, cid);
2003 : 0 : goto except;
2004 : : }
2005 [ # # ]: 0 : if (res) {
2006 : 0 : id_obj = _PyInterpreterState_GetIDObject(interp);
2007 [ # # ]: 0 : if (id_obj == NULL) {
2008 : 0 : goto except;
2009 : : }
2010 : 0 : res = PyList_Insert(ids, 0, id_obj);
2011 : 0 : Py_DECREF(id_obj);
2012 [ # # ]: 0 : if (res < 0) {
2013 : 0 : goto except;
2014 : : }
2015 : : }
2016 : 0 : interp = PyInterpreterState_Next(interp);
2017 : : }
2018 : :
2019 : 0 : goto finally;
2020 : :
2021 : 0 : except:
2022 [ # # ]: 0 : Py_CLEAR(ids);
2023 : :
2024 : 0 : finally:
2025 : 0 : return ids;
2026 : : }
2027 : :
2028 : : PyDoc_STRVAR(channel_list_interpreters_doc,
2029 : : "channel_list_interpreters(cid, *, send) -> [id]\n\
2030 : : \n\
2031 : : Return the list of all interpreter IDs associated with an end of the channel.\n\
2032 : : \n\
2033 : : The 'send' argument should be a boolean indicating whether to use the send or\n\
2034 : : receive end.");
2035 : :
2036 : :
2037 : : static PyObject *
2038 : 0 : channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2039 : : {
2040 : : static char *kwlist[] = {"cid", "obj", NULL};
2041 : : int64_t cid;
2042 : 0 : struct channel_id_converter_data cid_data = {
2043 : : .module = self,
2044 : : };
2045 : : PyObject *obj;
2046 [ # # ]: 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2047 : : channel_id_converter, &cid_data, &obj)) {
2048 : 0 : return NULL;
2049 : : }
2050 : 0 : cid = cid_data.cid;
2051 : :
2052 : 0 : int err = _channel_send(&_globals.channels, cid, obj);
2053 [ # # ]: 0 : if (handle_channel_error(err, self, cid)) {
2054 : 0 : return NULL;
2055 : : }
2056 : 0 : Py_RETURN_NONE;
2057 : : }
2058 : :
2059 : : PyDoc_STRVAR(channel_send_doc,
2060 : : "channel_send(cid, obj)\n\
2061 : : \n\
2062 : : Add the object's data to the channel's queue.");
2063 : :
2064 : : static PyObject *
2065 : 0 : channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2066 : : {
2067 : : static char *kwlist[] = {"cid", "default", NULL};
2068 : : int64_t cid;
2069 : 0 : struct channel_id_converter_data cid_data = {
2070 : : .module = self,
2071 : : };
2072 : 0 : PyObject *dflt = NULL;
2073 [ # # ]: 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
2074 : : channel_id_converter, &cid_data, &dflt)) {
2075 : 0 : return NULL;
2076 : : }
2077 : 0 : cid = cid_data.cid;
2078 : :
2079 : 0 : PyObject *obj = NULL;
2080 : 0 : int err = _channel_recv(&_globals.channels, cid, &obj);
2081 [ # # ]: 0 : if (handle_channel_error(err, self, cid)) {
2082 : 0 : return NULL;
2083 : : }
2084 : 0 : Py_XINCREF(dflt);
2085 [ # # ]: 0 : if (obj == NULL) {
2086 : : // Use the default.
2087 [ # # ]: 0 : if (dflt == NULL) {
2088 : 0 : (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
2089 : 0 : return NULL;
2090 : : }
2091 : 0 : obj = Py_NewRef(dflt);
2092 : : }
2093 : 0 : Py_XDECREF(dflt);
2094 : 0 : return obj;
2095 : : }
2096 : :
2097 : : PyDoc_STRVAR(channel_recv_doc,
2098 : : "channel_recv(cid, [default]) -> obj\n\
2099 : : \n\
2100 : : Return a new object from the data at the front of the channel's queue.\n\
2101 : : \n\
2102 : : If there is nothing to receive then raise ChannelEmptyError, unless\n\
2103 : : a default value is provided. In that case return it.");
2104 : :
2105 : : static PyObject *
2106 : 0 : channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2107 : : {
2108 : : static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2109 : : int64_t cid;
2110 : 0 : struct channel_id_converter_data cid_data = {
2111 : : .module = self,
2112 : : };
2113 : 0 : int send = 0;
2114 : 0 : int recv = 0;
2115 : 0 : int force = 0;
2116 [ # # ]: 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds,
2117 : : "O&|$ppp:channel_close", kwlist,
2118 : : channel_id_converter, &cid_data,
2119 : : &send, &recv, &force)) {
2120 : 0 : return NULL;
2121 : : }
2122 : 0 : cid = cid_data.cid;
2123 : :
2124 : 0 : int err = _channel_close(&_globals.channels, cid, send-recv, force);
2125 [ # # ]: 0 : if (handle_channel_error(err, self, cid)) {
2126 : 0 : return NULL;
2127 : : }
2128 : 0 : Py_RETURN_NONE;
2129 : : }
2130 : :
2131 : : PyDoc_STRVAR(channel_close_doc,
2132 : : "channel_close(cid, *, send=None, recv=None, force=False)\n\
2133 : : \n\
2134 : : Close the channel for all interpreters.\n\
2135 : : \n\
2136 : : If the channel is empty then the keyword args are ignored and both\n\
2137 : : ends are immediately closed. Otherwise, if 'force' is True then\n\
2138 : : all queued items are released and both ends are immediately\n\
2139 : : closed.\n\
2140 : : \n\
2141 : : If the channel is not empty *and* 'force' is False then following\n\
2142 : : happens:\n\
2143 : : \n\
2144 : : * recv is True (regardless of send):\n\
2145 : : - raise ChannelNotEmptyError\n\
2146 : : * recv is None and send is None:\n\
2147 : : - raise ChannelNotEmptyError\n\
2148 : : * send is True and recv is not True:\n\
2149 : : - fully close the 'send' end\n\
2150 : : - close the 'recv' end to interpreters not already receiving\n\
2151 : : - fully close it once empty\n\
2152 : : \n\
2153 : : Closing an already closed channel results in a ChannelClosedError.\n\
2154 : : \n\
2155 : : Once the channel's ID has no more ref counts in any interpreter\n\
2156 : : the channel will be destroyed.");
2157 : :
2158 : : static PyObject *
2159 : 0 : channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2160 : : {
2161 : : // Note that only the current interpreter is affected.
2162 : : static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2163 : : int64_t cid;
2164 : 0 : struct channel_id_converter_data cid_data = {
2165 : : .module = self,
2166 : : };
2167 : 0 : int send = 0;
2168 : 0 : int recv = 0;
2169 : 0 : int force = 0;
2170 [ # # ]: 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds,
2171 : : "O&|$ppp:channel_release", kwlist,
2172 : : channel_id_converter, &cid_data,
2173 : : &send, &recv, &force)) {
2174 : 0 : return NULL;
2175 : : }
2176 : 0 : cid = cid_data.cid;
2177 [ # # # # ]: 0 : if (send == 0 && recv == 0) {
2178 : 0 : send = 1;
2179 : 0 : recv = 1;
2180 : : }
2181 : :
2182 : : // XXX Handle force is True.
2183 : : // XXX Fix implicit release.
2184 : :
2185 : 0 : int err = _channel_drop(&_globals.channels, cid, send, recv);
2186 [ # # ]: 0 : if (handle_channel_error(err, self, cid)) {
2187 : 0 : return NULL;
2188 : : }
2189 : 0 : Py_RETURN_NONE;
2190 : : }
2191 : :
2192 : : PyDoc_STRVAR(channel_release_doc,
2193 : : "channel_release(cid, *, send=None, recv=None, force=True)\n\
2194 : : \n\
2195 : : Close the channel for the current interpreter. 'send' and 'recv'\n\
2196 : : (bool) may be used to indicate the ends to close. By default both\n\
2197 : : ends are closed. Closing an already closed end is a noop.");
2198 : :
2199 : : static PyObject *
2200 : 0 : channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2201 : : {
2202 : 0 : module_state *state = get_module_state(self);
2203 [ # # ]: 0 : if (state == NULL) {
2204 : 0 : return NULL;
2205 : : }
2206 : 0 : PyTypeObject *cls = state->ChannelIDType;
2207 : 0 : PyObject *mod = get_module_from_owned_type(cls);
2208 [ # # ]: 0 : if (mod == NULL) {
2209 : 0 : return NULL;
2210 : : }
2211 : 0 : PyObject *cid = _channelid_new(mod, cls, args, kwds);
2212 : 0 : Py_DECREF(mod);
2213 : 0 : return cid;
2214 : : }
2215 : :
2216 : : static PyMethodDef module_functions[] = {
2217 : : {"create", channel_create,
2218 : : METH_NOARGS, channel_create_doc},
2219 : : {"destroy", _PyCFunction_CAST(channel_destroy),
2220 : : METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2221 : : {"list_all", channel_list_all,
2222 : : METH_NOARGS, channel_list_all_doc},
2223 : : {"list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
2224 : : METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
2225 : : {"send", _PyCFunction_CAST(channel_send),
2226 : : METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2227 : : {"recv", _PyCFunction_CAST(channel_recv),
2228 : : METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2229 : : {"close", _PyCFunction_CAST(channel_close),
2230 : : METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2231 : : {"release", _PyCFunction_CAST(channel_release),
2232 : : METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2233 : : {"_channel_id", _PyCFunction_CAST(channel__channel_id),
2234 : : METH_VARARGS | METH_KEYWORDS, NULL},
2235 : :
2236 : : {NULL, NULL} /* sentinel */
2237 : : };
2238 : :
2239 : :
2240 : : /* initialization function */
2241 : :
2242 : : PyDoc_STRVAR(module_doc,
2243 : : "This module provides primitive operations to manage Python interpreters.\n\
2244 : : The 'interpreters' module provides a more convenient interface.");
2245 : :
2246 : : static int
2247 : 1 : module_exec(PyObject *mod)
2248 : : {
2249 [ - + ]: 1 : if (_globals_init() != 0) {
2250 : 0 : return -1;
2251 : : }
2252 : :
2253 : : /* Add exception types */
2254 [ - + ]: 1 : if (exceptions_init(mod) != 0) {
2255 : 0 : goto error;
2256 : : }
2257 : :
2258 : : /* Add other types */
2259 : 1 : module_state *state = get_module_state(mod);
2260 [ - + ]: 1 : if (state == NULL) {
2261 : 0 : goto error;
2262 : : }
2263 : :
2264 : : // ChannelID
2265 : 1 : state->ChannelIDType = add_new_type(
2266 : : mod, &ChannelIDType_spec, _channelid_shared);
2267 [ - + ]: 1 : if (state->ChannelIDType == NULL) {
2268 : 0 : goto error;
2269 : : }
2270 : :
2271 : 1 : return 0;
2272 : :
2273 : 0 : error:
2274 : 0 : _globals_fini();
2275 : 0 : return -1;
2276 : : }
2277 : :
2278 : : static struct PyModuleDef_Slot module_slots[] = {
2279 : : {Py_mod_exec, module_exec},
2280 : : {0, NULL},
2281 : : };
2282 : :
2283 : : static int
2284 : 6 : module_traverse(PyObject *mod, visitproc visit, void *arg)
2285 : : {
2286 : 6 : module_state *state = get_module_state(mod);
2287 : : assert(state != NULL);
2288 : 6 : traverse_module_state(state, visit, arg);
2289 : 6 : return 0;
2290 : : }
2291 : :
2292 : : static int
2293 : 1 : module_clear(PyObject *mod)
2294 : : {
2295 : 1 : module_state *state = get_module_state(mod);
2296 : : assert(state != NULL);
2297 : 1 : clear_module_state(state);
2298 : 1 : return 0;
2299 : : }
2300 : :
2301 : : static void
2302 : 1 : module_free(void *mod)
2303 : : {
2304 : 1 : module_state *state = get_module_state(mod);
2305 : : assert(state != NULL);
2306 : 1 : clear_module_state(state);
2307 : 1 : _globals_fini();
2308 : 1 : }
2309 : :
2310 : : static struct PyModuleDef moduledef = {
2311 : : .m_base = PyModuleDef_HEAD_INIT,
2312 : : .m_name = MODULE_NAME,
2313 : : .m_doc = module_doc,
2314 : : .m_size = sizeof(module_state),
2315 : : .m_methods = module_functions,
2316 : : .m_slots = module_slots,
2317 : : .m_traverse = module_traverse,
2318 : : .m_clear = module_clear,
2319 : : .m_free = (freefunc)module_free,
2320 : : };
2321 : :
2322 : : PyMODINIT_FUNC
2323 : 1 : PyInit__xxinterpchannels(void)
2324 : : {
2325 : 1 : return PyModuleDef_Init(&moduledef);
2326 : : }
|