Twisted code for retrying function calls

Thursday, November 12th, 2009

These days I often find myself writing code to talk to services that are periodically briefly unavailable. An error of some kind occurs and the correct (and documented) action to take is just to retry the original call a little later. Examples include using Amazon’s S3 service and the Twitter API. In both of these services, transient failures happen fairly frequently.

So I wrote the Twisted class below to retry calls, and tried to make it fairly general. I’d be happy to hear comments on it, because it’s pretty simple and if it can be made bulletproof I imagine others will use it too.

In case you’re not familiar with Twisted and it’s not clear, the call retrying in the code below is scheduled by the Twisted reactor. This is all asynchronous, event-based code that will not block (assuming the function you pass in also does not).

First off, here’s the class that handles the calling:

from twisted.internet import reactor, defer, task
from twisted.python import log, failure

class RetryingCall(object):
    """Calls a function repeatedly, passing it args and kw args. Failures
    are passed to a user-supplied failure testing function. If the failure
    is ignored, the function is called again after a delay whose duration
    is obtained from a user-supplied iterator. The start method (below)
    returns a deferred that fires with the eventual non-error result of
    calling the supplied function, or fires its errback if no successful
    result can be obtained before the delay backoff iterator raises
    StopIteration.
    """

    def __init__(self, f, *args, **kw):
        self._f = f
        self._args = args
        self._kw = kw

    def _err(self, fail):
        # Remember the first failure; it is reported if we run out of attempts.
        if self.failure is None:
            self.failure = fail
        try:
            result = self._failureTester(fail)
        except Exception:
            # The tester raised: give up, passing on the tester's exception.
            self._deferred.errback()
        else:
            if isinstance(result, failure.Failure):
                # The tester returned a failure: give up.
                self._deferred.errback(result)
            else:
                log.msg('RetryingCall: Ignoring %r' % (fail,))
                self._call()

    def _call(self):
        try:
            delay = next(self._backoffIterator)
        except StopIteration:
            log.msg('StopIteration in RetryingCall: ran out of attempts.')
            self._deferred.errback(self.failure)
        else:
            d = task.deferLater(reactor, delay,
                                self._f, *self._args, **self._kw)
            d.addCallbacks(self._deferred.callback, self._err)

    def start(self, backoffIterator=None, failureTester=None):
        self._backoffIterator = iter(backoffIterator or simpleBackoffIterator())
        self._failureTester = failureTester or (lambda _: None)
        self._deferred = defer.Deferred()
        self.failure = None
        # Schedule the first attempt.
        self._call()
        return self._deferred

You call the constructor with your function and the args it should be called with. Then you call start() to get back a deferred that will eventually fire with the result of the call, or an error. BTW, I called it “start” to mirror twisted.internet.task.LoopingCall.
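If it helps to see the control flow without the reactor in the way, here is a rough blocking analogue of the same idea. It is only a sketch and not part of the class above: the name retryingCallBlocking, the injectable sleep parameter, and the convention that the tester receives a plain exception (rather than a Failure) and returns something other than None to give up are all assumptions made for illustration.

```python
import time


def retryingCallBlocking(f, delays, failureTester=None, sleep=time.sleep):
    """Blocking stand-in for RetryingCall: same decisions, no reactor."""
    firstError = None
    for delay in delays:
        sleep(delay)  # RetryingCall schedules with task.deferLater instead
        try:
            return f()
        except Exception as exc:
            if firstError is None:
                firstError = exc  # kept in case we run out of delays
            if failureTester is not None and failureTester(exc) is not None:
                raise  # the tester declined to ignore this error
    if firstError is None:
        raise ValueError('delays was empty, so f was never called')
    raise firstError  # out of attempts: report the first error seen


# A function that fails twice and then succeeds.
attempts = []

def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError('transient')
    return 'ok'

# Record the requested delays instead of really sleeping.
slept = []
result = retryingCallBlocking(flaky, [0.0, 0.01, 0.02, 0.04],
                              sleep=slept.append)
```

The real class makes the same three decisions (retry, give up because the tester said so, give up because the delays ran out), but each attempt is scheduled on the reactor, so nothing blocks while waiting.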

There’s a helper function for producing successive inter-call delays:

from operator import mul
from functools import partial

def simpleBackoffIterator(maxResults=10, maxDelay=120.0, now=True,
                          initDelay=0.01, incFunc=None):
    assert maxResults > 0
    remaining = maxResults
    delay = initDelay
    incFunc = incFunc or partial(mul, 2.0)
    if now:
        yield 0.0
        remaining -= 1
    while remaining > 0:
        yield (delay if delay < maxDelay else maxDelay)
        delay = incFunc(delay)
        remaining -= 1

By default this will generate the sequence of inter-call delays 0.0, 0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, and it should be easy to see how you could write your own. Or you can just supply a list or any other iterable. When the backoff iterator finishes, the RetryingCall class gives up on trying to get a non-error result from the function. In that case errback is called on the deferred that start() returns, with the failure from the first call.
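As one possible example of writing your own, here is a sketch of a backoff iterator whose delays grow linearly instead of doubling. The name linearBackoffIterator and its step parameter are made up for illustration; only the "yield a finite series of delays" contract comes from the code above.

```python
def linearBackoffIterator(maxResults=5, step=0.5, maxDelay=2.0):
    """Yield maxResults delays that grow by step seconds, capped at maxDelay."""
    assert maxResults > 0
    for n in range(1, maxResults + 1):
        yield min(n * step, maxDelay)


delays = list(linearBackoffIterator())
# delays is [0.5, 1.0, 1.5, 2.0, 2.0]
```

You would pass it as start(backoffIterator=linearBackoffIterator()). Because start() wraps its argument in iter(), a plain list such as [0.0, 1.0, 5.0] works too. An empty list is falsy, though, so it falls back to the default iterator.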

You get to specify a function for testing failures. If it ever raises or returns a failure, the start() deferred’s errback is called. The failure tester can just ignore whatever failures should be considered transient.

So, for example, if you were calling S3 and wanted to ignore 504 errors, you could supply a failureTester arg like this:

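from twisted.web import error, http

def test(failure):
    # Only web errors carry a status; trap() re-raises anything else.
    failure.trap(error.Error)
    # Returning nothing means "ignore this failure and retry".
    if int(failure.value.status) != http.GATEWAY_TIMEOUT:
        return failure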

As another example, while using the Twitter API you might want to allow a range of HTTP errors and also exactly one 404 error, seeing as a 404 might be an error on the part of Twitter (I don’t mean to suggest that actually happens). It’s probably definitive – but, why not try it once again just to be more sure? So, pass RetryingCall a failureTester that’s an instance of a class like this:

class TwitterFailureTester(object):
    # Statuses treated as transient; add others here as needed.
    okErrs = (http.INTERNAL_SERVER_ERROR,)

    def __init__(self):
        self.seen404 = False

    def __call__(self, failure):
        status = int(failure.value.status)
        if status == http.NOT_FOUND:
            if self.seen404:
                # A second 404 is taken as definitive.
                return failure
            else:
                self.seen404 = True
        elif status not in self.okErrs:
            return failure

Changing existing code to use RetryingCall is pretty trivial. Take something like this:

from twisted.web import client

def getUserByScreenname(screenname):
    d = client.getPage(...)
    return d

and change it to look like this:

def getUserByScreenname(screenname):
    r = RetryingCall(client.getPage, ...)
    d = r.start(failureTester=TwitterFailureTester())
    return d

I wrote this about 10 days ago and posted it to the Twisted mailing list. No one replied to say how horrible the code is or that it should be done another way, which is a pretty good sign. The above includes an improvement suggested by Tim Allen, and is slightly more useful than the code I posted originally (see the thread on the Twisted list for details).