Threading in Python

categories: python, web development

Python provides a high-level threading module that makes threading nearly painless. Generally, you should only use threads if the following are true:

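  • Sharing memory between threads is not an issue: all threads in a process see the same objects.
  • You are not after maximum CPU-bound performance. In the standard CPython interpreter, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threads pay off mainly for I/O-bound work such as network or database calls.
  • You want to be able to share objects between threads.
  • You take precautions (for example, locks) so that threads do not modify the same object at the same time.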
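The last point is easier to see with an example. Below is a minimal sketch in Python 3 of one such precaution. Four threads increment a single shared counter, and a `threading.Lock` ensures that only one of them updates it at a time. The names `counter`, `counter_lock` and `increment` are made up for illustration.

```python
import threading

counter = 0                      # shared state that every thread modifies
counter_lock = threading.Lock()  # guards every update to counter


def increment(times):
    """Add 1 to the shared counter `times` times, holding the lock for each update."""
    global counter
    for _ in range(times):
        with counter_lock:  # only one thread at a time may run this block
            counter += 1


threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for all four threads to finish

print(counter)  # 40000
```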

Anybody with experience using threads knows that you need a way to synchronize tasks between them. Python once again makes this easy by providing the thread-safe Queue class, which handles the locking for you. What follows is an in-depth look at one multi-threaded paradigm you can build with Python's built-in threading module and Queue class.

Producer and Consumer Paradigm

A common design pattern when using threading in Python (or any other language) is to use the Producer and Consumer model of synchronizing threaded tasks. This model takes advantage of the Queue data structure.

The basic idea is this:

  • Raw data is put into a queue labeled in_queue.
  • The Producer threads see that there is data in in_queue waiting to be processed, and they begin taking items from the queue and performing work on them.
  • Once the work is done, the Producer threads put the formatted data into a second queue labeled out_queue.
  • The Consumer threads see that data has been put into out_queue, and they begin taking items from that queue and performing work on them.
  • The Consumer threads produce your final result, whatever you choose that to be.
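The flow described above fits in a few lines if you pass plain functions as thread targets instead of writing the Thread subclasses used in the rest of this post. This is a minimal sketch with some stand-ins: `item.upper()` takes the place of real processing, and a shared `results` list (hypothetical) plays the role of the final destination.

```python
import queue
import threading

in_queue = queue.Queue()
out_queue = queue.Queue()
results = []                     # hypothetical final destination (could be a DB or stdout)
results_lock = threading.Lock()  # protects results from concurrent appends


def producer():
    """Take raw items from in_queue, transform them, and hand them to out_queue."""
    while True:
        item = in_queue.get()
        out_queue.put(item.upper())  # stand-in for real processing
        in_queue.task_done()


def consumer():
    """Take processed items from out_queue and store them in results."""
    while True:
        item = out_queue.get()
        with results_lock:
            results.append(item)
        out_queue.task_done()


for target in (producer, consumer):
    threading.Thread(target=target, daemon=True).start()

for raw in ['item1', 'item2', 'item3']:
    in_queue.put(raw)

in_queue.join()   # every raw item has been processed by the producer
out_queue.join()  # every processed item has been consumed
print(sorted(results))  # ['ITEM1', 'ITEM2', 'ITEM3']
```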

Make sense yet? If not, no worries, we will go through each class and how to structure the main procedure.

Producer

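import threading


class Producer(threading.Thread):
    """Takes raw items from in_queue, processes them, and puts results on out_queue."""

    def __init__(self, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            item = self.in_queue.get()  # blocks until an item is available

            # Placeholder: replace with real work on `item`.
            result = 'You should be doing work.'
            self.out_queue.put(result)

            self.in_queue.task_done()  # mark this in_queue item as processed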

Here we pass both queues to the Producer, since it needs access to each of them. We then define the run() method, which does the actual work on the data. The Producer first takes an item with the queue's get() method, which blocks until data is available. We then perform whatever awesome manipulation of the data we need. Once we are done, we use the queue's put() method to place the result in the out_queue for the Consumer class to pick up. Finally, and most importantly, we tell the in_queue that we are done with the item by calling the queue's task_done() method. Every get() must be matched by a task_done(). The queue keeps a count of unfinished items, and its join() method, which the main program calls later, returns only after that count drops to zero. After calling task_done(), the thread loops back and either grabs more data from the in_queue or blocks until more arrives.
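To see the bookkeeping behind get(), task_done() and join() directly, you can drive a queue by hand from a single thread. This is only an illustrative sketch. It relies on documented behavior: `task_done()` raises `ValueError` when it is called more times than there were items placed in the queue.

```python
import queue

q = queue.Queue()
q.put('item1')   # one unfinished task
item = q.get()   # removes the item, but the task still counts as unfinished
q.task_done()    # now zero unfinished tasks
q.join()         # returns immediately because nothing is outstanding

too_many = False
try:
    q.task_done()  # no outstanding put() left to acknowledge
except ValueError:
    too_many = True

print(item, too_many)  # item1 True
```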

Next we will take a look at the Consumer class, which is very similar.

Consumer

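import threading


class Consumer(threading.Thread):
    """Takes processed items from out_queue and delivers the final result."""

    def __init__(self, out_queue):
        super().__init__()
        self.out_queue = out_queue

    def run(self):
        while True:
            item = self.out_queue.get()  # blocks until a processed item arrives

            # Placeholder: send the result where you want it (database, stdout, ...).
            result = 'This is your awesome output.'

            self.out_queue.task_done()  # mark this out_queue item as processed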

The Consumer class is very similar to the Producer class we saw before. All the Consumer class needs access to is the out_queue. Instead of putting the result in another queue, we now send it where we ultimately want it, such as a database, stdout, or another program. Once again, we need to tell the queue that we are done.
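Here is one possible way to put results "where we finally want them". In this sketch a Consumer writes each item into an SQLite table using the standard-library `sqlite3` module. The in-memory database, the `results` table and the `DatabaseConsumer` name are all hypothetical. The connection is created in the main thread but used by the consumer thread, which is why `check_same_thread=False` is needed. That setting is only safe here because a single consumer does all the writing, and the main thread reads only after `out_queue.join()` returns. With several consumers you would need a lock or one connection per thread.

```python
import queue
import sqlite3
import threading

# Hypothetical destination: an in-memory table with one text column.
conn = sqlite3.connect(':memory:', check_same_thread=False)
conn.execute('CREATE TABLE results (value TEXT)')


class DatabaseConsumer(threading.Thread):
    """Consumer variant that stores every item from out_queue in SQLite."""

    def __init__(self, out_queue, conn):
        super().__init__(daemon=True)
        self.out_queue = out_queue
        self.conn = conn

    def run(self):
        while True:
            item = self.out_queue.get()
            self.conn.execute('INSERT INTO results (value) VALUES (?)', (item,))
            self.conn.commit()
            self.out_queue.task_done()


out_queue = queue.Queue()
DatabaseConsumer(out_queue, conn).start()

for value in ['a', 'b', 'c']:
    out_queue.put(value)
out_queue.join()  # all inserts have been committed once this returns

rows = [row[0] for row in conn.execute('SELECT value FROM results ORDER BY value')]
print(rows)  # ['a', 'b', 'c']
```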

Now we will take a look at how to instantiate the threads in our main function.

May the threads be with you

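import queue

if __name__ == '__main__':

    item_list = ['item1', 'item2', 'item3']
    in_queue = queue.Queue()
    out_queue = queue.Queue()

    # Start one Producer per item; daemon threads won't keep the program alive.
    for _ in range(len(item_list)):
        t = Producer(in_queue, out_queue)
        t.daemon = True
        t.start()

    # Feed the raw data to the Producers.
    for item in item_list:
        in_queue.put(item)

    # Start the Consumers.
    for _ in range(len(item_list)):
        t = Consumer(out_queue)
        t.daemon = True
        t.start()

    # Block until every item in both queues has been marked done.
    in_queue.join()
    out_queue.join()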

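First we create our generic list of items to process and the two queues we will be using. We then instantiate our Producer threads. Note that these threads block until they detect data in the in_queue. We then place the raw data into the in_queue, and the Producer threads immediately begin doing work. While the Producers are working, we instantiate our Consumer threads, which block until they detect data to consume in the out_queue. Every thread is marked as a daemon, so none of them will keep the program running on its own. At the end we call each queue's join() method, which blocks the main program until every item put into that queue has been retrieved and marked done with task_done(). Joining in_queue first is safe because each Producer puts its result into out_queue before calling task_done() on in_queue. By the time in_queue.join() returns, out_queue already counts every result it must wait for. Once both calls return, the main program finishes, and the daemon threads, still blocked in get(), are stopped along with it.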
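Daemon threads are the simplest way to let the program exit while workers sit blocked in get(), but Python stops them abruptly at shutdown. A common alternative is to send each worker a sentinel value that tells it to exit, and then join() the threads themselves. This is a separate technique, not the approach used in this post. The `SENTINEL` marker and the `worker` function below are hypothetical, and `item.upper()` stands in for real work.

```python
import queue
import threading

SENTINEL = object()  # unique "no more work" marker (hypothetical name)


def worker(in_queue, processed, lock):
    """Process items until the sentinel arrives, then return so the thread ends."""
    while True:
        item = in_queue.get()
        if item is SENTINEL:
            in_queue.task_done()
            return
        with lock:
            processed.append(item.upper())  # stand-in for real work
        in_queue.task_done()


in_queue = queue.Queue()
processed = []
lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(in_queue, processed, lock))
           for _ in range(3)]
for t in threads:
    t.start()

for item in ['item1', 'item2', 'item3']:
    in_queue.put(item)
for _ in threads:
    in_queue.put(SENTINEL)  # one sentinel per worker thread

for t in threads:
    t.join()  # non-daemon threads: wait until each one has exited

print(sorted(processed))  # ['ITEM1', 'ITEM2', 'ITEM3']
```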

Putting It All Together

Below is all the code that we covered put together in one module:

import queue
import threading


class Producer(threading.Thread):
    def __init__(self, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            item = self.in_queue.get()

            result = 'You should be doing work.'  # placeholder for real work on item
            self.out_queue.put(result)

            self.in_queue.task_done()


class Consumer(threading.Thread):
    def __init__(self, out_queue):
        super().__init__()
        self.out_queue = out_queue

    def run(self):
        while True:
            item = self.out_queue.get()

            result = 'This is your awesome output.'  # placeholder: deliver the result

            self.out_queue.task_done()


if __name__ == '__main__':

    item_list = ['item1', 'item2', 'item3']
    in_queue = queue.Queue()
    out_queue = queue.Queue()

    for _ in range(len(item_list)):
        t = Producer(in_queue, out_queue)
        t.daemon = True
        t.start()

    for item in item_list:
        in_queue.put(item)

    for _ in range(len(item_list)):
        t = Consumer(out_queue)
        t.daemon = True
        t.start()

    in_queue.join()
    out_queue.join()

So when is it useful?

Threading is useful in numerous applications. To give you a more concrete, real-world example: I recently had to find a way to simulate millions of concurrent database queries per minute to test a system I built. I accomplished this with threads and code very similar to what we have covered so far. It was illuminating to see how the system handled millions of concurrent database queries, and Python's threading made it a breeze.
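The load-testing code itself isn't shown here, so the following is only a rough sketch of how a similar test with many concurrent queries might look. It uses `concurrent.futures.ThreadPoolExecutor`, a higher-level thread pool from the standard library, in place of the hand-written Producer/Consumer classes. `run_query` is a hypothetical stand-in that sleeps instead of calling a real database.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_query(query_id):
    """Hypothetical stand-in for one database query; sleeps to mimic I/O latency."""
    start = time.perf_counter()
    time.sleep(0.01)  # replace with a real call against your own database
    return query_id, time.perf_counter() - start


# 50 worker threads share 500 simulated queries. Sleeping threads release the GIL,
# so the waits overlap instead of running one after another.
with ThreadPoolExecutor(max_workers=50) as pool:
    timings = list(pool.map(run_query, range(500)))

latencies = [elapsed for _, elapsed in timings]
print(f"{len(timings)} queries, slowest took {max(latencies):.3f}s")
```

Because `pool.map` returns results in input order, `timings[i]` always belongs to query `i`, which makes it easy to match latencies back to specific requests.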
