Saturday, April 2, 2011

Twisted: Asynchronous HTTP Request

Twisted's documentation already covers how to make an HTTP request. But unless you're already familiar with Twisted, my guess is that extending that example to download a large number of web pages, with a limit on the number of simultaneous requests, is not easy. Below, you'll find example code for exactly that, followed by a walk-through that will hopefully help you understand the details.


from pprint import pformat

from twisted.internet import reactor
import twisted.internet.defer
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

class PrinterClient(Protocol):
    def __init__(self, whenFinished):
        self.whenFinished = whenFinished

    def dataReceived(self, bytes):
        print '##### Received #####\n%s' % (bytes,)

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.whenFinished.callback(None)

def handleResponse(r):
    print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    for k, v in r.headers.getAllRawHeaders():
        print "%s: %s" % (k, '\n  '.join(v))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished))
    return whenFinished

def handleError(reason):
    reason.printTraceback()
    reactor.stop()

def getPage(url):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET', url, Headers({'User-Agent': ['twisted']}), None)
    d.addCallbacks(handleResponse, handleError)
    return d

semaphore = twisted.internet.defer.DeferredSemaphore(2)
dl = list()
dl.append(semaphore.run(getPage, 'http://google.com'))
dl.append(semaphore.run(getPage, 'http://cnn.com'))
dl.append(semaphore.run(getPage, 'http://nytimes.com'))
dl = twisted.internet.defer.DeferredList(dl)
dl.addCallbacks(lambda x: reactor.stop(), handleError)

reactor.run()

getPage handles a single HTTP request from start to finish. Agent(reactor).request() creates an Agent and sends the HTTP request. request() returns a Deferred that fires once the response headers have been received. The addCallbacks line specifies that handleResponse is called upon successful header retrieval and handleError is called if there is an error in retrieving the headers.

handleResponse is given a Response object which contains the HTTP headers and includes a method, deliverBody, to specify a Protocol to handle delivery of the HTTP body. A Protocol is used for body delivery because the body may arrive in chunks and an error may occur in the middle of delivery (e.g. someone pulls your network plug). PrinterClient is a very simple Protocol which (1) prints received data, (2) logs the reason for termination (if it is not twisted.web.client.ResponseDone, there was an error), and (3) fires the whenFinished Deferred.

The trickiest part of this code is following the Deferred chain, which is essential to understanding how we limit the maximum number of outstanding requests. A key point to understand about Deferreds is that, if a callback returns a Deferred, the parent Deferred waits for the child Deferred to fire before passing the child's result to the next callback in the chain. See the documentation on Chaining Deferreds. Because of this, each semaphore.run waits for the PrinterClient protocol to complete before releasing its token. The DeferredSemaphore is basically a Deferred-aware semaphore. Its only argument is the number of tokens it allows to be "checked out" simultaneously. When we make the nytimes.com semaphore.run call, the semaphore doesn't call getPage until one of the other two requests has completed.

The DeferredList is used to clean up after all requests have completed. Under normal circumstances, we just want to stop the reactor so our process will exit. But if there is an error, we want to see what happened, hence we use handleError in that case.

Update 9/13/11: Minor code formatting change.

8 comments:

  1. Thanks for this blog post. I'm just starting out with Twisted and working through this introduced me to many of the framework's ideas. I wonder if you could answer one question for me: why is the whenFinished Deferred, together with self.whenFinished.callback(None), necessary for the PrinterClient to finish? I know that it is in the docs, but I don't understand why... When I remove it, the script seems to hang and doesn't finish.

  2. IIRC, the point of the whenFinished Deferreds is to stop the reactor when all retrievals have completed. Note the callback chain: getPage, handleResponse, whenFinished. The Deferred returned by getPage is added to the DeferredList and doesn't complete until the corresponding whenFinished fires. Hence, reactor.stop is only called after all whenFinished Deferreds have fired.

  3. This code is very useful for me. What if I want to add a timeout and retry for each request? I have googled, but found no good solutions...

  4. In getPage, should be ['twisted'], rather than 'twisted'].

  5. Thanks for this piece of code. I understand what is happening there, but one part is not really clear to me. Maybe you can explain this line:

    r.deliverBody(PrinterClient(whenFinished))

    How exactly does that work?

    Thanks

  6. PrinterClient(whenFinished) is a Protocol. r.deliverBody will make calls on the Protocol object to deliver data (dataReceived) and announce that it is finished (connectionLost).

  7. Thanks for the answer.
    Why is it necessary then to return 'whenFinished' in 'handleResponse'?
