diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index ce72127127c7a6..85ad025cad4f54 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -39,11 +39,13 @@ Executor Objects
              future = executor.submit(pow, 323, 1235)
              print(future.result())
 
-   .. method:: map(fn, *iterables, timeout=None, chunksize=1)
+   .. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)
 
       Similar to :func:`map(fn, *iterables) <map>` except:
 
-      * the *iterables* are collected immediately rather than lazily;
+      * the *iterables* are collected immediately rather than lazily, unless a
+        *buffersize* is specified: if the buffer is full, then the iteration
+        over *iterables* is paused until a result is yielded from the buffer.
 
       * *fn* is executed asynchronously and several calls to
        *fn* may be made concurrently.
@@ -68,6 +70,9 @@ Executor Objects
    .. versionchanged:: 3.5
       Added the *chunksize* argument.
 
+   .. versionchanged:: 3.15
+      Added the *buffersize* argument.
+
    .. method:: shutdown(wait=True, *, cancel_futures=False)
 
       Signal the executor that it should free any resources that it is using
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 707fcdfde79acd..b804ea3cf1187a 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -4,10 +4,12 @@
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
 import collections
+from itertools import islice
 import logging
 import threading
 import time
 import types
+import weakref
 
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs):
         """
         raise NotImplementedError()
 
-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         """Returns an iterator equivalent to map(fn, iter).
 
         Args:
@@ -584,6 +586,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before being passed to a child process. This argument is only
                 used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.
+            buffersize: The maximum number of not-yet-yielded results buffered.
+                If the buffer is full, then iteration over `iterables` is paused
+                until an element is yielded out of the buffer.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -594,10 +599,21 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before the given timeout.
             Exception: If fn(*args) raises for any values.
         """
+        if buffersize is not None and buffersize < 1:
+            raise ValueError("buffersize must be None or >= 1.")
+
         if timeout is not None:
             end_time = timeout + time.monotonic()
 
-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        args_iter = iter(zip(*iterables))
+        if buffersize:
+            fs = collections.deque(
+                self.submit(fn, *args) for args in islice(args_iter, buffersize)
+            )
+        else:
+            fs = [self.submit(fn, *args) for args in args_iter]
+
+        executor_weakref = weakref.ref(self)
 
         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
@@ -607,6 +623,8 @@ def result_iterator():
                 fs.reverse()
                 while fs:
                     # Careful not to keep a reference to the popped future
+                    if buffersize and (executor := executor_weakref()) and (args := next(args_iter, None)):
+                        fs.appendleft(executor.submit(fn, *args))
                     if timeout is None:
                         yield _result_or_cancel(fs.pop())
                     else:
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 7092b4757b5429..548f376b021d7b 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs):
         return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         """Returns an iterator equivalent to map(fn, iter).
 
         Args:
@@ -824,6 +824,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
             chunksize: If greater than one, the iterables will be chopped into
                 chunks of size chunksize and submitted to the process pool.
                 If set to one, the items in the list will be sent one at a time.
+            buffersize: The maximum number of not-yet-yielded results buffered.
+                If the buffer is full, then iteration over `iterables` is paused
+                until an element is yielded out of the buffer.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -839,7 +842,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
 
         results = super().map(partial(_process_chunk, fn),
                               itertools.batched(zip(*iterables), chunksize),
-                              timeout=timeout)
+                              timeout=timeout,
+                              buffersize=buffersize)
         return _chain_from_iterable_of_lists(results)
 
     def shutdown(self, wait=True, *, cancel_futures=False):
diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py
new file mode 100644
index 00000000000000..adaf29c67fa121
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_pool.py
@@ -0,0 +1,44 @@
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+from multiprocessing import Manager
+import time
+import unittest
+
+from .util import BaseTestCase, setup_module
+
+
+class PoolExecutorTest(BaseTestCase):
+    def test_map_buffersize(self):
+        manager = Manager()
+        for ExecutorType in (ThreadPoolExecutor, ProcessPoolExecutor):
+            with ExecutorType(max_workers=1) as pool:
+                with self.assertRaisesRegex(
+                    ValueError, "buffersize must be None or >= 1."
+                ):
+                    pool.map(bool, [], buffersize=0)
+
+            for buffersize, iterable_size in [
+                (1, 5),
+                (5, 5),
+                (10, 5),
+            ]:
+                iterable = range(iterable_size)
+                processed_elements = manager.list()
+                with ExecutorType(max_workers=1) as pool:
+                    iterator = pool.map(
+                        processed_elements.append, iterable, buffersize=buffersize
+                    )
+                    time.sleep(0.2)  # wait for buffered futures to finish
+                    self.assertSetEqual(set(processed_elements), set(range(min(buffersize, iterable_size))))
+                    next(iterator)
+                    time.sleep(0.1)  # wait for the created future to finish
+                    self.assertSetEqual(
+                        set(processed_elements), set(range(min(buffersize + 1, iterable_size)))
+                    )
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
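
To make the new semantics concrete, here is a minimal usage sketch (not part of the patch; it assumes an interpreter that already includes the `buffersize` parameter added above). With a bounded buffer, `map` can consume an unbounded input iterator lazily instead of collecting it up front:

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    # Without buffersize, map() would try to submit one task per element of
    # the infinite itertools.count() iterator and never return. With
    # buffersize=4, at most four not-yet-yielded results are pending at once.
    results = pool.map(square, itertools.count(), buffersize=4)
    for value in itertools.islice(results, 10):
        print(value)  # 0, 1, 4, 9, ...
```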
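For reviewers skimming the `_base.py` hunk, the core idea is a bounded deque that is primed with the first *buffersize* submissions and topped up each time a result is yielded. The standalone sketch below (the helper name `buffered_map` is hypothetical, not part of the patch) mirrors that pattern while omitting the weakref and timeout handling used in the real implementation:

```python
import collections
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def buffered_map(executor, fn, iterable, buffersize):
    """Yield fn(x) for each x, keeping roughly `buffersize` submitted-but-unyielded calls in flight."""
    args_iter = iter(iterable)
    # Prime the buffer with the first `buffersize` submissions.
    fs = collections.deque(
        executor.submit(fn, arg) for arg in islice(args_iter, buffersize)
    )

    def result_iterator():
        while fs:
            # Top up the buffer before yielding, so new work is submitted as
            # soon as an earlier result is about to be consumed.
            try:
                fs.append(executor.submit(fn, next(args_iter)))
            except StopIteration:
                pass
            yield fs.popleft().result()

    return result_iterator()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        print(list(buffered_map(pool, abs, range(-5, 0), buffersize=2)))  # [5, 4, 3, 2, 1]
```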