Archive for June, 2011

A resizable dispatch queue for Twisted

Monday, June 27th, 2011

In December 2009 I posted to the Twisted mailing list about what I called a Resizable Dispatch Queue. I’ve just spent some time making a new version that’s much improved. You can pick up the new version via pip install txrdq, from PyPI, or from the txRDQ project page on Launchpad. Here’s the example use case, taken from my original posting:

You want to write a server with a web interface that allows people to enter their phone number so you can send them an SMS. You anticipate lots of people will use the service. But sending SMS messages is quite slow, and the company that you ship those jobs off to is concerned that you’ll overrun their service (or maybe they have an API limit, etc). So you need to queue up jobs locally and send them off at a certain rate. You’d like to be able to adjust that rate up or down. You also want to be able to shut your service down cleanly (i.e., not in the middle of a task), and when you restart it you want to be able to re-queue the jobs that were queued last time but which hadn’t gone out.

To make the example more concrete, suppose your function that sends the SMS is called sendSMS and that it takes a (number, message) tuple argument. Here are some of the kinds of things you can do:

from txrdq.rdq import ResizableDispatchQueue

# Create a queue that will allow 5 simultaneous underway jobs.
rdq = ResizableDispatchQueue(sendSMS, 5)

# Later... send off some SMS messages.
d1 = rdq.put((2127399921, 'Hello...'), priority=5)
d1.addCallback(...)

d2 = rdq.put((5052929919, 'Test...'), priority=5)
d2.addCallback(...)

# Cancel the second job
d2.cancel()

# Widen the outgoing pipeline to 10 simultaneous jobs.
rdq.width = 10

# We're dispatching jobs too fast, turn it down a little.
rdq.width = 7

# Get a copy of the list of pending jobs.
jobs = rdq.pending()

# Cancel one of the pending jobs from the jobs list.
job.cancel()

# Reprioritize one of the pending jobs from the jobs list.
rdq.reprioritize(job, -1)

# Arrange to increase the number of jobs in one hour.
reactor.callLater(3600, rdq.setWidth, 20)

# Pause processing.
rdq.pause()

# Resume processing, with a new width of 8.
rdq.resume(8)

# Shutdown. Wait for any underway jobs to complete, and save
# the list of jobs not yet processed.

def saveJobs(jobs):
    pickle.dump(jobs, ...)

d = rdq.stop()
d.addCallback(saveJobs)
 

I’ve come up with many uses for this class over the last 18 months, and have quite often recommended it to people on the #twisted IRC channel. Other examples include fetching a large number of URLs in a controlled way, making many calls to the Twitter API, etc.

Usage of the class is very simple. You create the dispatch queue, giving it a function that will be called to process all jobs. Then you just put job arguments as fast as you like. Each call to put gets you a Twisted Deferred instance. If your function runs successfully on the argument, the deferred will call back with an instance of txrdq.job.Job. The job contains information about when it was queued, when it was launched, when it stopped, and of course the result of the function. If your function hits an error or the job is canceled (by calling cancel on the deferred), the deferred will errback and the failure will again contain a job instance with the details.

It’s also useful to have an admin interface to the queue, so calls such as pending and underway are provided. These return lists of job instances. You can call cancel on a job instance too. You can reprioritize jobs. And you can pause processing or shut the queue down cleanly.

The code also contains an independently useful Twisted classes called DeferredPriorityQueue (which I plan to write about), and DeferredPool (which I described earlier).

Back of the envelope calculations with The Rule of 72

Monday, June 20th, 2011

Image: internetworldstats.com

The Rule of 72 deserves to be better known among technical people. It’s a widely-known financial rule of thumb used for understanding and calculating interest rates. But others, including computer scientist and start-up founders, are often concerned with growth rates. Knowing and applying the rule of 72 can help in developing numerical literacy (numeracy) around growth.

For example, consider Moore’s Law, which describes how "the number of transistors that can be placed inexpensively on an integrated circuit doubles approximately every two years." If something doubles every two years, at what rate does it increase per month, on average? If you know the rule of 72, you’ll instantly know that the monthly growth rate is about 3%. You get the answer by dividing 72 by 24 (the number of months).

