Simple Python: a job queue with threading

Every so often you need a queue to manage operations in an application. Python makes this very simple. Python also, as I’ve written about before, makes threading very easy to work with. So in this quick program I’ll describe, via comments, how to make a simple queue where each job is processed by a thread. Integrating this code to read jobs from a MySQL database would be trivial as well; simply replace the "jobs = [...]" code with a database call that runs a SELECT query for the job rows.

#!/usr/bin/env python
## DATE: 2011-01-20
## FILE: queue.py
## AUTHOR: Matt Reid
## WEBSITE: http://themattreid.com
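from Queue import Queue, Empty
from threading import Thread

'''this function runs once in each thread and processes a single item from the queue'''
def processor():
    try:
        # get_nowait() raises Empty instead of blocking when no job is left
        job = queue.get_nowait()
    except Empty:
        print "the Queue is empty!"
        return
    try:
        print "I'm operating on job item: %s"%(job)
    except Exception:
        print "Failed to operate on job"
    finally:
        # always mark the job done, otherwise queue.join() waits forever
        queue.task_done()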

'''set variables'''
queue = Queue()
threads = 4

'''a list of job items. you would want this to be more advanced,
like reading from a file or database'''
jobs = [ "job1", "job2", "job3" ]

'''iterate over jobs and put each into the queue in sequence'''
for job in jobs:
    print "inserting job into the queue: %s"%(job)
    queue.put(job)

'''start some threads, each one will process one job from the queue'''
for i in range(threads):
    th = Thread(target=processor)
    th.setDaemon(True)
    th.start()

'''wait until all jobs are processed before quitting'''
queue.join()
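
The listing above starts four threads and each one takes a single job, so it only drains the queue when there are no more jobs than threads. A common variation is to let every thread loop until the work runs out. Here is a minimal Python 3 sketch of that idea (the listing above targets Python 2). The names `run_workers` and `worker`, the `None` sentinel, and the placeholder "work" are assumptions for illustration, not part of the original program.

```python
import queue
import threading

def run_workers(jobs, num_threads=4):
    """Process every job with a fixed number of looping worker threads."""
    q = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            job = q.get()
            if job is None:
                # sentinel value: no more work for this thread
                q.task_done()
                break
            try:
                processed = "processed %s" % job  # placeholder for real work
                with results_lock:
                    results.append(processed)
            finally:
                # mark the job done even if the work raised an exception
                q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for job in jobs:
        q.put(job)
    for _ in threads:
        q.put(None)  # one sentinel per worker so each loop ends
    q.join()
    for t in threads:
        t.join()
    return results

if __name__ == "__main__":
    done = run_workers(["job%d" % n for n in range(1, 11)], num_threads=4)
    print("%d jobs processed" % len(done))
```

The results come back in whatever order the threads finished, so sort them if the order matters.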
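
The queue post mentions swapping the hard-coded jobs list for a row select query. A rough Python 3 sketch of that step follows. To keep it runnable without a server it uses the standard library `sqlite3` module as a stand-in for MySQL; the `jobs` table, its `name` column, and the helpers `make_demo_db` and `load_jobs` are all hypothetical. With a MySQL driver the connection setup would differ.

```python
import queue
import sqlite3

def make_demo_db(names):
    """Build an in-memory database with a hypothetical `jobs` table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany("INSERT INTO jobs (name) VALUES (?)", [(n,) for n in names])
    conn.commit()
    return conn

def load_jobs(conn):
    """Run the row select query and put one job per row into a new Queue."""
    q = queue.Queue()
    for (name,) in conn.execute("SELECT name FROM jobs ORDER BY id"):
        q.put(name)
    return q

if __name__ == "__main__":
    jobs_queue = load_jobs(make_demo_db(["job1", "job2", "job3"]))
    while not jobs_queue.empty():
        print("queued job: %s" % jobs_queue.get())
```

The worker threads never touch the database here; only the code that fills the queue does, which keeps the threading part unchanged.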

Easy Python: multi-threading MySQL queries

There are many times when writing an application that single-threaded database operations are simply too slow. In these cases it’s a matter of course that you’ll use multi-threading or forking to spawn secondary threads or processes to handle the database actions. In this simple example of Python multi-threading you’ll see how simple it is to improve the performance of your Python app.

#!/usr/bin/python
## DATE: 2010-08-30
## AUTHOR: Matt Reid
## WEBSITE: http://themattreid.com
## LICENSE: BSD http://www.opensource.org/licenses/bsd-license.php
## Copyright 2010-present Matt Reid

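from __future__ import division
import threading
import sys
import MySQLdb

TX = 1000 #total number of inserts; assumed value, the original never sets it

class threader(threading.Thread):
    def __init__(self,method,tx,db):
        threading.Thread.__init__(self)
        self.tx = tx
        self.method = method
        self.db = db
    def run(self):
        #run this thread's share of the inserts on its own connection
        for i in range(self.tx):
            self.method(self.db)

def run_insert(db):
    #`table` is a placeholder name; it is a reserved word in MySQL, so quote it
    sql = "INSERT INTO `table` (`id`,`A`,`B`,`C`) VALUES (NULL,'0','0','0');"
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        db.commit()
        cursor.close()
    except MySQLdb.Error as e:
        print "insert failed: %s"%(e)

def init_thread():
    backgrounds = []
    #split the inserts evenly across the pooled connections
    quant = TX // len(connections)
    for db in connections:
        print "Spawning thread: %s"%(db)
        background = threader(run_insert,quant,db)
        background.start()
        backgrounds.append(background)
    for background in backgrounds:
        background.join()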

def main():
    try:
        init_thread()
    except Exception as e:
        print "failed to initiate threads: %s"%(e)
        sys.exit(1)

    sys.exit(0)

if __name__ == "__main__":
    mysql_host = "localhost" #default localhost
    mysql_pass = "pass" #default dbbench
    mysql_user = "user" #default dbbench
    mysql_port = 3306 #default 3306
    mysql_db = "schema" #default dbbench
    THREADS = 4 #must be INT not STR

    #create connection pool, one connection per thread
    connections = []
    for thread in range(THREADS):
        try:
            connections.append(MySQLdb.connect(host=mysql_host, user=mysql_user, passwd=mysql_pass, db=mysql_db, port=mysql_port))
        except MySQLdb.Error as e:
            print "Error %d: %s"%(e.args[0], e.args[1])
            sys.exit(1)

    main()
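
The pattern in this listing is one connection per thread, with the total number of inserts divided among the threads. Below is a small Python 3 sketch of the same pattern that runs without a MySQL server. It swaps in the standard library `sqlite3` module for MySQLdb, and the `bench` table plus the helpers `split_evenly`, `insert_worker`, `run_inserts`, and `demo` are hypothetical names chosen for illustration. It shows the structure only and says nothing about MySQL performance.

```python
import os
import sqlite3
import tempfile
import threading

def split_evenly(total, parts):
    """Split total into `parts` integer shares that differ by at most one."""
    base, extra = divmod(total, parts)
    return [base + (1 if i < extra else 0) for i in range(parts)]

def insert_worker(db_path, count):
    """Open a private connection and run `count` single-row inserts."""
    # timeout makes a writer wait for the file lock instead of failing at once
    conn = sqlite3.connect(db_path, timeout=30)
    try:
        for _ in range(count):
            conn.execute("INSERT INTO bench (a, b, c) VALUES (0, 0, 0)")
            conn.commit()
    finally:
        conn.close()

def run_inserts(db_path, total, num_threads=4):
    """Start one thread per share of the work and wait for all of them."""
    threads = [threading.Thread(target=insert_worker, args=(db_path, share))
               for share in split_evenly(total, num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

def demo(total=10, num_threads=4):
    """Create a scratch database, run the threaded inserts, return the row count."""
    fd, db_path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    try:
        conn = sqlite3.connect(db_path)
        conn.execute("CREATE TABLE bench (id INTEGER PRIMARY KEY, a, b, c)")
        conn.commit()
        run_inserts(db_path, total, num_threads)
        (rows,) = conn.execute("SELECT COUNT(*) FROM bench").fetchone()
        conn.close()
        return rows
    finally:
        os.remove(db_path)

if __name__ == "__main__":
    print("rows inserted: %d" % demo(total=10, num_threads=4))
```

Using `divmod` for the split means no inserts are lost when the total does not divide evenly by the thread count, which plain integer division would silently drop.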
    