Using requests with Threading

New in version 0.4.0.

The toolbelt provides a simple API for using requests with threading.

A requests Session is documented as threadsafe, but there are still a couple of corner cases where it isn’t perfectly threadsafe. The best way to use a Session is to use one per thread.

The implementation provided by the toolbelt is naïve. This means that we use one session per thread and make no effort to synchronize attributes (e.g., authentication, cookies). It also means that we make no attempt to direct a request to a session that has already handled a request to the same domain. In other words, if you’re making requests to multiple domains, the toolbelt’s Pool will not try to route requests for the same domain to the same thread.
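
To make the "one session per thread, nothing shared" idea concrete, here is a minimal standard-library sketch of that pattern. It is not the toolbelt's implementation: run_jobs and session_factory are hypothetical names, and the sketch only assumes that the session object exposes a request(**kwargs) method, as requests.Session does.

```python
import queue
import threading


def run_jobs(job_queue, session_factory, num_threads=4):
    """Drain job_queue with num_threads workers, one session per worker."""
    responses = queue.Queue()
    exceptions = queue.Queue()

    def worker():
        # Each thread builds its own session; nothing is shared or synchronized.
        session = session_factory()
        while True:
            try:
                kwargs = job_queue.get_nowait()
            except queue.Empty:
                return  # no work left for this thread
            try:
                responses.put((kwargs, session.request(**kwargs)))
            except Exception as exc:
                # Keep the failure together with the job that caused it.
                exceptions.put((kwargs, exc))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return responses, exceptions
```

Passing requests.Session as session_factory would give every worker its own Session. Because any idle worker takes the next job, two requests to the same domain may well be handled by different sessions, which is the naïve behaviour described above.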

This module provides three classes:

  • Pool
  • ThreadResponse
  • ThreadException

In 98% of situations you’ll just want to use a Pool and treat a ThreadResponse as if it were a regular requests.Response.

Here’s an example:

# This example assumes Python 3
import queue
from requests_toolbelt.threaded import pool

jobs = queue.Queue()
urls = [
    # My list of URLs to get
]

for url in urls:
    # Each job is a dict of keyword arguments for a single request
    jobs.put({'method': 'GET', 'url': url})

p = pool.Pool(job_queue=jobs)
p.join_all()

for response in p.responses():
    print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
                                          response.status_code))

This is clearly a bit underwhelming, which is why there’s a shortcut class method to create a Pool from a list of URLs:

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

for response in p.responses():
    print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
                                          response.status_code))

If a request for one of the URLs in your list raises an exception, it will be accessible from the exceptions() generator.

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

for exc in p.exceptions():
    # Python 3 exceptions have no .message attribute, so format the
    # wrapped exception itself.
    print('GET {0}. Raised {1}.'.format(exc.request_kwargs['url'],
                                        exc.exception))

If, instead, you want to retry the requests that raised exceptions, you can do the following:

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

new_pool = pool.Pool.from_exceptions(p.exceptions())
new_pool.join_all()

Not every request is safe to retry blindly. Normally you would inspect each exception and decide whether its request should be retried before building the new pool.
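
One possible check is to retry only requests whose HTTP method is idempotent. The sketch below is an illustration, not part of the toolbelt: safe_to_retry and IDEMPOTENT_METHODS are hypothetical names, and it assumes only that each ThreadException exposes the request_kwargs dict used in the examples above.

```python
# Methods that HTTP defines as idempotent, so repeating them should be harmless.
IDEMPOTENT_METHODS = frozenset({'GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE'})


def safe_to_retry(thread_exceptions):
    """Yield only the wrapped exceptions whose request used an idempotent method."""
    for thread_exc in thread_exceptions:
        # A job without a 'method' key is skipped rather than guessed at.
        method = thread_exc.request_kwargs.get('method', '').upper()
        if method in IDEMPOTENT_METHODS:
            yield thread_exc
```

Since from_exceptions accepts any iterable of ThreadException objects, the filtered generator could be passed straight in, for example pool.Pool.from_exceptions(safe_to_retry(p.exceptions())).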

In addition to job_queue, the Pool object takes four keyword arguments:

  • initializer

    This is a callback that will initialize things on every session created. The callback must return the session.

  • auth_generator

    This is a callback that is called after the initializer callback has modified the session. This callback must also return the session.

  • num_processes

    Pass a positive integer to indicate how many threads to use. It is None by default, in which case the result of multiprocessing.cpu_count() is used.

  • session

    You can pass an alternative constructor or any callable that returns a requests.Session-like object. It will not be passed any arguments because a requests.Session does not accept any arguments.
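
As a sketch of what the two callbacks might look like: each one receives a session, modifies it, and returns it. The function names, header value, and credentials below are placeholders chosen for illustration.

```python
def set_default_headers(session):
    """Candidate initializer: runs first on each newly created session."""
    session.headers.update({'User-Agent': 'my-crawler/1.0'})
    return session  # the callback must hand the session back


def set_credentials(session):
    """Candidate auth_generator: runs after the initializer."""
    session.auth = ('example-user', 'example-password')
    return session  # this callback must also return the session
```

These could then be supplied as pool.Pool.from_urls(urls, initializer=set_default_headers, auth_generator=set_credentials, num_processes=4), since from_urls forwards extra keyword arguments to the Pool initializer.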

class requests_toolbelt.threaded.pool.Pool(job_queue, initializer=None, auth_generator=None, num_processes=None, session=<class 'requests.sessions.Session'>)

Pool that manages the threads containing sessions.

Parameters:
  • job_queue (queue.Queue) – The queue to which you add the jobs to be processed.
  • initializer (collections.abc.Callable) – Function used to initialize an instance of session.
  • auth_generator (collections.abc.Callable) – Function used to generate new auth credentials for the session.
  • num_processes (int) – Number of threads to create.
  • session (requests.Session) – Constructor or callable that returns a requests.Session-like object.
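
When the jobs are not all plain GET requests, the queue can be filled by hand. The following sketch assumes that each job dict is passed as keyword arguments to the session's request method, so keys such as json and timeout are illustrative assumptions. build_job_queue is a hypothetical helper and the URLs are placeholders.

```python
import queue


def build_job_queue(jobs):
    """Copy an iterable of request keyword-argument dicts into a Queue."""
    job_queue = queue.Queue()
    for job in jobs:
        job_queue.put(dict(job))  # shallow copy of each job dict
    return job_queue


jobs = build_job_queue([
    {'method': 'GET', 'url': 'https://example.com/items'},
    {'method': 'POST', 'url': 'https://example.com/items', 'json': {'name': 'x'}},
    {'method': 'DELETE', 'url': 'https://example.com/items/1', 'timeout': 5},
])
```

The resulting queue would be handed to the pool as pool.Pool(job_queue=jobs).
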
exceptions()

Iterate over all the exceptions in the pool.

Returns: Generator of ThreadException

classmethod from_exceptions(exceptions, **kwargs)

Create a Pool from ThreadExceptions.

Given an iterable of ThreadException objects, this classmethod creates a new pool to retry the requests that caused the exceptions.

Parameters:
  • exceptions (iterable) – Iterable that returns ThreadException
  • kwargs – Keyword arguments passed to the Pool initializer.
Returns:

An initialized Pool object.

Return type:

Pool

classmethod from_urls(urls, request_kwargs=None, **kwargs)

Create a Pool from an iterable of URLs.

Parameters:
  • urls (iterable) – Iterable that returns URLs with which we create a pool.
  • request_kwargs (dict) – Dictionary of other keyword arguments to provide to the request method.
  • kwargs – Keyword arguments passed to the Pool initializer.
Returns:

An initialized Pool object.

Return type:

Pool

get_exception()

Get an exception from the pool.

Return type: ThreadException

get_response()

Get a response from the pool.

Return type: ThreadResponse

join_all()

Join all the threads to the master thread.

responses()

Iterate over all the responses in the pool.

Returns: Generator of ThreadResponse

class requests_toolbelt.threaded.pool.ThreadResponse(request_kwargs, response)

A wrapper around a requests Response object.

This will proxy most attribute access actions to the Response object. For example, if you wanted the parsed JSON from the response, you might do:

# p is a Pool instance, as in the examples above
thread_response = p.get_response()
data = thread_response.json()

class requests_toolbelt.threaded.pool.ThreadException(request_kwargs, exception)

A wrapper around an exception raised during a request.

This will proxy most attribute access actions to the exception object. For example, if you wanted the arguments the exception was raised with, you might do:

# p is a Pool instance; Python 3 exceptions have no .message attribute
thread_exc = p.get_exception()
args = thread_exc.args
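
The proxying behaviour of both wrapper classes can be pictured with a small, self-contained sketch. AttributeProxy is a hypothetical stand-in written for illustration, not the toolbelt's own class: it keeps request_kwargs on the wrapper and forwards every other attribute lookup to the wrapped object.

```python
class AttributeProxy:
    """Hypothetical stand-in showing how a wrapper can forward attributes."""

    def __init__(self, request_kwargs, wrapped):
        self.request_kwargs = request_kwargs
        self._wrapped = wrapped

    def __getattr__(self, name):
        # __getattr__ runs only when normal lookup fails, so request_kwargs
        # and _wrapped are served by the proxy itself.
        if name == '_wrapped':
            # Avoid infinite recursion if _wrapped was never set.
            raise AttributeError(name)
        return getattr(self._wrapped, name)


proxy = AttributeProxy({'url': 'https://example.com'}, ValueError('boom'))
print(proxy.request_kwargs['url'])  # answered by the proxy
print(proxy.args)                   # forwarded to the wrapped exception
```

Attributes that exist on neither object still raise AttributeError, so a typo fails loudly instead of returning None.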