I want to implement retry logic with Python's concurrent.futures.ThreadPoolExecutor. I would like the following properties:
A lot of existing code I found online basically operates in "rounds": it calls as_completed on an initial list of futures, resubmits the failed ones, gathers the resubmitted futures in a new list, and goes back to calling as_completed on the new list if it's not empty. Basically something like this:
with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures
However, I think that doesn't satisfy the first property, since it's possible that a retried future completes before the initial futures, but we won't process it until all the initial futures complete.
Here's a hypothetical pathological case. Let's say we have two jobs: the first runs for 1 second but has a 90% chance of failure, while the second runs for 100 seconds. If our executor has 2 workers and the first job fails after 1 second, we'll retry it immediately. But if it fails again, we won't be able to retry it until the second job completes.
So my question is, is it possible to implement retry logic with these desired properties, without using external libraries or rewriting low-level executor logic? One thing I tried is putting the retry logic in the code sent to the worker:
def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)
with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)
But it seems like submitting new jobs from within a job doesn't work.
Loop with wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).
Trade-offs:
Extra work for pending futures on every iteration (re-adding waiter, building new_futures).
An overall timeout is harder to specify.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures
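Tweak as_completed(...) so that new futures can be added to its fs and pending sets and registered with its waiter.
Trade-off: Maintenance, since this copies and depends on private internals of concurrent.futures.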
Advantage: Ability to specify overall timeout if wanted.
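import time
from concurrent.futures import TimeoutError
# Private CPython internals; these names may change between Python versions.
from concurrent.futures._base import (
    _AS_COMPLETED, CANCELLED_AND_NOTIFIED, FINISHED, _AcquireFutures,
    _create_and_install_waiters, _yield_finished_futures)

class AsCompletedWaiterWrapper: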
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None
    def listen(self, fut):
        with self.waiter.lock:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)
    def as_completed(self, fs, timeout=None):
        """
        concurrent.futures.as_completed plus the 3 lines marked with +.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()
        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                    f for f in fs
                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
        self.fs = fs            # +
        self.pending = pending  # +
        self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))
            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                                '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))
                waiter.event.wait(wait_timeout)
                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()
                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))
        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)
Usage:
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job
Wait for events inside the with ... executor: block, because ThreadPoolExecutor.__exit__ shuts down the executor, after which it cannot schedule new futures.
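The shutdown behaviour can be checked in isolation. This is only a minimal sketch: it shuts the executor down explicitly, which is what leaving the with block does, and then tries to submit again. The variable name message is just for illustration.

```python
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
executor.shutdown()  # same effect as leaving the "with" block

message = None
try:
    executor.submit(print, "never runs")
except RuntimeError as exc:
    # submit() refuses new work once the executor has been shut down.
    message = str(exc)
```

A retry submitted from a worker after the with block has exited hits the same error, and because the exception is raised inside the worker it is easy to miss.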
Trade-offs:
Does not work with ProcessPoolExecutor due to the executor reference in the main process.
def worker_job(fn, event):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        executor.submit(worker_job, fn, event)
with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()