Archive for June 10th, 2011

How to asynchronously exchange a dictionary using Twisted deferreds

Friday, June 10th, 2011

Here’s a fun class that I can’t think of a good use for :-) But I like its simplicity and it’s another illustration of what I like to call asynchronous data structures.

Suppose you have a producer who’s building a dictionary and a consumer who wants to look things up in it. The producer works at their own pace, adding entries in whatever order suits them, and the consumer requests items in whatever order they need them. If the dictionary is of non-trivial size, the two orders are extremely unlikely to match. How do you write an asynchronous server (or data structure) that sits between the producer and consumer?

Yes, it’s far-fetched, perhaps, but here’s a simple asynchronous dictionary class that lets you do it gracefully:

from collections import defaultdict
from twisted.internet import defer

class DeferredDict(dict):
    def __init__(self, *args, **kwargs):
        self._deferreds = defaultdict(set)
        dict.__init__(self, *args, **kwargs)

    def __getitem__(self, item):
        try:
            return defer.succeed(dict.__getitem__(self, item))
        except KeyError:
            d = defer.Deferred()
            self._deferreds[item].add(d)
            return d

    def __setitem__(self, item, value):
        # Store the value first so that a callback which looks the item up
        # again finds it, then fire any deferreds that were waiting on it.
        dict.__setitem__(self, item, value)
        for d in self._deferreds.pop(item, ()):
            d.callback(value)
 

When a consumer tries to get an element from the dictionary, they always get a deferred. The deferred will fire with the value from the dictionary when (if) it becomes available. Of course if the value is already known, they get it in a deferred that has already fired (via succeed). When the producer puts an element into the dictionary, any consumer deferreds that were waiting on that element’s value are given the value.

Note that a __delitem__ isn’t needed; we just inherit it from dict. If a non-existent item is deleted, you get the normal dictionary behavior (a KeyError). If the item does exist, the set of deferreds waiting on it must be empty (the fact that the item exists means any deferreds waiting for it have already been fired and its entry in the self._deferreds dictionary has been deleted), so we can just let the dictionary class delete the item, as usual.

Graceful shutdown of a Twisted service with outstanding deferreds

Friday, June 10th, 2011

I’ve been spending a bit of time thinking again about queues and services. I wrote a Twisted class in 2009 to maintain a resizable dispatch queue (code in Launchpad, description on the Twisted mailing list). For this post I’ve pulled out (and simplified slightly) one of its helper classes, a DeferredPool.

This simple class maintains a set of deferreds and gives you a mechanism to get a deferred that will fire when (if!) the size of the set ever drops to zero. This is useful because it can be used to gracefully shut down a service that has a bunch of outstanding requests in flight. For each incoming request (that’s handled via a deferred), you add the deferred to the pool. When a signal arrives to tell the service to stop, you stop taking new requests and ask the pool for a deferred that will fire when all the outstanding deferreds are done, then you exit. This can all be done elegantly in Twisted, the last part by having the stopService method return the deferred you get back from the pool (perhaps after you add more cleanup callbacks to it).

Here’s the code:

from twisted.internet import defer

class DeferredPool(object):
    """Maintains a pool of not-yet-fired deferreds and gives a mechanism to
    request a deferred that fires when the pool size goes to zero."""

    def __init__(self):
        self._pool = set()
        self._waiting = []

    def _fired(self, result, d):
        """Callback/errback each pooled deferred runs when it fires. The
        deferred first removes itself from the pool. If the pool is then
        empty, fire all the waiting deferreds (which were returned by
        notifyWhenEmpty)."""
        self._pool.remove(d)
        if not self._pool:
            waiting, self._waiting = self._waiting, []
            for waiter in waiting:
                waiter.callback(None)
        return result

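    def add(self, d):
        """Add a deferred to the pool."""
        # Put d in the pool before attaching _fired: if d has already
        # fired, addBoth runs _fired at once and it must find d there.
        self._pool.add(d)
        d.addBoth(self._fired, d)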

    def notifyWhenEmpty(self, testImmediately=True):
        """Return a deferred that fires (with None) when the pool empties.
        If testImmediately is True and the pool is empty, return an already
        fired deferred (via succeed)."""
        if testImmediately and not self._pool:
            return defer.succeed(None)
        else:
            d = defer.Deferred()
            self._waiting.append(d)
            return d
 

As usual I’m posting this example because I find Twisted’s deferreds so elegant. Here are a few comments on the above that might help you understand deferreds better.

A frequent pattern when creating and managing deferreds is that you can add callbacks and errbacks to them yourself to transparently do some housekeeping when they fire. In this case, for each deferred passed to add, I’m adding a callback and an errback that will run self._fired when the deferred fires. The first thing that method does is take the deferred out of the pool of outstanding deferreds. So the deferred itself cleans up the pool. It does that transparently, by which I mean that the call/errback function (self._fired) always returns whatever result it was passed. It’s on both the callback and errback chains of the deferred and has no effect on the result. The deferred may already have call/errbacks on it when it is passed to add, and it may have them added to it after add is done. Whoever created and is otherwise using the deferred will be none the wiser and is in no way affected.

When a deferred in the pool fires, it also checks to see if the pool size is zero and if there are any deferreds waiting to be informed of that. If so, it fires all the waiting deferreds and empties the list of waiting deferreds. This doesn’t mean the action is necessarily over. More deferreds can be added, more waiters can be added, etc. The pool size can go to zero again, and if no waiters are waiting at that point, no big deal.

It’s easy to add functionality to, e.g., record what time deferreds were added, provide stats, allow outstanding deferreds to be cancelled, add notifications when high/low water marks are reached, etc. But that’s enough for now. Feel free to ask questions below.