Archive for July 12th, 2011

txdpce: a Twisted class for deferred parallel command execution

Tuesday, July 12th, 2011

I just uploaded a simple Twisted Python class, txdpce, to Launchpad. It’s designed for situations where you have multiple ways of obtaining a function result and you want to try them all in parallel and return the result from the fastest function. A typical case is that you can either compute a result via a network call or try to get it out of a cache (perhaps also via a network call, to memcached). You might also be able to compute it locally, etc.

Things can be more complicated than provided for here, of course. E.g., you might like to cache the result of a local call (if it finishes first or if the cache lookup fails). The txdpce class is supposed to be a simple demonstration. I wrote it for a bit of fun this morning and also because it’s yet another nice example of how you can click together the building blocks of Twisted to form increasingly sophisticated classes.

Here’s the class. You’ll find a test suite at the Launchpad site. You can download the code using bzr via bzr branch lp:txdpce.

from twisted.internet import defer
from twisted.python import failure

class ParallelCommandException(Exception):
    pass

class DeferredParallelCommandExecutor(object):
    """
    Provides a mechanism for the execution of a command by multiple methods,
    returning the result from the one that finishes first.
    "
""

    def __init__(self):
        self._functions = []

    def registerFunction(self, func):
        """
        Add func to the list of functions that will be run when execute is
        called.

        @param func: A callable that should be run when execute is called.
        """
        self._functions.append(func)

    def execute(self, *args, **kwargs):
        """
        Run all the functions in self._functions on the given arguments and
        keyword arguments.

        @param args: Arguments to pass to the registered functions.

        @param kwargs: Keyword arguments to pass to the registered functions.

        @raise RuntimeError: if no execution functions have been registered.

        @return: A C{Deferred} that fires when the first of the functions
        finishes.
        """
        if not self._functions:
            raise RuntimeError('No execution functions have been registered.')

        deferreds = [defer.maybeDeferred(func, *args, **kwargs)
                     for func in self._functions]
        d = defer.DeferredList(deferreds, fireOnOneCallback=1, consumeErrors=1)
        d.addCallback(self._done, deferreds)
        return d

    def _done(self, deferredListResult, deferreds):
        """
        Process the result of the C{DeferredList} execution of all the
        functions in self._functions. If result is a tuple, it's the result
        of a successful function, in which case we cancel all the other
        deferreds (none of which has finished yet) and give back the
        result.  Otherwise, all the function calls must have failed (since
        we passed fireOnOneCallback=True to the C{DeferredList} and we
        return a L{ParallelCommandException} containing the failures.

        @param deferredListResult: The result of a C{DeferredList} returned
        by self.execute firing.

        @param deferreds: A list of deferreds for other functions that were
        trying to compute the result.

        @return: Either the result of the first function to successfully
        compute the result or a C{failure.Failure} containing a
        L{ParallelCommandException} with a list of the failures from all
        functions that tried to get the result.
        """
        if isinstance(deferredListResult, tuple):
            # A tuple result means the DeferredList fired with a successful
            # result.  Cancel all other deferreds and return the result.
            result, index = deferredListResult
            for i in range(len(self._functions)):
                if i != index:
                    deferreds[i].cancel()
            return result
        else:
            # All the deferreds failed. Return a list of all the failures.
            failures = [fail for (result, fail) in deferredListResult]
            return failure.Failure(ParallelCommandException(failures))