
Dask: Continue with other tasks if one fails

I have a simple (but large) task graph in Dask. Here is a code example:

import dask

results = []

for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)

# results is a list, so unpack it with * (not **)
dask.compute(*results)


Here SomeIterable is a list of dicts, each holding the keyword arguments for one call to my_function. In each iteration b depends on a, so if the task that produces a fails, b can't be computed. However, the elements of results are independent of one another, so I expect that if one fails, the others keep running. That is not what happens in practice: if one element of results fails, the whole script stops.

EDIT:

This also happens when using the submit (or map) method of the dask.distributed.Client class, for example:

futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]
results = [ft.result() for ft in futures]

In the code above, if one task fails, the whole script fails when I try to gather its result, as described in the docs.

Andrex asked Jan 18 '26 12:01


1 Answer

An easy way around this is to wrap your functions in a try/except block, something like this:

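def try_f(params):
    # Run both steps for one set of params; report a failure instead of raising.
    try:
        a = my_function(**params)
        b = my_other_function(a)
    except Exception:  # avoid a bare except, which would also swallow KeyboardInterrupt
        b = f"Failed for: {params}"
    return b

results = [dask.delayed(try_f)(params) for params in SomeIterable]
# Unpacking gives a tuple with one entry per task.
computed = dask.compute(*results)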
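
One drawback of returning a string on failure is that a failed run is hard to tell apart from a real result. A possible variation, sketched below, returns a small record holding either the value or the exception. The names Outcome, run_safely and pipeline are hypothetical and not part of Dask; pipeline stands in for calling my_other_function on the output of my_function. The sketch runs the wrapper in a plain loop so that it works without a scheduler.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Outcome:
    """Hypothetical record: the params of one run plus its value or its error."""
    params: dict
    value: Any = None
    error: Optional[BaseException] = None

    @property
    def ok(self) -> bool:
        return self.error is None


def run_safely(pipeline: Callable[..., Any], params: dict) -> Outcome:
    """Call pipeline(**params) and capture any exception instead of raising it."""
    try:
        return Outcome(params, value=pipeline(**params))
    except Exception as exc:
        return Outcome(params, error=exc)


def pipeline(x, y):
    # Hypothetical stand-in for my_other_function(my_function(**params)).
    return x / y


param_sets = [{"x": 1, "y": 2}, {"x": 1, "y": 0}]  # the second one divides by zero
outcomes = [run_safely(pipeline, p) for p in param_sets]
failed = [o for o in outcomes if not o.ok]
```

With Dask, the same wrapper should slot in where try_f is used above, i.e. one dask.delayed(run_safely)(pipeline, params) per element, after which the failed runs can be filtered out of the computed results by checking ok.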

However, depending on your use case, you might prefer the client.submit API, since it gives you more flexibility, e.g. specifying some conditional retries.
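
As a rough illustration of the futures route, the sketch below waits for all futures and then inspects each one, so a failed task does not prevent the other results from being collected. It assumes dask.distributed is installed and uses an in-process client (processes=False) only to keep the example small. The functions flaky and gather_all are hypothetical; flaky stands in for my_other_function_2.

```python
from dask.distributed import Client, wait


def flaky(x):
    # Hypothetical stand-in for my_other_function_2: fails for one input.
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


def gather_all(futures):
    """Return (results, errors), both keyed by position in `futures`."""
    wait(futures)  # blocks until every future has either finished or errored
    results, errors = {}, {}
    for i, ft in enumerate(futures):
        if ft.status == "error":
            errors[i] = ft.exception()  # the exception raised inside the task
        else:
            results[i] = ft.result()
    return results, errors


client = Client(processes=False)  # threads only, no separate worker processes
futures = client.map(flaky, range(4))
results, errors = gather_all(futures)
client.close()

print(results)
print(errors)
```

Only the task for input 2 ends up in errors; the other three results are still available. client.submit and client.map also accept a retries argument, which may be enough when failures are transient.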

SultanOrazbayev answered Jan 20 '26 02:01


