DArray: Operations always return DArrays, and add local partition support for MPI #408

Merged
merged 2 commits on Jul 20, 2023

160 changes: 84 additions & 76 deletions docs/src/darray.md
@@ -21,18 +21,15 @@ This should not be confused with the [DistributedArrays.jl](https://github.com/J

A `DArray` can be created in two ways: through an API similar to the usual
`rand`, `ones`, etc. calls, or by distributing an existing array with
`distribute`. It's generally not recommended to manually construct a `DArray`
object unless you're developing the `DArray` itself.

### Allocating new arrays

As an example, one can allocate a random `DArray` by calling `rand` with a
`Blocks` object as the first argument - `Blocks` specifies the size of
partitions to be constructed, and must have the same number of dimensions as the
array being allocated.

```julia
# Add some Julia workers
julia> using Distributed; addprocs(6)
julia> @everywhere using Dagger

julia> DX = rand(Blocks(50, 50), 100, 100)
Dagger.DArray{Any, 2, typeof(cat)}(100, 100)
```

@@ -59,6 +53,18 @@ should be allocated which is in total 100 x 100, split into 4 blocks of size 50
x 50, and initialized with random `Float64`s. Many other functions, like
`randn`, `ones`, and `zeros` can be called in this same way.
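
For example, a minimal sketch of the same pattern with other allocators (the
variable names here are illustrative; the worker setup above is assumed):

```julia
julia> DR = randn(Blocks(50, 50), 100, 100);

julia> DO = ones(Blocks(50, 50), 100, 100);

julia> D0 = zeros(Blocks(50, 50), 100, 100);
```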

Note that the `DArray` is an asynchronous object (i.e. operations on it may
execute in the background), so to force it to be materialized, `fetch` may need
to be called:

```julia
julia> fetch(DX)
Dagger.DArray{Any, 2, typeof(cat)}(100, 100)
```

This doesn't change the type or values of the `DArray`, but it does make sure
that any pending operations have completed.
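
For instance, a rough sketch of the pattern (with `DX` as above):

```julia
DX = rand(Blocks(50, 50), 100, 100)  # returns immediately; work may continue in the background
DX = fetch(DX)                       # blocks until every partition has been computed
```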

To convert a `DArray` back into an `Array`, `collect` can be used to gather the
data from all of the Julia workers that it resides on and combine it into a
single `Array` on the worker calling `collect`:
@@ -97,26 +103,20 @@ julia> collect(DX)
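
As a minimal sketch of that round trip (assuming the worker setup above; output
elided):

```julia
DX = rand(Blocks(50, 50), 100, 100)
X = collect(DX)          # a plain 100×100 Matrix{Float64} on the calling worker
X isa Matrix{Float64}    # true
```
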
### Distributing existing arrays

Now let's look at constructing a `DArray` from an existing array object; we can
do this by calling `distribute`:

```julia
julia> Z = zeros(100, 500);

julia> Dzeros = distribute(Z, Blocks(10, 50))
Dagger.DArray{Any, 2, typeof(cat)}(100, 500)
```

This will distribute the array partitions (in chunks of 10 x 50 matrices)
across the workers in the Julia cluster in a relatively even distribution;
future operations on a `DArray` may produce a different distribution from the
one chosen by `distribute`.
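
As a sketch (using `Dagger.chunks`, which is shown later on this page), the
partition grid created above can be inspected; a 100 x 500 array split into
10 x 50 blocks should presumably yield a 10 x 10 grid of partitions:

```julia
julia> size(Dagger.chunks(Dzeros))
(10, 10)
```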

## Broadcasting

As the `DArray` is a subtype of `AbstractArray` and generally satisfies Julia's
@@ -125,80 +125,88 @@ expected:

```julia
julia> DX = rand(Blocks(50,50), 100, 100)
Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)

julia> DY = DX .+ DX
Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)

julia> DZ = DY .* 3
Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)

julia> size(DZ)
(100, 100)

julia> DA = fetch(DZ)
Dagger.DArray{Float64, 2, Blocks{2}, typeof(cat)}(100, 100)
```

Now, `DZ` will contain the result of computing `(DX .+ DX) .* 3`. Note that
`DArray` objects are immutable, and operations on them are thus functional
transformations of their input `DArray`.

!!! note
    Support for mutation of `DArray`s is planned for a future release

```
julia> Dagger.chunks(DZ)
2×2 Matrix{Any}:
EagerThunk (finished) EagerThunk (finished)
EagerThunk (finished) EagerThunk (finished)

julia> Dagger.chunks(fetch(DZ))
2×2 Matrix{Union{Thunk, Dagger.Chunk}}:
Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(4, 8, 0x0000000000004e20), ThreadProc(4, 1), AnyScope(), true) … Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(2, 5, 0x0000000000004e20), ThreadProc(2, 1), AnyScope(), true)
Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(5, 5, 0x0000000000004e20), ThreadProc(5, 1), AnyScope(), true) Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(3, 3, 0x0000000000004e20), ThreadProc(3, 1), AnyScope(), true)
```

Here we can see the `DArray`'s internal representation of the partitions, which
are stored as either `EagerThunk` objects (representing an ongoing or completed
computation) or `Chunk` objects (which reference data that exists locally or on
other Julia workers). Of course, one doesn't typically need to worry about
these internal details unless implementing low-level operations on `DArray`s.
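
As an illustrative sketch (assuming `fetch` on an individual thunk or chunk
returns the underlying array, and continuing the session above), a single
partition can be retrieved directly:

```julia
julia> chunk = Dagger.chunks(DZ)[1, 1];

julia> fetch(chunk)  # presumably a 50×50 Matrix{Float64} holding that partition's data
```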

Finally, it's easy to see the results of this combination of broadcast
operations; just use `collect` to get an `Array`:

```
julia> collect(DZ)
100×100 Matrix{Float64}:
9.15905 8.12721 11.1374 6.32082 3.49716 7.23124 10.3995 6.98103 7.72209 6.08033
5.72754 1.23614 4.67045 4.89095 3.40126 … 5.07663 1.60482 5.04386 1.44755 2.5682
0.189402 3.64462 5.92218 3.94603 2.32192 1.47115 4.6364 0.778867 3.13838 4.87871
3.3492 3.96929 3.46377 1.29776 3.59547 4.82616 1.1512 3.02528 3.05538 0.139763
5.0981 5.72564 5.1128 0.954708 2.04515 2.50365 5.97576 5.17683 4.79587 1.80113
1.0737 5.25768 4.25363 0.943006 4.25783 4.1801 3.14444 3.07428 4.41075 2.90252
5.48746 5.17286 3.99259 0.939678 3.76034 … 0.00763076 2.98176 1.83674 1.61791 3.33216
1.05088 4.98731 1.24925 3.57909 2.53366 5.96733 2.35186 5.75815 3.32867 1.15317
0.0335647 3.52524 0.159895 5.49908 1.33206 3.51113 0.0753356 1.5557 0.884252 1.45085
5.27506 2.00472 0.00636555 0.461574 5.16735 2.74457 1.14679 2.39407 0.151713 0.85013
4.43607 4.50304 4.73833 1.92498 1.64338 4.34602 4.62612 3.28248 1.32726 5.50207
5.22308 2.53069 1.27758 2.62013 3.73961 … 5.91626 2.54943 5.41472 1.67197 4.09026
1.09684 2.53189 4.23236 0.14055 0.889771 2.20834 2.31341 5.23121 1.74341 4.00588
2.55253 4.1789 3.50287 4.96437 1.26724 3.04302 3.74262 5.46611 1.39375 4.13167
3.03291 4.43932 2.85678 1.59531 0.892166 0.414873 0.643423 4.425 5.48145 5.93383
0.726568 0.516686 3.00791 3.76354 3.32603 2.19812 2.15836 3.85669 3.67233 2.1261
2.22763 1.36281 4.41129 5.29229 1.10093 … 0.45575 4.38389 0.0526105 2.14792 2.26734
2.58065 1.99564 4.82657 0.485823 5.24881 2.16097 3.59942 2.25021 3.96498 0.906153
0.546354 0.982523 1.94377 2.43136 2.77469 4.43507 5.98402 0.692576 1.53298 1.20621
4.71374 4.99402 1.5876 1.81629 2.56269 1.56588 5.42296 0.160867 4.17705 1.13915
2.97733 2.4476 3.82752 1.3491 3.5684 1.23393 1.86595 3.97154 4.6419 4.8964
⋮ ⋱ ⋮
3.49162 2.46081 1.21659 2.96078 4.58102 5.97679 3.34463 0.202255 2.85433 0.0786219
0.894714 2.87079 5.09409 2.2922 3.18928 1.5886 0.163886 5.99251 0.697163 5.75684
2.98867 2.2115 5.07771 0.124194 3.88948 3.61176 0.0732554 4.11606 0.424547 0.621287
5.95438 3.45065 0.194537 3.57519 1.2266 2.93837 1.02609 5.84021 5.498 3.53337
2.234 0.275185 0.648536 0.952341 4.41942 … 4.78238 2.24479 3.31705 5.76518 0.621195
5.54212 2.24089 5.81702 1.96178 4.99409 0.30557 3.55499 0.851678 1.80504 5.81679
5.79409 4.86848 3.10078 4.22252 4.488 3.03427 2.32752 3.54999 0.967972 4.0385
3.06557 5.4993 2.44263 1.82296 0.166883 0.763588 1.59113 4.33305 2.8359 5.56667
3.86797 3.73251 3.14999 4.11437 0.454938 0.166886 0.303827 4.7934 3.37593 2.29402
0.762158 4.3716 0.897798 4.60541 2.96872 … 1.60095 0.480542 1.41945 1.33071 0.308611
1.20503 5.66645 4.03237 3.90194 1.55996 3.58442 4.6735 5.52211 5.46891 2.43612
5.51133 1.13591 3.26696 4.24821 4.60696 3.73251 3.25989 4.735 5.61674 4.32185
2.46529 0.444928 3.85984 5.49469 1.13501 1.36861 5.34651 0.398515 0.239671 5.36412
2.62837 3.99017 4.52569 3.54811 3.35515 4.13514 1.22304 1.01833 3.42534 3.58399
4.88289 5.09945 0.267154 3.38482 4.53408 … 3.71752 5.22216 1.39987 1.38622 5.47351
0.1046 3.65967 1.62098 5.33185 0.0822769 3.30334 5.90173 4.06603 5.00789 4.40601
1.9622 0.755491 2.12264 1.67299 2.34482 4.50632 3.84387 3.22232 5.23164 2.97735
4.37208 5.15253 0.346373 2.98573 5.48589 0.336134 2.25751 2.39057 1.97975 3.24243
3.83293 1.69017 3.00189 1.80388 3.43671 5.94085 1.27609 3.98737 0.334963 5.84865
```

A variety of other operations exist on the `DArray`, and it should generally
behave like any other `AbstractArray` type. If you find that it's missing an
operation that you need, please file an issue!
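
For example, a rough sketch (assuming elementwise `map` and reductions such as
`sum` are available for `DArray`, as they are for most `AbstractArray` subtypes):

```julia
DX = rand(Blocks(50, 50), 100, 100)
DM = map(x -> x + 1, DX)   # elementwise map, presumably returning another DArray
total = sum(DX)            # scalar reduction over all partitions
```
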
41 changes: 41 additions & 0 deletions docs/src/index.md
@@ -130,3 +130,44 @@ calls to `hist!` may run in parallel
By using `map` on `temp_bins`, we then make a copy of each worker's bins that
we can safely return back to our current worker, and sum them together to get
our total histogram.

-----

## Quickstart: Distributed Arrays

Dagger's `DArray` type represents a distributed array, where a single large
array is implemented as a set of smaller array partitions, which may be
distributed across a Julia cluster.

For more details: [Distributed Arrays](@ref)

### Distribute an existing array

Distributing any kind of array into a `DArray` is easy: just use `distribute`,
and specify the partitioning you desire with `Blocks`. For example, to
distribute a 16 x 16 matrix in 4 x 4 partitions:

```julia
A = rand(16, 16)
DA = distribute(A, Blocks(4, 4))
```

### Allocate a distributed array directly

To allocate a `DArray`, just pass your `Blocks` partitioning object into the
appropriate allocation function, such as `rand`, `ones`, or `zeros`:

```julia
rand(Blocks(20, 20), 100, 100)
ones(Blocks(20, 100), 100, 2000)
zeros(Blocks(50, 20), 300, 200)
```
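
An element type can also be passed before the dimensions (see the allocation
methods in `src/array/alloc.jl`):

```julia
rand(Blocks(20, 20), Float32, 100, 100)
zeros(Blocks(50, 20), Int, 300, 200)
```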

### Convert a `DArray` back into an `Array`

To get back an `Array` from a `DArray`, just call `collect`:

```julia
DA = rand(Blocks(32, 32), 256, 128)
collect(DA) # returns a `Matrix{Float64}`
```
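
As a quick sanity-check sketch, distributing and then collecting should
presumably round-trip the original data:

```julia
A = rand(16, 16)
DA = distribute(A, Blocks(4, 4))
collect(DA) == A   # expected to be true
```
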
25 changes: 16 additions & 9 deletions src/array/alloc.jl
@@ -8,6 +8,7 @@ mutable struct AllocateArray{T,N} <: ArrayOp{T,N}
f::Function
domain::ArrayDomain{N}
domainchunks
partitioning::AbstractBlocks
end
size(a::AllocateArray) = size(a.domain)

@@ -18,15 +19,15 @@ function _cumlength(len, step)
cumsum(extra > 0 ? vcat(ps, extra) : ps)
end

function partition(p::AbstractBlocks, dom::ArrayDomain)
DomainBlocks(map(first, indexes(dom)),
map(_cumlength, map(length, indexes(dom)), p.blocksize))
end
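# Illustration (assumed usage): partition(Blocks(50, 50), ArrayDomain((1:100, 1:100)))
# yields a DomainBlocks describing a 2x2 grid of 50x50 subdomains, since _cumlength
# returns the cumulative block boundaries [50, 100] along each dimension.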

function stage(ctx, a::AllocateArray)
alloc(idx, sz) = a.f(idx, a.eltype, sz)
thunks = [Dagger.@spawn alloc(i, size(x)) for (i, x) in enumerate(a.domainchunks)]
return DArray(a.eltype, a.domain, a.domainchunks, thunks, a.partitioning)
end

function Base.rand(p::Blocks, eltype::Type, dims)
@@ -35,7 +36,8 @@ function Base.rand(p::Blocks, eltype::Type, dims)
rand(MersenneTwister(s+idx), x...)
end
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, f, d, partition(p, d), p)
return _to_darray(a)
end
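# Note: each allocator builds a lazy AllocateArray and immediately converts it with
# _to_darray, so these constructors always return a concrete DArray to the caller.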

Base.rand(p::Blocks, t::Type, dims::Integer...) = rand(p, t, dims)
@@ -48,21 +50,24 @@ function Base.randn(p::Blocks, dims)
randn(MersenneTwister(s+idx), x...)
end
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(Float64, f, d, partition(p, d), p)
return _to_darray(a)
end
Base.randn(p::Blocks, dims::Integer...) = randn(p, dims)

function Base.ones(p::Blocks, eltype::Type, dims)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, (_, x...) -> ones(x...), d, partition(p, d), p)
return _to_darray(a)
end
Base.ones(p::Blocks, t::Type, dims::Integer...) = ones(p, t, dims)
Base.ones(p::Blocks, dims::Integer...) = ones(p, Float64, dims)
Base.ones(p::Blocks, dims::Tuple) = ones(p, Float64, dims)

function Base.zeros(p::Blocks, eltype::Type, dims)
d = ArrayDomain(map(x->1:x, dims))
a = AllocateArray(eltype, (_, x...) -> zeros(x...), d, partition(p, d), p)
return _to_darray(a)
end
Base.zeros(p::Blocks, t::Type, dims::Integer...) = zeros(p, t, dims)
Base.zeros(p::Blocks, dims::Integer...) = zeros(p, Float64, dims)
@@ -73,7 +78,7 @@ function Base.zero(x::DArray{T,N}) where {T,N}
sd = first(x.subdomains)
part_size = ntuple(i->sd.indexes[i].stop, N)
a = zeros(Blocks(part_size...), T, dims)
return _to_darray(a)
end

function sprand(p::Blocks, m::Integer, n::Integer, sparsity::Real)
@@ -82,13 +87,15 @@ function sprand(p::Blocks, m::Integer, n::Integer, sparsity::Real)
sprand(MersenneTwister(s+idx), sz...,sparsity)
end
d = ArrayDomain((1:m, 1:n))
a = AllocateArray(Float64, f, d, partition(p, d), p)
return _to_darray(a)
end

function sprand(p::Blocks, n::Integer, sparsity::Real)
s = rand(UInt)
f = function (idx,t,sz)
sprand(MersenneTwister(s+idx), sz...,sparsity)
end
d = ArrayDomain((1:n,))  # define the domain before use
a = AllocateArray(Float64, f, d, partition(p, d), p)
return _to_darray(a)
end
Loading