These days I often find myself writing code that talks to services which are briefly unavailable from time to time. An error of some kind occurs, and the correct (and documented) action is simply to retry the original call a little later. Examples include Amazon’s S3 service and the Twitter API; with both, transient failures happen fairly frequently.
So I wrote the Twisted class below to retry calls, and tried to make it fairly general. I’d be happy to hear comments on it, because it’s pretty simple, and if it can be made bulletproof I imagine others will use it too.
In case you’re not familiar with Twisted and it’s not clear, the retrying in the code below is scheduled by the Twisted reactor. This is all asynchronous, event-based code that will not block (assuming the function you pass in does not block either).
First off, here’s the class that handles the calling:
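from twisted.internet import defer, reactor, task
from twisted.python import log, failure

class RetryingCall(object):
    """Calls a function repeatedly, passing it args and kw args. Failures
    are passed to a user-supplied failure testing function. If the failure
    is ignored, the function is called again after a delay whose duration
    is obtained from a user-supplied iterator. The start method (below)
    returns a deferred that fires with the eventual non-error result of
    calling the supplied function, or fires its errback if no successful
    result can be obtained before the delay backoff iterator raises
    StopIteration.
    """
    def __init__(self, f, *args, **kw):
        self._f = f
        self._args = args
        self._kw = kw

    def _err(self, fail):
        # Remember the first failure; it is reported if we run out of attempts.
        if self.failure is None:
            self.failure = fail
        try:
            result = self._failureTester(fail)
        except Exception:
            # The tester itself raised: errback with that exception.
            self._deferred.errback()
        else:
            if isinstance(result, failure.Failure):
                # The tester returned a failure: stop retrying.
                self._deferred.errback(result)
            else:
                log.msg('RetryingCall: Ignoring %r' % (fail,))
                self._call()

    def _call(self):
        try:
            delay = next(self._backoffIterator)
        except StopIteration:
            log.msg('StopIteration in RetryingCall: ran out of attempts.')
            self._deferred.errback(self.failure)
        else:
            # Schedule the next attempt with the reactor; nothing blocks here.
            d = task.deferLater(reactor, delay,
                                self._f, *self._args, **self._kw)
            d.addCallbacks(self._deferred.callback, self._err)

    def start(self, backoffIterator=None, failureTester=None):
        self._backoffIterator = iter(backoffIterator or simpleBackoffIterator())
        self._failureTester = failureTester or (lambda _: None)
        self._deferred = defer.Deferred()
        self.failure = None
        self._call()
        return self._deferred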
You call the constructor with your function and the args it should be called with. Then you call start() to get back a deferred that will eventually fire with the result of the call, or an error. BTW, I called it “start” to mirror twisted.internet.task.LoopingCall.
There’s a helper function for producing successive inter-call delays:
from functools import partial
from operator import mul

def simpleBackoffIterator(maxResults=10, maxDelay=120.0, now=True,
                          initDelay=0.01, incFunc=None):
    assert maxResults > 0
    remaining = maxResults
    delay = initDelay
    incFunc = incFunc or partial(mul, 2.0)  # default: double the delay

    if now:
        # Make the first call immediately.
        yield 0.0
        remaining -= 1

    while remaining > 0:
        yield (delay if delay < maxDelay else maxDelay)
        delay = incFunc(delay)
        remaining -= 1
By default this generates the sequence of inter-call delays 0.0, 0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56 (in seconds), and it should be easy to see how you could write your own. Or you can just supply a list, etc. When the backoff iterator finishes, the RetryingCall class gives up on trying to get a non-error result from the function. In that case errback is called on the deferred that start() returns, with the failure from the first call.
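As a rough illustration of writing your own, here is a minimal sketch of a linear backoff generator alongside a fixed list of delays. The names linearBackoffIterator, step and fixedDelays are made up for this example and are not part of the code above.

```python
def linearBackoffIterator(maxResults=5, step=1.0, maxDelay=10.0):
    """Yield maxResults delays: 0.0, step, 2 * step, ..., capped at maxDelay.

    Hypothetical helper, shown only to illustrate the iterator contract.
    """
    for attempt in range(maxResults):
        yield min(attempt * step, maxDelay)


# Any iterable of numbers works too, because start() wraps it with iter().
fixedDelays = [0.0, 0.5, 2.0, 10.0]

print(list(linearBackoffIterator()))  # [0.0, 1.0, 2.0, 3.0, 4.0]
```

Either one would be passed as the backoffIterator argument to start(). One thing to watch: start() picks its default with "backoffIterator or simpleBackoffIterator()", so an empty list is treated as "not supplied" and you get the default schedule rather than zero attempts.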
You get to specify a function for testing failures. It is called with each failure; if it ever raises or returns a failure, the start() deferred’s errback is called and retrying stops. To treat a failure as transient, the tester just ignores it (returns nothing), and the call is tried again after the next delay.
So, for example, if you were calling S3 and wanted to ignore 504 errors, you could supply a failureTester arg like this:
from twisted.web import http

def test(failure):
    if int(failure.value.status) != http.GATEWAY_TIMEOUT:
        return failure  # not a 504: stop retrying
As another example, while using the Twitter API you might want to allow a range of HTTP errors and also exactly one 404 error, seeing as a 404 might be an error on the part of Twitter (I don’t mean to suggest that actually happens). A 404 is probably definitive, but why not try once more just to be sure? So, pass start() a failureTester that’s an instance of a class like this:
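class TwitterFailureTester(object):
    okErrs = (http.INTERNAL_SERVER_ERROR,
              http.BAD_GATEWAY,          # assumed example
              http.SERVICE_UNAVAILABLE)  # assumed example

    def __init__(self):
        self.seen404 = False

    def __call__(self, failure):
        status = int(failure.value.status)
        if status == http.NOT_FOUND:
            if self.seen404:
                return failure  # second 404: give up
            self.seen404 = True
        elif status not in self.okErrs:
            return failure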
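If you're less familiar with Twisted, it may help to see the same retry policy as a plain synchronous loop. This is only a sketch and not the RetryingCall class: retrySync is a made-up name, ordinary exceptions stand in for Failure objects, and the sleep parameter stands in for the reactor's delayed call.

```python
def retrySync(f, delays, failureTester=None, sleep=lambda seconds: None):
    """Synchronous model of the retry policy (hypothetical helper).

    Exceptions stand in for Failures; sleep() stands in for the reactor delay.
    """
    failureTester = failureTester or (lambda exc: None)
    firstError = None
    for delay in delays:
        sleep(delay)
        try:
            return f()
        except Exception as exc:
            if firstError is None:
                firstError = exc
            # The tester stops the retrying by raising, or by returning
            # an exception; returning None means "ignore and retry".
            verdict = failureTester(exc)
            if isinstance(verdict, Exception):
                raise verdict
    if firstError is None:
        raise ValueError('delays was empty, so f was never called')
    # Out of delays: report the first error seen, not the last.
    raise firstError


attempts = []

def flaky():
    # Fails twice, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError('temporarily unavailable')
    return 'ok'

result = retrySync(flaky, [0.0, 0.01, 0.02, 0.04])
print(result, len(attempts))  # ok 3
```

The default sleep does nothing, so the example runs instantly; passing time.sleep would make it actually wait, which is exactly the blocking that the Twisted version avoids.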
Changing existing code to use RetryingCall is pretty trivial. Take something like this:
d = client.getPage(...)  # arguments elided
and change it to look like this:
r = RetryingCall(client.getPage, ...)  # same arguments as the getPage call
d = r.start(failureTester=TwitterFailureTester())
I wrote this about 10 days ago and posted it to the Twisted mailing list. No one replied to say how horrible the code is or that it should be done another way, which is a pretty good sign. The above includes an improvement suggested by Tim Allen, and is slightly more useful than the code I posted originally (see the thread on the Twisted list for details).
All code above is available to you under CC0 1.0 Universal – Public Domain Dedication.