In my code, I have two hypothetical tasks: one gets URLs from a generator and downloads them in batches using Twisted's Cooperator, and the other takes a downloaded source and parses it asynchronously. I'm trying to encapsulate all of the fetch and parse tasks in a single Deferred object that calls back when all pages are downloaded and all sources are parsed.
I've come up with the following solution:
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage
BATCH_SIZE = 5
def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}
    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)
    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)
    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred
    def on_finish(r):
        state['done'] = True
    deferreds = []
    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))
    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)
    return defer.DeferredList([main_tasks, result])
# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
The code works, but I feel as if I'm either missing something blatantly obvious, or ignorant of a simple Twisted pattern that would make this a lot simpler. Is there a better way to return a single Deferred that calls back when all fetching and parsing is finished?
As currently written, it looks to me like this code will have a limited number of parallel downloads, but an unlimited number of parallel parse jobs. Is that intentional? I'm going to assume "no", since if your network happens to be fast and your parser happens to be slow, as the number of URLs approaches infinity, so does your memory usage :).
So here's a version that still limits parallelism but runs each parse in sequence with its download, so that a slot does not fetch its next URL until the previous page has been parsed:
from twisted.internet import defer, task
from twisted.web.client import getPage
BATCH_SIZE = 5
def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)
    coop = task.Cooperator()
    urls = fetch_urls()
    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))
task.react(main_task)
This works because parse (apparently) returns a Deferred, so adding it as a callback to the Deferred returned by getPage results in a Deferred that won't call the callback added by coiterate until parse has done its business.
Since you were asking about idiomatic Twisted code, I've also taken the liberty of modernizing it a bit (using task.react rather than running the reactor manually, inlining expressions to make things briefer and so on).
If you really do want to have more parallel parses than parallel fetches, something like this might work better then:
from twisted.internet import defer, task
from twisted.web.client import getPage
PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10
def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)
    def parseWhenReady(r):
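        def parallelParse(_):
            def releaseAndPassThrough(result):
                # release() returns None, not a Deferred, so call it
                # directly and hand the parse result back unchanged.
                parseSemaphore.release()
                return result
            parse(r).addBoth(releaseAndPassThrough)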
        return parseSemaphore.acquire().addCallback(parallelParse)
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)
    coop = task.Cooperator()
    urls = fetch_urls()
    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                            [parseSemaphore.acquire()
                             for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))
task.react(main_task)
You can see that parseWhenReady returns the Deferred returned from acquire, so parallel fetching will continue as soon as the parallel parse can begin, and therefore you won't continue fetching indiscriminately even when the parser is overloaded. However, parallelParse carefully abstains from returning the Deferred returned by parse, since fetching should be able to continue while the parse is proceeding.
(Please note that since your initial example was not runnable, I haven't tested either of these at all. Hopefully the intent is clear even if there are bugs.)