Computer scientists are usually very familiar with powers of two. It’s often convenient to take advantage of the fact that 2^10 is about 1,000. That means that when something increases by a factor of 1,000, it has doubled about 10 times. By extension, and with a little more error, an increase of a million corresponds to 20 doublings, and a billion is 30 doublings (log base two of a billion is actually 29.897, so the error isn’t too wild). You can use this to ballpark the number of doublings in a process really easily, and go directly from that to a growth rate using the rule of 72.

For example, the bottom of this page tells us that there were about 16,000 internet domains on July 1st 1992, and 1.3M of them on July 1st 1997. Let’s think in thousands: that’s a jump from 16 to just over 1,000 in 5 years. To get from 1 to 16 is four doublings, so from 16 to 1,000 is six doublings (because 1,000 is ten doublings from 1). So the number of domains doubled 6 times in 5 years, or 6 times in 60 months, or once every 10 months (on average). If you want something to double in 10 months, the rule of 72 tells us we need a growth rate of 7.2% per month. To check: 16,000 * (1.072 ^ 60) = 1,037,067. That’s a damned good estimate (remember that we were shooting for 1M, not 1.3M) for five seconds of mental arithmetic! Note that the number of domains was growing much faster than Moore’s law (3% per month).

You can quickly get very good at doing these sorts of calculations. Here’s another easy example. This page shows the number of internet users growing from 16M in December 1995 to 2,072M in March of 2011. That’s just like the above example, but it’s 7 doublings in 15.25 years, or 183 months. That’s pretty close to a doubling every 24 months, which we know from above corresponds to 3% growth per month.

You can use facility with growth rates to have a good sense for interest rates in general. You can use it when building simple (exponential) models of product growth. E.g., suppose you’re launching a product and you reckon you’ll have 300K users in a year’s time. You want to map this out in a spreadsheet using a simple exponential model. What should the growth rate be? 300K is obviously not much more than 256 * 1,024, which is 18 doublings in 365 days, or a doubling roughly every 20 days. The rule of 72 gives 72/20 = ~3.5, so you need to grow 3.5% every day to hit your target. Is that reasonable? If it is, it means that when you hit 300K users, you’ll be signing up about 3.5% of that number, or 10,500 users per day. As you can see, familiarity with powers of two (i.e., estimating number of doublings) and with the rule of 72 can give you ballpark figures really easily. You can even use your new math powers to avoid looking stupid in front of VCs.

The math behind the rule of 72 is easy to extend to triplings (rule of 110), quadrupling (rule of 140), quintupling (rule of 160), etc.

Finally, you can use these rules of thumb to do super geeky party tricks. E.g., what’s the tenth root of two? Put another way, what interest rate do you need for something to double after ten periods? The rule of 72 tells you it’s 72/10 = 7.2%, so the tenth root of two will be about 1.072 (in fact 1.072 ^ 10 = 2.004). What’s the 20th root of 5? The rule of 160 tells you you need 160/20 = 8% growth each period, so 1.08 should be about right (the correct answer is ~1.0838).

As with all rules of thumb, it’s good to have a sense of when it’s most applicable. See the wikipedia page or this page for more detailed information. It’s also of course good to understand that it may not be suitable to model growth as an exponential at all.

How to asynchronously exchange a dictionary using Twisted deferreds

Friday, June 10th, 2011

Here’s a fun class that I can’t think of a good use for :-) But I like its simplicity and it’s another illustration of what I like to call asynchronous data structures.

Suppose you have a producer who’s building a dictionary and a consumer who wants to look things up in it. The producer is working at their own pace, making new dictionary entries in whatever work order that suits them, and the consumer is requesting dictionary items in whatever order they need them. The two orders are obviously extremely unlikely to be the same if the dictionary is of non-trivial size. How do you write an asynchronous server (or data structure) that sits between the producer and consumer?

Yes, it’s far fetched, perhaps, but here’s a simple asynchronous dictionary class that lets you do it gracefully:

from collections import defaultdict
from twisted.internet import defer

class DeferredDict(dict):
    def __init__(self, *args, **kwargs):
        self._deferreds = defaultdict(set)
        dict.__init__(self, *args, **kwargs)

    def __getitem__(self, item):
        try:
            return defer.succeed(dict.__getitem__(self, item))
        except KeyError:
            d = defer.Deferred()
            self._deferreds[item].add(d)
            return d

    def __setitem__(self, item, value):
        if item in self._deferreds:
            for d in self._deferreds[item]:
                d.callback(value)
            del self._deferreds[item]
        dict.__setitem__(self, item, value)
 

