Skip to content

Commit 2e349ed

Browse files
committed
Update docs and fix exception handling on the final batch
1 parent 8e96e15 commit 2e349ed

File tree

1 file changed

+56
-42
lines changed

1 file changed

+56
-42
lines changed

base/asyncmap.jl

Lines changed: 56 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ Uses multiple concurrent tasks to map `f` over a collection (or multiple
1111
equal length collections). For multiple collection arguments, `f` is
1212
applied elementwise.
1313
14-
`ntasks` specifies the number of tasks to run concurrently.
15-
Depending on the length of the collections, if `ntasks` is unspecified,
16-
up to 100 tasks will be used for concurrent mapping.
14+
`ntasks` specifies the maximum number of tasks permitted to run
15+
concurrently. the default is 100.
1716
18-
`ntasks` can also be specified as a zero-arg function. In this case, the
19-
number of tasks to run in parallel is checked before processing every element and a new
20-
task started if the value of `ntasks_func` is less than the current number
21-
of tasks.
17+
`ntasks` can also be specified as a zero-arg function. This is called before
18+
each task is scheduled (each iteration) to be used as the new
19+
maximum. Changing the maximum will not effect tasks that have already been
20+
scheduled, but it may effect the scheduling of new tasks.
2221
2322
If `batch_size` is specified, the collection is processed in batch mode. `f` must
2423
then be a function that must accept a `Vector` of argument tuples and must
@@ -54,50 +53,62 @@ julia> result = asyncmap(1:2) do x
5453
The following examples highlight execution in different tasks by returning
5554
the `objectid` of the tasks in which the mapping function is executed.
5655
57-
First, with `ntasks` undefined, each element is processed in a different task.
56+
First, with `ntasks` undefined, all three iterations can be started at once.
5857
```
59-
julia> tskoid() = objectid(current_task());
60-
61-
julia> asyncmap(x->tskoid(), 1:5)
62-
5-element Array{UInt64,1}:
63-
0x6e15e66c75c75853
64-
0x440f8819a1baa682
65-
0x9fb3eeadd0c83985
66-
0xebd3e35fe90d4050
67-
0x29efc93edce2b961
68-
69-
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
70-
5
58+
julia> asyncmap(1:3) do i
59+
println("before sleep \$i")
60+
sleep(1/i)
61+
println("after sleep \$i")
62+
i
63+
end
64+
before sleep 1
65+
before sleep 2
66+
before sleep 3
67+
after sleep 3
68+
after sleep 2
69+
after sleep 1
70+
3-element Array{Int64,1}:
71+
1
72+
2
73+
3
7174
```
7275
73-
With `ntasks=2` all elements are processed in 2 tasks.
76+
With `ntasks=2` the third iteration must wait until the second
77+
completes. However it can still start and finish before the first completes.
7478
```
75-
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
76-
5-element Array{UInt64,1}:
77-
0x027ab1680df7ae94
78-
0xa23d2f80cd7cf157
79-
0x027ab1680df7ae94
80-
0xa23d2f80cd7cf157
81-
0x027ab1680df7ae94
82-
83-
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
84-
2
79+
julia> asyncmap(1:3; ntasks=2) do i
80+
println("before sleep \$i")
81+
sleep(1/i)
82+
println("after sleep \$i")
83+
i
84+
end
85+
before sleep 1
86+
before sleep 2
87+
after sleep 2
88+
before sleep 3
89+
after sleep 3
90+
after sleep 1
91+
3-element Array{Int64,1}:
92+
1
93+
2
94+
3
8595
```
8696
8797
With `batch_size` defined, the mapping function needs to be changed to accept an array
8898
of argument tuples and return an array of results. `map` is used in the modified mapping
8999
function to achieve this.
90100
```
91-
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
92-
batch_func (generic function with 1 method)
93-
94-
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
101+
julia> asyncmap(1:5; ntasks=2, batch_size=2) do input
102+
map(input) do x
103+
string("args_tuple: ", x, ", element_val: ", x[1], ", ", current_task())
104+
end
105+
end
95106
5-element Array{String,1}:
96-
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
97-
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
98-
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
99-
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
100-
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
107+
"args_tuple: 1, element_val: 1, Task (runnable) @0x00007f6629271390"
108+
"args_tuple: 2, element_val: 2, Task (runnable) @0x00007f6629271390"
109+
"args_tuple: 3, element_val: 3, Task (runnable) @0x00007f6629271600"
110+
"args_tuple: 4, element_val: 4, Task (runnable) @0x00007f6629271600"
111+
"args_tuple: 5, element_val: 5, Task (runnable) @0x00007f6629271870"
101112
```
102113
103114
!!! note
@@ -208,7 +219,7 @@ function do_asyncmap(f, c::Tuple, ntasks::Function, batch_size)
208219

209220
new_batch()
210221
res = try
211-
map(c...) do x
222+
rs = map(c...) do x
212223
r = Ref{Any}(undef)
213224

214225
push!(batch_in, x)
@@ -221,10 +232,13 @@ function do_asyncmap(f, c::Tuple, ntasks::Function, batch_size)
221232

222233
r
223234
end
235+
236+
do_asyncmap_task!(exec_batch, s, batch_in, batch_out)
237+
238+
rs
224239
catch ex
225240
ex isa AbortMapException || rethrow()
226241
end
227-
do_asyncmap_task!(exec_batch, s, batch_in, batch_out)
228242

229243
wait_done(s)
230244

0 commit comments

Comments
 (0)