Skip to content

Feat/pool executor map buffersize #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
9 changes: 7 additions & 2 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ Executor Objects
future = executor.submit(pow, 323, 1235)
print(future.result())

.. method:: map(fn, *iterables, timeout=None, chunksize=1)
.. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)

Similar to :func:`map(fn, *iterables) <map>` except:

* the *iterables* are collected immediately rather than lazily;
* the *iterables* are collected immediately rather than lazily, unless a
*buffersize* is specified: if the buffer is full, then the iteration
over *iterables* is paused until a result is yielded from the buffer.

* *fn* is executed asynchronously and several calls to
*fn* may be made concurrently.
Expand All @@ -68,6 +70,9 @@ Executor Objects
.. versionchanged:: 3.5
Added the *chunksize* argument.

.. versionchanged:: 3.15
Added the *buffersize* argument.

.. method:: shutdown(wait=True, *, cancel_futures=False)

Signal the executor that it should free any resources that it is using
Expand Down
22 changes: 20 additions & 2 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
__author__ = 'Brian Quinlan ([email protected])'

import collections
from itertools import islice
import logging
import threading
import time
import types
import weakref

FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
Expand Down Expand Up @@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs):
"""
raise NotImplementedError()

def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
"""Returns an iterator equivalent to map(fn, iter).

Args:
Expand All @@ -584,6 +586,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
buffersize: The maximum number of not-yet-yielded results buffered.
If the buffer is full, then iteration over `iterables` is paused
until an element is yielded out of the buffer.

Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
Expand All @@ -594,10 +599,21 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if buffersize is not None and buffersize < 1:
raise ValueError("buffersize must be None or >= 1.")

if timeout is not None:
end_time = timeout + time.monotonic()

fs = [self.submit(fn, *args) for args in zip(*iterables)]
args_iter = iter(zip(*iterables))
if buffersize:
fs = collections.deque(
self.submit(fn, *args) for args in islice(args_iter, buffersize)
)
else:
fs = [self.submit(fn, *args) for args in args_iter]

executor_weakref = weakref.ref(self)

# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
Expand All @@ -607,6 +623,8 @@ def result_iterator():
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if buffersize and (executor := executor_weakref()) and (args := next(args_iter, None)):
fs.appendleft(executor.submit(fn, *args))
if timeout is None:
yield _result_or_cancel(fs.pop())
else:
Expand Down
8 changes: 6 additions & 2 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs):
return f
submit.__doc__ = _base.Executor.submit.__doc__

def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
"""Returns an iterator equivalent to map(fn, iter).

Args:
Expand All @@ -824,6 +824,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
chunksize: If greater than one, the iterables will be chopped into
chunks of size chunksize and submitted to the process pool.
If set to one, the items in the list will be sent one at a time.
buffersize: The maximum number of not-yet-yielded results buffered.
If the buffer is full, then iteration over `iterables` is paused
until an element is yielded out of the buffer.

Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
Expand All @@ -839,7 +842,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):

results = super().map(partial(_process_chunk, fn),
itertools.batched(zip(*iterables), chunksize),
timeout=timeout)
timeout=timeout,
buffersize=buffersize)
return _chain_from_iterable_of_lists(results)

def shutdown(self, wait=True, *, cancel_futures=False):
Expand Down
44 changes: 44 additions & 0 deletions Lib/test/test_concurrent_futures/test_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Manager
import time
import unittest

from .util import BaseTestCase, setup_module


class PoolExecutorTest(BaseTestCase):
    """Tests for the ``buffersize`` argument of ``Executor.map``."""

    # Upper bound on how long we wait for the worker to drain the buffer.
    _SYNC_TIMEOUT = 30.0

    def _assert_processed(self, shared_list, expected):
        """Wait (with a deadline) until *shared_list* holds exactly *expected*.

        Replaces fixed ``time.sleep`` synchronization, which is flaky under
        load: poll until the worker has produced the expected set, let it
        settle briefly, then assert nothing extra was processed (which would
        indicate the buffer limit was not respected).
        """
        deadline = time.monotonic() + self._SYNC_TIMEOUT
        while set(shared_list) != expected and time.monotonic() < deadline:
            time.sleep(0.01)
        # Short settle window so an over-eager implementation that keeps
        # submitting past the buffer limit has a chance to be detected.
        time.sleep(0.05)
        self.assertSetEqual(set(shared_list), expected)

    def test_map_buffersize(self):
        manager = Manager()
        # The original leaked the manager process; ensure it is shut down.
        self.addCleanup(manager.shutdown)
        for ExecutorType in (ThreadPoolExecutor, ProcessPoolExecutor):
            with ExecutorType(max_workers=1) as pool:
                # buffersize=0 must be rejected eagerly, before any work runs.
                with self.assertRaisesRegex(
                    ValueError, "buffersize must be None or >= 1."
                ):
                    pool.map(bool, [], buffersize=0)

            for buffersize, iterable_size in [
                (1, 5),   # buffer smaller than the input
                (5, 5),   # buffer exactly the input size
                (10, 5),  # buffer larger than the input
            ]:
                iterable = range(iterable_size)
                processed_elements = manager.list()
                with ExecutorType(max_workers=1) as pool:
                    iterator = pool.map(
                        processed_elements.append, iterable, buffersize=buffersize
                    )
                    # Only the first min(buffersize, len) items may be submitted
                    # before anything is consumed from the iterator.
                    self._assert_processed(
                        processed_elements,
                        set(range(min(buffersize, iterable_size))),
                    )
                    # Consuming one result frees one buffer slot, allowing
                    # exactly one more item to be submitted.
                    next(iterator)
                    self._assert_processed(
                        processed_elements,
                        set(range(min(buffersize + 1, iterable_size))),
                    )


def setUpModule():
    """unittest module-level hook: delegate to the shared test-package setup
    (reap-children checks, multiprocessing start-method configuration)."""
    setup_module()


if __name__ == "__main__":
unittest.main()