
We have a rather large mailing list (in the millions), so I've rewritten pickdist.py to improve our sending performance.
I thought I'd share...
We generate our email queue first, then run a job that sorts the mail queue into subqueues by major ISP (Gmail, AOL, MSN, Yahoo, etc.) plus one big catch-all for everything else. This lets us throttle each queue based on how fast that ISP is accepting mail.
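Just to illustrate the idea, here's a minimal sketch of the bucketing logic (the domain map, queue names, and pickSubqueue helper are made up for this example; they're not part of OpenEMM or the patch):

Code:
# hypothetical example of bucketing recipients by ISP domain;
# the domain map and queue names are illustrative only
SUBQUEUES = {
    'gmail.com':      'gmail',
    'googlemail.com': 'gmail',
    'aol.com':        'aol',
    'msn.com':        'msn',
    'hotmail.com':    'msn',
    'yahoo.com':      'yahoo',
}

def pickSubqueue(rcpt):
    # anything we don't recognize lands in the big catch-all queue
    domain = rcpt.rsplit('@', 1)[-1].lower()
    return SUBQUEUES.get(domain, 'catchall')

print pickSubqueue('someone@gmail.com')    # -> gmail
print pickSubqueue('someone@example.com')  # -> catchall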
Our problem was that pickdist.py took days to generate the full mail queue. The process would max out a single CPU core for hours on end and could only generate about 140k emails an hour.
So I hacked the Python multiprocessing module (needs Python 2.6+) into pickdist.py and added forking support. Because each worker is a separate forked process, this sidesteps Python's global interpreter lock, and we finally see huge performance gains on multi-CPU machines.
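Boiled down, the fork/join pattern the patch uses looks like this (a minimal Python 2.6 sketch, separate from the actual patch further down):

Code:
from multiprocessing import Process

def worker(n):
    # each worker runs in its own forked process, so the GIL
    # doesn't serialize them the way threads would be
    print "worker %d doing work" % n

if __name__ == '__main__':
    procs = [Process(target=worker, args=(n,)) for n in range(12)]
    for p in procs:
        p.start()
    # wait for the whole batch to finish before continuing
    for p in procs:
        p.join()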
By having pickdist.py fork off 12 worker processes (we have an 8-core server) and always returning True from the queueIsFree call, we went from generating about 140k emails an hour to over 4 million an hour!! Holy moly! I've included my pickdist.py patch below.
Hope this helps some people out!
We're on CentOS 5.4 64-bit, Python 2.6.5, and OpenEMM 5.4.0. You need Python 2.6+ for the multiprocessing library.
Happy Hacking!
liam
You can download our pickdist.py patch here: http://dropzone.slacker.com/~liam/pickd ... cker.patch
A note on the patch: we run OpenEMM 5.4.0, and I don't know whether pickdist.py has changed in later releases. It's a pretty small change, though, so it should be easy to port to later versions.
If you want to put the queue rate limiting back (I removed it), remove the "return True" and uncomment the "< 5000" line in queueIsFree.
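For reference, the patched method ends up looking like this; restoring the first return (and dropping the "return True") brings back the original 5000-file cap:

Code:
def queueIsFree (self):
    # original rate limit: only feed the queue while fewer than
    # 5000 files starting with "qf" are waiting in it
    #return len ([_f for _f in os.listdir (self.queue) if _f[:2] == 'qf']) < 5000
    # Why wait, just keep going
    return True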
Here is the patch in case the above URL stops working:
Code:
--- pickdist.py.ORIG 2010-04-28 01:55:09.000000000 -0700
+++ pickdist.py 2010-04-28 01:57:15.000000000 -0700
@@ -20,16 +20,34 @@
 * Reserved.
 *
 * Contributor(s): AGNITAS AG.
+*
+*
+* Added multiprocessing support (forking) to improve performance via the py2.6
+* multiprocessing library. Modify numthread below to specify how many
+* processes to fork out.
+*
+* Liam Slusser liam@slacker.com
+* April 26th 2010
+*
 **********************************************************************************
 """
 #
 import os, time, signal
 import shutil
 import agn
+# import multiprocessing goodness
+from multiprocessing import Process
 agn.require ('1.5.3')
 agn.loglevel = agn.LV_INFO
 if agn.iswin:
     import subprocess
+
+# the number of forks to spawn to do work
+numthread = 12
+
+# some debugging messages if you want
+debug = 0
+
 #
 class Block:
     def __init__ (self, path):
@@ -159,7 +177,9 @@ class Pickdist:
     # adjust queue size to be processed
     def queueIsFree (self):
-        return len ([_f for _f in os.listdir (self.queue) if _f[:2] == 'qf']) < 5000
+        #return len ([_f for _f in os.listdir (self.queue) if _f[:2] == 'qf']) < 5000
+        # Why wait, just keep going
+        return True
     def hasData (self):
         return len (self.data) > 0
@@ -177,42 +197,68 @@ def handler (sig, stack):
     global term
     term = True
-signal.signal (signal.SIGINT, handler)
-signal.signal (signal.SIGTERM, handler)
-if not agn.iswin:
-    signal.signal (signal.SIGHUP, signal.SIG_IGN)
-    signal.signal (signal.SIGPIPE, signal.SIG_IGN)
-#
-agn.lock ()
-agn.log (agn.LV_INFO, 'main', 'Starting up')
-#
-pd = Pickdist ()
-while not term:
-    time.sleep (1)
-    agn.mark (agn.LV_INFO, 'loop', 180)
-    if pd.scanForData () == 0:
-        delay = 30
-        agn.log (agn.LV_VERBOSE, 'loop', 'No ready to send data file found')
+def doPackMove(pd,blk,agn):
+
+    if blk.unpack (pd.queue):
+        blk.moveTo (agn.mkArchiveDirectory (pd.archive))
     else:
-        delay = 0
-        while not term and pd.hasData ():
-            if not pd.queueIsFree ():
-                agn.log (agn.LV_INFO, 'loop', 'Queue is already filled up')
-                delay = 180
-                break
-            blk = pd.getNextBlock ()
-            if blk.unpack (pd.queue):
-                blk.moveTo (agn.mkArchiveDirectory (pd.archive))
-            else:
-                blk.moveTo (pd.recover)
-    while not term and delay > 0:
-
-        if agn.iswin and agn.winstop ():
-            term = True
-            break
+        blk.moveTo (pd.recover)
+
+if __name__ == '__main__':
+    signal.signal (signal.SIGINT, handler)
+    signal.signal (signal.SIGTERM, handler)
+
+    if not agn.iswin:
+        signal.signal (signal.SIGHUP, signal.SIG_IGN)
+        signal.signal (signal.SIGPIPE, signal.SIG_IGN)
+    #
+    agn.lock ()
+    agn.log (agn.LV_INFO, 'main', 'Starting up')
+
+    #
+    pd = Pickdist ()
+    while not term:
         time.sleep (1)
-        delay -= 1
-#
-agn.log (agn.LV_INFO, 'main', 'Going down')
-agn.unlock ()
+        agn.mark (agn.LV_INFO, 'loop', 180)
+        if pd.scanForData () == 0:
+            delay = 30
+            agn.log (agn.LV_VERBOSE, 'loop', 'No ready to send data file found')
+        else:
+            delay = 0
+            while not term and pd.hasData ():
+                if not pd.queueIsFree ():
+                    agn.log (agn.LV_INFO, 'loop', 'Queue is already filled up')
+                    delay = 180
+                    break
+
+                p = {}
+                # spawn a whole bunch of workers!
+                for proc in range(0,numthread):
+                    if not pd.hasData():
+                        break
+                    else:
+                        if debug:
+                            print "%s spawning process" % (proc)
+                        blk = pd.getNextBlock ()
+                        p[proc] = Process(target=doPackMove, args=(pd,blk,agn))
+                        p[proc].start()
+
+                # wait for workers to finish
+                for proc in range(0,numthread):
+                    if p.has_key(proc):
+                        if debug:
+                            print "%s joining process" % (proc)
+                        p[proc].join()
+
+
+        while not term and delay > 0:
+
+            if agn.iswin and agn.winstop ():
+                term = True
+                break
+            time.sleep (1)
+            delay -= 1
+    #
+    agn.log (agn.LV_INFO, 'main', 'Going down')
+    agn.unlock ()