I'm trying to run something that looks like this:
y = @parallel (min) for i in collection
    f(i)
end
where f(i) is a function that's essentially a while loop that counts how many iterations it takes to fulfill its conditions. At the beginning, one of the termination conditions is a predetermined number of iterations, n. However, if f(i) ever returns less than n, then ideally I would want to replace n with the value of f(i). For example, since I am looking for the minimum f(i), if f(j) is m I would want all other loops to stop as soon as they reach m iterations.
I'm new to parallel computing and so I'm likely misinterpreting the documentation, but I think that I should be able to do something like this:
x = Channel{Int64}(1)
put!(x,n)
y = @parallel (min) for i in collection
    f(i,x)
end
close(x)
where I've modified f to take a Channel parameter and now it looks something like this:
@everywhere function f(item,chan)
    going = true
    count = 0
    while (going)
        going = false
        # perform some operations
        if (count < fetch(chan) && !conditions_met())
            # conditions_met checks the other termination conditions
            going = true
            count += 1
        end
    end
    count += 1
    if (count < fetch(chan))
        take!(chan)
        put!(chan,count)
    end
    return count
end
If I replace the first count < fetch(chan) with count < n and remove the other if block/Channel code, the script runs fine. But, since n will be several orders of magnitude larger than the minimum f(i), if I could do something like I've described it would speed up computation significantly. Is this something I should be able to do and if so, am I approaching this correctly?
Right now I am experiencing the following error (running with 4 procs):
ERROR (unhandled task failure): On worker 3:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519
ERROR: LoadError: On worker 2:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in preduce at multi.jl:1523
 [inlined code] from multi.jl:1532
 in anonymous at expr.jl:113
 [inlined code] from /home/michael/Documents/julia/script.jl:125
 in anonymous at no file:0
while loading /home/michael/Documents/julia/script.jl, in expression starting on line 121
ERROR (unhandled task failure): On worker 4:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519
ERROR (unhandled task failure): On worker 5:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519
where line 98 is the take!(chan) statement in the function definition and line 126 is f(i,x) inside the parallel for loop.
Channels implement CSP-like semantics for asynchronous communication, but they have no automatic mechanism for sharing across parallel processes. For that purpose you need to use a RemoteRef: http://docs.julialang.org/en/release-0.4/manual/parallel-computing/#remoterefs-and-abstractchannels
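A minimal sketch of the shared-bound idea, under some assumptions: it targets Julia 1.x rather than 0.4, so it uses RemoteChannel (which replaced the channel-backed RemoteRef from Julia 0.5 onward) and @distributed (the later name for @parallel). The body of f is a hypothetical stand-in that simply counts up to item, and run_search, bound, and the sample inputs are illustrative names, not part of the original script.

```julia
using Distributed

# Start two local worker processes if none are running (the count is arbitrary).
nprocs() == 1 && addprocs(2)

# Hypothetical stand-in for f: counts iterations up to `item`, but stops
# early once the shared bound is reached.
@everywhere function f(item, bound)
    count = 0
    # fetch reads the current bound without removing it from the channel.
    while count < item && count < fetch(bound)
        count += 1
    end
    # take! empties the one-slot channel, so any other worker calling fetch
    # or take! blocks until put! restores a value. The read-modify-write
    # below therefore cannot interleave with another worker's update.
    current = take!(bound)
    put!(bound, min(current, count))
    return count
end

function run_search(collection, n)
    # The backing Channel lives on the calling process; workers only hold a
    # reference to it, so they all see the same value.
    bound = RemoteChannel(() -> Channel{Int}(1))
    put!(bound, n)
    y = @distributed (min) for i in collection
        f(i, bound)
    end
    return y, fetch(bound)
end

y, final_bound = run_search([50, 7, 300, 12], 1_000)
println("minimum count: ", y, ", final bound: ", final_bound)
```

Note that every fetch on a RemoteChannel is a round trip to the process that owns it, so checking the bound on each iteration can cost more than it saves when an iteration is cheap. Checking only every k-th iteration is one possible compromise.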