I am trying to understand how to use the package Distributed together with SharedArrays to perform parallel operations with Julia. As an example, I am taking a simple Monte Carlo averaging method:
using Distributed
using SharedArrays
using Statistics
const NWorkers = 2
const Ns = Int(1e6)
function parallelRun()
    addprocs(NWorkers)
    procsID = workers()
    A = SharedArray{Float64,1}(Ns)
    println("starting loop")
    for i = 1:2:Ns
        # parallel block
        @sync for p = 1:NWorkers
            @async A[i+p-1] = remotecall_fetch(rand, procsID[p])
        end
    end
    println(mean(A))
end
function singleRun()
    A = zeros(Ns)
    for i = 1:Ns
        A[i] = rand()
    end
    println(mean(A))
end
However if I @time both functions I get
julia> @time singleRun()
0.49965531193003165
0.009762 seconds (17 allocations: 7.630 MiB)
julia> @time parallelRun()
0.4994892300029917
46.319737 seconds (66.99 M allocations: 2.665 GiB, 1.01% gc time)
In particular, there are many more allocations in the parallel version, which makes it much slower than the serial one.
Am I missing something?
By the way, the reason I am using @sync and @async (even though they are not needed here, since every sample can be computed in any order) is that I would like to apply the same strategy to solve a parabolic PDE numerically, with something along the lines of
for t = 1:time_steps
    # parallel block
    @sync for p = 1:NWorkers
        @async remotecall(make_step_PDE, procsID[p], p)
    end
end
where each worker indexed by p should work on a disjoint set of indices of my equation.
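For concreteness, this is roughly the index split I have in mind (indices_for_worker is just a hypothetical helper name; make_step_PDE would only update the slice it is given):

# Hypothetical sketch: worker p out of NWorkers owns a contiguous, disjoint
# slice of the grid 1:Nx; make_step_PDE would only touch that slice.
function indices_for_worker(p, NWorkers, Nx)
    chunk = cld(Nx, NWorkers)          # ceiling division so every index is covered
    return ((p - 1) * chunk + 1):min(p * chunk, Nx)
end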
Thanks in advance
There are the following problems in your code:
- You spawn a separate remote call for every single element of A, and this is just expensive, so in the end it takes a long time. The rule of thumb is to use the @distributed macro for load balancing across workers; it will simply share the work evenly.
- Never put addprocs inside your work function, because every time you run it you add new processes: spawning a new Julia process also takes a lot of time, and that was included in your measurements. In practice you want to run addprocs in the part of the script that performs the initialization, or add the processes by starting Julia with the -p or --machine-file parameter.
- Always run @time twice: in the first measurement @time also includes compilation time, and compilation in a distributed environment takes much longer than in a single process.
Your function should look more or less like this:
using Distributed, SharedArrays, Statistics
addprocs(4)
@everywhere using Distributed, SharedArrays

function parallelRun(Ns)
    A = SharedArray{Float64,1}(Ns)
    @sync @distributed for i = 1:Ns
        A[i] = rand()
    end
    println(mean(A))
end
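With the workers added once at the top level, a quick check along these lines illustrates the point about timing twice (the first call includes compilation of parallelRun and of the distributed closure):

@time parallelRun(Int(1e6))   # first call: includes compilation time
@time parallelRun(Int(1e6))   # second call: measures only the actual work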
You might also consider completely splitting the data between workers. In some scenarios this is less bug-prone and also allows you to distribute the work over many nodes:
using Distributed, DistributedArrays, Statistics
addprocs(4)
@everywhere using Distributed, DistributedArrays

function parallelRun2(Ns)
    d = dzeros(Ns)   # creates an array distributed evenly across all workers
    @sync @distributed for i in 1:Ns
        p = localpart(d)
        p[(i-1) % Int(Ns/nworkers()) + 1] = rand()
    end
    println(mean(d))
end
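If you prefer to keep the explicit @sync/@async pattern from your PDE sketch, here is a minimal variant, assuming Random and DistributedArrays are loaded on every worker, in which each worker fills the whole chunk it owns in one remote call instead of writing element by element:

using Distributed, DistributedArrays, Statistics
addprocs(4)
@everywhere using DistributedArrays, Random

function parallelRun3(Ns)
    d = dzeros(Ns)                      # one chunk of d lives on each worker
    @sync for p in workers()
        @async remotecall_wait(p) do
            rand!(localpart(d))         # fill only the locally stored chunk
        end
    end
    println(mean(d))
end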