Apparently I have a fundamental misunderstanding of DistributedArrays.jl. I have set up an MWE of something similar to what I need to do:
using Distributed
using DistributedArrays
addprocs()
@everywhere using Distributed, DistributedArrays
a = distribute(zeros(5))
@sync @distributed for i in 1:5
    a_l = localpart(a)
    a_l[i] = 100 * i
end
I then run into the following error:
ERROR: TaskFailedException:
On worker 2:
BoundsError: attempt to access 1-element Array{Float64,1} at index [2]
setindex! at ./array.jl:847
macro expansion at /home/user/test.jl:36 [inlined]
[inlined]
#17 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:301
#160 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:87
#103 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:290
run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:79
run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:88
#96 at ./task.jl:356
...and 3 more exception(s).
Stacktrace:
[1] sync_end(::Channel{Any}) at ./task.jl:314
[2] (::Distributed.var"#159#161"{var"#17#18",UnitRange{Int64}})() at ./task.jl:333
Stacktrace:
sync_end(::Channel{Any}) at ./task.jl
top-level scope at task.jl
Using a = dzeros((5,1), workers()) gives the same error. Any help is appreciated!
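There are two problems:
1. localpart is indexed starting from 1, regardless of which global indices it holds.
2. The @distributed loop does not necessarily reach every worker that owns one of the localparts.

Let us consider this code: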
a = distribute(zeros(5));
@sync @distributed for i in 1:5
    for j in keys(a[:L])
        a[:L][j] = 100 * i + myid()
    end
end
While this solves the first issue, the second is still there:
julia> a
5-element DArray{Float64, 1, Vector{Float64}}:
402.0
503.0
0.0
0.0
0.0
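Why does it not work as expected? Because addprocs() without arguments adds one worker per CPU thread, so I now have 8 workers while the loop has only 5 iterations. Only some of the workers hold a non-empty localpart, and nothing guarantees that the 5 iterations land on exactly those workers, so some localparts are never written.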
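To see how the elements are actually laid out, one can ask each owning process for its global index range and the length of its localpart. This is a minimal sketch, assuming 4 workers (an arbitrary choice, not the 8 from the question); it uses procs, localindices and localpart from DistributedArrays and remotecall_fetch from Distributed. The exact split depends on the number of workers.

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # assumption: 4 workers, added only if none exist yet
@everywhere using DistributedArrays

a = distribute(zeros(5))

# For every process that owns a chunk of `a`, report which global indices
# it holds and the length of its localpart (a plain Vector indexed from 1).
for p in procs(a)
    inds, n = remotecall_fetch(() -> (localindices(a), length(localpart(a))), p)
    println("worker $p: global indices $(inds[1]), localpart length $n")
end
```

The printed ranges make the first problem visible: a worker holding, say, global index 3 still stores it at position 1 of its localpart.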
Perhaps the simplest solution is to change the range from 1:5 to 1:max(5, nworkers()). This makes sure that every worker, and therefore every localpart, gets processed.
julia> @sync @distributed for i in 1:max(5,nworkers())
    @show i, myid(), length(a[:L])
    for j in keys(a[:L])
        a[:L][j] = 100 * i + myid()
    end
end
From worker 9: (i, myid(), length(a[:L])) = (6, 9, 0)
From worker 7: (i, myid(), length(a[:L])) = (4, 7, 0)
From worker 2: (i, myid(), length(a[:L])) = (7, 2, 1)
From worker 8: (i, myid(), length(a[:L])) = (5, 8, 0)
From worker 3: (i, myid(), length(a[:L])) = (8, 3, 1)
From worker 4: (i, myid(), length(a[:L])) = (1, 4, 1)
From worker 5: (i, myid(), length(a[:L])) = (2, 5, 1)
From worker 6: (i, myid(), length(a[:L])) = (3, 6, 1)
Task (done) @0x0000000073e09f50
This run clearly shows what happens when you loop over 5 elements with 8 workers: workers 2 to 6 each hold one element, while the localparts on workers 7, 8 and 9 are empty.
The result is now as expected (keeping in mind that iterations are not assigned to workers in a predictable order):
julia> a
5-element DArray{Float64, 1, Vector{Float64}}:
702.0
803.0
104.0
205.0
306.0
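A different approach, not the one taken in this answer, is to skip @distributed and run exactly one task on each process that owns a chunk, computing each value from the element's global index instead of the loop counter. The sketch below assumes 4 workers and that the goal is a[g] = 100 * g for every global index g; it combines procs, localpart and localindices from DistributedArrays with @spawnat from Distributed. Because every value depends only on g, the result does not depend on how the work is scheduled.

```julia
using Distributed
nprocs() == 1 && addprocs(4)   # assumption: 4 workers, added only if none exist yet
@everywhere using DistributedArrays

a = distribute(zeros(5))

# One task per owning process; @sync waits until all of them have finished.
@sync for p in procs(a)
    @spawnat p begin
        lp = localpart(a)             # local Vector, indexed from 1
        gidx = localindices(a)[1]     # global indices covered by lp
        for (j, g) in enumerate(gidx)
            lp[j] = 100 * g           # value depends on the global index only
        end
    end
end

println(Array(a))   # expected: [100.0, 200.0, 300.0, 400.0, 500.0]
```

Pairing localpart with localindices is what bridges the two index spaces: j walks the 1-based local Vector while g is the matching position in the full array.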