If I pre-scatter a data object across worker nodes, does it get copied in its entirety to each of the worker nodes? Is there an advantage in doing so if that data object is big?
Using the futures interface as an example:
client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
    results[i] = client.submit(nn_train_func, data, **params)
Using the delayed interface as an example:
client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
    results[i] = delayed(nn_train_func, data, **params)
The reason I ask is because I noticed the following phenomena:
delayed appears to re-send data to the worker nodes, thus approximately doubling memory usage. It appears that pre-scattering is not doing what I expected it to do, which is allow for the worker nodes to reference the pre-scattered data.futures interface takes a long time to iterate through the loop (significantly longer). I am currently not sure how to identify where the bottleneck here is.delayed interface, from the time the compute() function is called to the time that activity is reflected on the dashboard, there is an extensive delay, which I suspected was due to data copying.Pre-scattering is designed to avoid placing large object data into the task graph.
x = np.array(lots_of_data)
a = client.submit(add, x, 1)  # have to send all of x to the scheduler
b = client.submit(add, x, 2)  # again
c = client.submit(add, x, 3)  # and again
You'll feel this pain because client.submit will be slow to return, and Dask may even throw a warning.
So instead we scatter our data, receiving a future in return
x = np.array(lots_of_data)
x_future = client.scatter(x)
a = client.submit(add, x_future, 1)  # Only have to send the future/pointer
b = client.submit(add, x_future, 2)  # so this is fast
c = client.submit(add, x_future, 3)  # and this
In your situation you're almost doing this, the only difference is that you scatter your data, then forget about the future it returns, and send your data again.
client.scatter(data, broadcast=True)  # whoops!  forgot to capture the output
data = client.scatter(data, broadcast=True)  # data is now a future pointing to its remote value
You can choose to broadcast or not.  If you know that all of your workers will need this data then it's not a bad thing to do, but things will be fine regardless.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With