Twisted's documentation already covers how to make a single HTTP request. But unless you're already familiar with Twisted, my guess is that extending that example to download a large number of web pages, with a limit on the number of simultaneous requests, is not easy. Below is example code for exactly that, followed by a walk-through that will hopefully help you understand the details.
from pprint import pformat

from twisted.internet import reactor
import twisted.internet.defer
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers


class PrinterClient(Protocol):
    def __init__(self, whenFinished):
        self.whenFinished = whenFinished

    def dataReceived(self, bytes):
        print '##### Received #####\n%s' % (bytes,)

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.whenFinished.callback(None)


def handleResponse(r):
    print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    for k, v in r.headers.getAllRawHeaders():
        print "%s: %s" % (k, '\n '.join(v))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished))
    return whenFinished


def handleError(reason):
    reason.printTraceback()
    reactor.stop()


def getPage(url):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET', url, Headers({'User-Agent': ['twisted']}), None)
    d.addCallbacks(handleResponse, handleError)
    return d


semaphore = twisted.internet.defer.DeferredSemaphore(2)
dl = list()
dl.append(semaphore.run(getPage, 'http://google.com'))
dl.append(semaphore.run(getPage, 'http://cnn.com'))
dl.append(semaphore.run(getPage, 'http://nytimes.com'))
dl = twisted.internet.defer.DeferredList(dl)
dl.addCallbacks(lambda x: reactor.stop(), handleError)
reactor.run()
getPage handles a single HTTP request from start to finish. Agent(reactor).request() creates an Agent and sends the HTTP request. request() returns a Deferred which fires when the headers have been retrieved. The addCallbacks line specifies that handleResponse is called upon successful header retrieval and handleError is called if there is an error in retrieving the headers.
handleResponse is given a Response object which contains the HTTP headers and includes a method, deliverBody, to specify a Protocol to handle delivery of the HTTP body. A Protocol is used for body delivery because the body may come in chunks and an error may occur in the middle of delivery (e.g. someone pulls your network plug). PrinterClient is a very simple Protocol which (1) prints received data, (2) logs the reason for termination (if not twisted.web.client.ResponseDone, there was an error), and (3) fires a deferred, whenFinished.
The trickiest part of this code is following the Deferred chain, which is essential to understanding how we limit the maximum number of outstanding requests. A key point to understand about Deferreds is that, if a callback returns a Deferred, the parent Deferred waits for the child Deferred to fire before handing a value to the next callback in the chain. See documentation on Chaining Deferreds. Because of this, each semaphore.run waits for the PrinterClient protocol to complete before releasing its semaphore. The DeferredSemaphore is basically a Deferred-aware semaphore. Its only argument is the number of tokens it allows to be "checked-out" simultaneously. When we make the nytimes.com semaphore.run call, the semaphore doesn't call getPage until one of the other requests has completed.
The DeferredList is used to clean up after all requests have completed. Under normal circumstances, we just want to stop the reactor so our process will exit. But, if there is an error, we want to see what happened, hence we use handleError in that case.
Update 9/13/11: Minor code formatting change.
Thanks for this blog post. I'm just starting out with Twisted and working through this introduced me to many of the framework's ideas. I wonder if you could answer me one question: why is the whenFinished Deferred, together with self.whenFinished.callback(None), necessary for the PrinterClient to finish? I know that it is in the docs, but I don't understand why... When I remove it the script seems to hang and doesn't finish.
IIRC, the point of the whenFinished Deferreds is to stop the reactor when all retrievals have completed. Note the callback chain: getPage, handleResponse, whenFinished. The Deferred returned by getPage is added to the DeferredList and doesn't complete until the corresponding whenFinished fires. Hence, reactor.stop is only called after all whenFinished Deferreds have fired.
This code is very useful for me. What if I want to add a timeout and retries for each request? I have googled, but found no good solutions...
In getPage, it should be ['twisted'], rather than 'twisted'].
Thanks for the fix, Alex!
Thanks for this piece of code. I understand what is happening there but one part is not really clear to me. Maybe you can explain this line:
r.deliverBody(PrinterClient(whenFinished))
How exactly does that work?
Thanks
PrinterClient(whenFinished) is a Protocol. r.deliverBody will make calls on the Protocol object to deliver data (dataReceived) and announce that it is finished (connectionLost).
Thanks for the answer.
Why is it necessary then to return 'whenFinished' in 'handleResponse'?