diff options
author | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2012-05-24 17:20:48 +0100 |
---|---|---|
committer | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2012-05-24 17:20:48 +0100 |
commit | c116e801d7611d38f5c70f0d148de1616be5643e (patch) | |
tree | 6f51e95d0f6e5c864dfe0ebb1be8d7bae73ea6fc | |
parent | 205d67c00df3f0562469be8114bddc1a535a7202 (diff) | |
download | pykolab-c116e801d7611d38f5c70f0d148de1616be5643e.tar.gz |
- Use pools instead of the less subtle threading, as threading would
allow the daemon to lock up under heavy (> 10k/s) load.
-rw-r--r-- | wallace/__init__.py | 439 |
1 files changed, 132 insertions, 307 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py index edd47a6..0d04c5b 100644 --- a/wallace/__init__.py +++ b/wallace/__init__.py @@ -18,13 +18,16 @@ # import asyncore +import binascii import grp +import multiprocessing import os import pwd from smtpd import SMTPChannel +import socket +import struct import sys import tempfile -import threading import time import traceback @@ -34,6 +37,27 @@ from pykolab.translate import _ log = pykolab.getLogger('pykolab.wallace') conf = pykolab.getConf() +max_threads = 24 + +def pickup_message(filepath, *args, **kw): + wallace_modules = args[0] + if kw.has_key('module'): + + # Cause the previous modules to be skipped + wallace_modules = wallace_modules[(wallace_modules.index(kw['module'])+1):] + + # Execute the module + if kw.has_key('stage'): + modules.execute(kw['module'], filepath, stage=kw['stage']) + else: + modules.execute(kw['module'], filepath) + + for module in wallace_modules: + modules.execute(module, filepath) + +def worker_process(*args, **kw): + log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) + class WallaceDaemon(object): def __init__(self): daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) @@ -95,122 +119,136 @@ class WallaceDaemon(object): import modules modules.__init__() - def process_message(self, peer, mailfrom, rcpttos, data): - """ - We have retrieved the message. + self.modules = conf.get_list('wallace', 'modules') + if self.modules == None: + self.modules = ['resources'] + elif not 'resources' in self.modules: + self.modules.append('resources') - - Dispatch to virus-scanning and anti-spam filtering? - Not for now. We use some sort of re-injection. + def do_wallace(self): + self.pool = multiprocessing.Pool(max_threads, worker_process, (), 1) - - Apply access policies; - - Maximum number of recipients, - - kolabAllowSMTPSender, - - kolabAllowSMTPRecipient, - - Rule-based matching against white- and/or blacklist - - ... + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - Accounting + bound = False + shutdown = False + while not bound: + try: + if shutdown: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - Data Loss Prevention - """ - inheaders = 1 + s.bind((conf.wallace_bind_address, conf.wallace_port)) + bound = True + except Exception, e: + log.warning( + _("Could not bind to socket on port %d on bind " + \ + "address %s") % ( + conf.wallace_port, + conf.wallace_bind_address + ) + ) - (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/") - os.write(fp, data) - os.close(fp) + while not shutdown: + try: + s.shutdown(socket.SHUT_RDWR) + shutdown = True + except Exception, e: + log.warning(_("Could not shut down socket")) + time.sleep(1) - while threading.active_count() > 25: - log.debug( - _("Number of threads currently running: %d") % ( - threading.active_count() - ), - level=8 - ) - - time.sleep(1) - - log.debug( - _("Continuing with %d threads currently running") % ( - threading.active_count() - ), - level=8 - ) + s.close() - # TODO: Apply throttling - log.debug(_("Creating thread for message in %s") % (filename), level=8) + time.sleep(1) - thread = threading.Thread(target=self.thread_run, args=[ filename ]) - thread.start() + s.listen(5) - def thread_run(self, filename, *args, **kw): - while threading.active_count() > 25: - log.debug( - _("Number of threads currently running: %d") % ( - threading.active_count() - ), - level=8 - ) + # Mind you to include the trailing slash + pickup_path = '/var/spool/pykolab/wallace/' + for root, directory, files in os.walk(pickup_path): + for filename in files: + filepath = os.path.join(root, filename) - time.sleep(10) + if not root == pickup_path: + module = os.path.dirname(root).replace(pickup_path, '') - log.debug( - _("Continuing with %d threads currently running") % ( - threading.active_count() - ), - level=8 - ) + # Compare uppercase status (specifically, DEFER) with + # lowercase (plugin names). + # + # The messages in DEFER are supposed to be picked up by + # another thread, whereas the messages in other directories + # are pending being handled by their respective plugins. + # + # TODO: Handle messages in spool directories for which a + # plugin had been enabled, but is not enabled any longer. + # - log.debug( - _("Running thread %s for message file %s") % ( - threading.current_thread().name, - filename - ), - level=8 - ) + if module.lower() == "defer": + # Wallace was unable to deliver to re-injection smtpd. + # Skip it, another thread is picking up the deferred + # messages. + continue - if kw.has_key('module'): - log.debug( - _("This message was already in module %s, delegating " + \ - "specifically to that module") % ( - kw['module'] - ), - level=8 - ) - - if kw.has_key('stage'): - log.debug( - _("It was also in a certain stage: %s, letting " + \ - "module %s know that") % ( - kw['stage'], - kw['module'] - ), - level=8 - ) + stage = root.replace(pickup_path, '').split('/') + if len(stage) < 2: + stage = None + else: + stage = stage[1] - log.debug(_("Executing module %s") % (kw['module']), level=8) + if stage.lower() == "hold": + continue - modules.execute(kw['module'], filename, stage=kw['stage']) + # Do not handle messages in a defer state. + if stage.lower() == "defer": + continue - return + self.pool.apply_async(pickup_message, (filepath, (self.modules), {'module': module, 'stage': stage})) - log.debug(_("Executing module %s") % (kw['module']), level=8) - modules.execute(kw['module'], filename, stage=kw['stage']) + continue - return + self.pool.apply_async(pickup_message, (filepath, (self.modules))) - wallace_modules = conf.get_list('wallace', 'modules') - if wallace_modules == None: - wallace_modules = ['resources'] - elif not 'resources' in wallace_modules: - wallace_modules.append('resources') + try: + while 1: + pair = s.accept() + log.info(_("Accepted connection")) + if not pair == None: + connection, address = pair + #print "Accepted connection from %r" % (address) + channel = SMTPChannel(self, connection, address) + asyncore.loop() + except Exception, errmsg: + traceback.print_exc() + s.shutdown(1) + s.close() - for module in wallace_modules: - log.debug(_("Executing module %s") % (module), level=8) - modules.execute(module, filename) + def process_message(self, peer, mailfrom, rcpttos, data): + """ + We have retrieved the message. This should be as fast as possible, + and not ever block. + """ + inheaders = 1 + + (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/") + os.write(fp, data) + os.close(fp) + + self.pool.apply_async(pickup_message, (filename, (self.modules))) + + return + + def reload_config(self, *args, **kw): + pass + + def remove_pid(self, *args, **kw): + if os.access(conf.pidfile, os.R_OK): + os.remove(conf.pidfile) + raise SystemExit def run(self): """ - Run the SASL authentication daemon. + Run the Wallace daemon. """ exitcode = 0 @@ -320,219 +358,6 @@ class WallaceDaemon(object): sys.exit(exitcode) - def pickup_defer(self): - wallace_modules = conf.get_list('wallace', 'modules') - - if wallace_modules == None: - wallace_modules = [] - - base_path = '/var/spool/pykolab/wallace/' - - while 1: - file_count = 0 - - log.debug(_("Picking up deferred messages for wallace"), level=8) - - defer_path = os.path.join(base_path, 'DEFER') - - if os.path.isdir(defer_path): - for root, directory, files in os.walk(defer_path): - for filename in files: - filepath = os.path.join(root, filename) - - file_count += 1 - - for module in wallace_modules: - modules.execute(module, filepath) - - time.sleep(1) - - time.sleep(1) - - for module in wallace_modules: - log.debug( - _("Picking up deferred messages for module %s") % ( - module - ), - level=8 - ) - - module_defer_path = os.path.join(base_path, module, 'DEFER') - - if os.path.isdir(module_defer_path): - for root, directory, files in os.walk(module_defer_path): - for filename in files: - filepath = os.path.join(root, filename) - - file_count += 1 - - modules.execute(module, filepath) - - time.sleep(1) - - time.sleep(1) - - # Sleep for 300 seconds before reprocessing the deferred queues. - # TODO: Consider using queue_run_delay from Postfix, which is where - # the default value of 300 seconds comes from. - log.debug(_("Sleeping for 300 seconds"), level=8) - time.sleep(300) - - def do_wallace(self): - import binascii - import socket - import struct - - #s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - ## TODO: The wallace socket path could be a setting. - #try: - #os.remove('/var/run/kolab/wallace') - #except: - ## TODO: Do the "could not remove, could not start dance" - #pass - - bound = False - while not bound: - try: - s.bind((conf.wallace_bind_address, conf.wallace_port)) - bound = True - except Exception, e: - log.warning( - _("Could not bind to socket on port %d on bind " + \ - "address %s") % ( - conf.wallace_port, - conf.wallace_bind_address - ) - ) - - try: - s.shutdown(1) - except Exception, e: - log.warning(_("Could not shut down socket")) - - s.close() - - time.sleep(1) - - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - #os.chmod('/var/run/kolab/wallace', 0777) - #os.chgrp('/var/run/wallace/mux', 'kolab') - #os.chown('/var/run/wallace/mux', 'kolab') - - s.listen(5) - - # Mind you to include the trailing slash - pickup_path = '/var/spool/pykolab/wallace/' - for root, directory, files in os.walk(pickup_path): - for filename in files: - filepath = os.path.join(root, filename) - - if not root == pickup_path: - module = os.path.dirname(root).replace(pickup_path, '') - - # Compare uppercase status (specifically, DEFER) with - # lowercase (plugin names). - # - # The messages in DEFER are supposed to be picked up by - # another thread, whereas the messages in other directories - # are pending being handled by their respective plugins. - # - # TODO: Handle messages in spool directories for which a - # plugin had been enabled, but is not enabled any longer. - # - - if module.lower() == "defer": - # Wallace was unable to deliver to re-injection smtpd. - # Skip it, another thread is picking up the deferred - # messages. - continue - - stage = root.replace(pickup_path, '').split('/') - if len(stage) < 2: - stage = None - else: - stage = stage[1] - - if stage.lower() == "hold": - continue - - # Do not handle messages in a defer state. - if stage.lower() == "defer": - continue - - log.debug( - _("Number of threads currently running: %d") % ( - threading.active_count() - ), - level=8 - ) - - thread = threading.Thread( - target = self.thread_run, - args = [ filepath ], - kwargs = { - "module": '%s' % (module), - "stage": '%s' % (stage) - } - ) - - thread.start() - time.sleep(0.5) - - continue - - log.debug( - _("Picking up spooled email file %s") % ( - filepath - ), - level=8 - ) - - log.debug( - _("Number of threads currently running: %d") % ( - threading.active_count() - ), - level=8 - ) - - thread = threading.Thread( - target=self.thread_run, - args=[ filepath ] - ) - - thread.start() - time.sleep(0.5) - - pid = os.fork() - - if pid == 0: - self.pickup_defer() - else: - - try: - while 1: - pair = s.accept() - log.info(_("Accepted connection")) - if not pair == None: - connection, address = pair - #print "Accepted connection from %r" % (address) - channel = SMTPChannel(self, connection, address) - asyncore.loop() - except Exception, errmsg: - traceback.print_exc() - s.shutdown(1) - s.close() - - def reload_config(self, *args, **kw): - pass - - def remove_pid(self, *args, **kw): - if os.access(conf.pidfile, os.R_OK): - os.remove(conf.pidfile) - raise SystemExit - def set_signal_handlers(self): import signal signal.signal(signal.SIGHUP, self.reload_config) |