7
7
import time
8
8
import weakref
9
9
from concurrent .futures import CancelledError , TimeoutError
10
+ from itertools import islice
10
11
from unittest import mock
11
12
12
13
import pytest
@@ -175,6 +176,9 @@ def test_map(executor):
175
176
results = list (executor .map (lambda x : x + 1 , range (10 )))
176
177
assert results == [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]
177
178
179
+ results = list (executor .map (lambda x , y : x + y , range (10 ), range (9 )))
180
+ assert results == [0 , 2 , 4 , 6 , 8 , 10 , 12 , 14 , 16 ]
181
+
178
182
179
183
def test_map_timeout (executor ):
180
184
"""Test that map with timeout raises TimeoutError and cancels futures"""
@@ -200,6 +204,56 @@ def func(x):
200
204
assert set (results ) != {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 }
201
205
202
206
207
+ def test_map_error (executor ):
208
+ """Test that map with an exception will raise, and remaining tasks are cancelled"""
209
+ results = []
210
+
211
+ def func (x ):
212
+ nonlocal results
213
+ time .sleep (0.05 )
214
+ if len (results ) == 5 :
215
+ raise ValueError ("Test error" )
216
+ results .append (x )
217
+ return x
218
+
219
+ with pytest .raises (ValueError ):
220
+ list (executor .map (func , range (15 )))
221
+
222
+ executor .shutdown (wait = True , cancel_futures = False )
223
+ assert len (results ) <= 10 , "Final 5 at least should have been cancelled"
224
+
225
+
226
+ @pytest .mark .parametrize ("cancel" , [True , False ])
227
+ def test_map_shutdown (executor , cancel ):
228
+ results = []
229
+
230
+ def func (x ):
231
+ nonlocal results
232
+ time .sleep (0.05 )
233
+ results .append (x )
234
+ return x
235
+
236
+ # Get the first few results.
237
+ # Keep the iterator alive so that it isn't closed when its reference is dropped.
238
+ m = executor .map (func , range (15 ))
239
+ values = list (islice (m , 5 ))
240
+ assert values == [0 , 1 , 2 , 3 , 4 ]
241
+
242
+ executor .shutdown (wait = True , cancel_futures = cancel )
243
+ if cancel :
244
+ assert len (results ) < 15 , "Some tasks should have been cancelled"
245
+ else :
246
+ assert len (results ) == 15 , "All tasks should have been completed"
247
+
248
+
249
+ def test_map_start (executor ):
250
+ """Test that map starts tasks immediately, before iterating"""
251
+ e = threading .Event ()
252
+ m = executor .map (lambda x : (e .set (), x ), range (1 ))
253
+ e .wait (timeout = 0.1 )
254
+ assert list (m ) == [(None , 0 )]
255
+
256
+
203
257
def test_closing (executor ):
204
258
"""Test that closing context manager works as expected"""
205
259
# mock the shutdown method of the executor
0 commit comments