When a consumer tries to get an element from the dictionary, they always get a deferred. The deferred will fire with the value from the dictionary when (if) it becomes available. Of course if the value is already known, they get it in a deferred that has already fired (via succeed). When the producer puts an element into the dictionary, any consumer deferreds that were waiting on that element’s value are given the value.

Note that a __delitem__ isn’t needed, we just inherit that from dict. If a non-existent item is deleted, you get the normal dictionary behavior (a KeyError). If the item does exist, that means the list of waiting deferreds on that item is empty (the fact the item exists means any waiting deferreds for the item have all been fired and that that item in the self._deferreds dictionary was deleted), so we can just let the dictionary class delete the item, as usual.

Graceful shutdown of a Twisted service with outstanding deferreds

Friday, June 10th, 2011

I’ve been spending a bit of time thinking again about queues and services. I wrote a Twisted class in 2009 to maintain a resizable dispatch queue (code in Launchpad, description on the Twisted mailing list). For this post I’ve pulled out (and simplified slightly) one of its helper classes, a DeferredPool.

This simple class maintains a set of deferreds and gives you a mechanism to get a deferred that will fire when (if!) the size of the set ever drops to zero. This is useful because it can be used to gracefully shut down a service that has a bunch of outstanding requests in flight. For each incoming request (that’s handled via a deferred), you add the deferred to the pool. When a signal arrives to tell the service to stop, you stop taking new requests and ask the pool for a deferred that will fire when all the outstanding deferreds are done, then you exit. This can all be done elegantly in Twisted, the last part by having the stopService method return the deferred you get back from the pool (perhaps after you add more cleanup callbacks to it).

Here’s the code:

from twisted.internet import defer

class DeferredPool(object):
    """Maintains a pool of not-yet-fired deferreds and gives a mechanism to
    request a deferred that fires when the pool size goes to zero."
""

    def __init__(self):
        self._pool = set()
        self._waiting = []

    def _fired(self, result, d):
        """Callback/errback each pooled deferred runs when it fires. The
        deferred first removes itself from the pool. If the pool is then
        empty, fire all the waiting deferreds (which were returned by
        notifyWhenEmpty)."
""
        self._pool.remove(d)
        if not self._pool:
            waiting, self._waiting = self._waiting, []
            for waiter in waiting:
                waiter.callback(None)
        return result

    def add(self, d):
        """Add a deferred to the pool."""
        d.addBoth(self._fired, d)
        self._pool.add(d)

    def notifyWhenEmpty(self, testImmediately=True):
        """Return a deferred that fires (with None) when the pool empties.
        If testImmediately is True and the pool is empty, return an already
        fired deferred (via succeed)."
""
        if testImmediately and not self._pool:
            return defer.succeed(None)
        else:
            d = defer.Deferred()
            self._waiting.append(d)
            return d
 

As usual I’m posting this example because I find Twisted’s deferreds so elegant. Here are a few comments on the above that might help you understand deferreds better.

A frequent pattern when creating and managing deferreds is that you can add callbacks and errbacks to them yourself to transparently do some housekeeping when they fire. In this case, for each deferred passed to add, I’m adding a callback and an errback that will run self._fired when the deferred fires. The first thing that method does is take the deferred out of the pool of outstanding deferreds. So the deferred itself cleans up the pool. It does that transparently, by which I mean that the call/errback function (self._fired) always returns whatever result it was passed. It’s on both the callback and errback chains of the deferred and has no effect on the result. The deferred may already have call/errbacks on it when it is passed to add, and it may have them added to it after add is done. Whoever created and is otherwise using the deferred will be none the wiser and is in no way affected.

When a deferred in the pool fires, it also checks to see if the pool size is zero and if there are any deferreds waiting to be informed of that. If so, it fires all the waiting deferreds and empties the list of waiting deferreds. This doesn’t mean the action is necessarily over. More deferreds can be added, more waiters can be added, etc. The pool size can go to zero again and if there are no waiters are waiting, no big deal, etc.

It’s easy to add functionality to e.g., record what time deferreds were added, provide stats, allow outstanding deferreds to be cancelled, add notifications when high/low water marks are reached, etc. But that’s enough for now. Feel free to ask questions below.