summaryrefslogtreecommitdiffstats
path: root/wallace/__init__.py
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2012-05-24 17:20:48 +0100
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2012-05-24 17:20:48 +0100
commitc116e801d7611d38f5c70f0d148de1616be5643e (patch)
tree6f51e95d0f6e5c864dfe0ebb1be8d7bae73ea6fc /wallace/__init__.py
parent205d67c00df3f0562469be8114bddc1a535a7202 (diff)
downloadpykolab-c116e801d7611d38f5c70f0d148de1616be5643e.tar.gz
- Use pools instead of the less subtle threading, as threading would
allow the daemon to lock up under heavy (> 10k/s) load.
Diffstat (limited to 'wallace/__init__.py')
-rw-r--r--wallace/__init__.py439
1 files changed, 132 insertions, 307 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py
index edd47a6..0d04c5b 100644
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -18,13 +18,16 @@
#
import asyncore
+import binascii
import grp
+import multiprocessing
import os
import pwd
from smtpd import SMTPChannel
+import socket
+import struct
import sys
import tempfile
-import threading
import time
import traceback
@@ -34,6 +37,27 @@ from pykolab.translate import _
log = pykolab.getLogger('pykolab.wallace')
conf = pykolab.getConf()
+max_threads = 24
+
+def pickup_message(filepath, *args, **kw):
+ wallace_modules = args[0]
+ if kw.has_key('module'):
+
+ # Cause the previous modules to be skipped
+ wallace_modules = wallace_modules[(wallace_modules.index(kw['module'])+1):]
+
+ # Execute the module
+ if kw.has_key('stage'):
+ modules.execute(kw['module'], filepath, stage=kw['stage'])
+ else:
+ modules.execute(kw['module'], filepath)
+
+ for module in wallace_modules:
+ modules.execute(module, filepath)
+
+def worker_process(*args, **kw):
+ log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
+
class WallaceDaemon(object):
def __init__(self):
daemon_group = conf.add_cli_parser_option_group(_("Daemon Options"))
@@ -95,122 +119,136 @@ class WallaceDaemon(object):
import modules
modules.__init__()
- def process_message(self, peer, mailfrom, rcpttos, data):
- """
- We have retrieved the message.
+ self.modules = conf.get_list('wallace', 'modules')
+ if self.modules == None:
+ self.modules = ['resources']
+ elif not 'resources' in self.modules:
+ self.modules.append('resources')
- - Dispatch to virus-scanning and anti-spam filtering?
- Not for now. We use some sort of re-injection.
+ def do_wallace(self):
+ self.pool = multiprocessing.Pool(max_threads, worker_process, (), 1)
- - Apply access policies;
- - Maximum number of recipients,
- - kolabAllowSMTPSender,
- - kolabAllowSMTPRecipient,
- - Rule-based matching against white- and/or blacklist
- - ...
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- - Accounting
+ bound = False
+ shutdown = False
+ while not bound:
+ try:
+ if shutdown:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- - Data Loss Prevention
- """
- inheaders = 1
+ s.bind((conf.wallace_bind_address, conf.wallace_port))
+ bound = True
+ except Exception, e:
+ log.warning(
+ _("Could not bind to socket on port %d on bind " + \
+ "address %s") % (
+ conf.wallace_port,
+ conf.wallace_bind_address
+ )
+ )
- (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/")
- os.write(fp, data)
- os.close(fp)
+ while not shutdown:
+ try:
+ s.shutdown(socket.SHUT_RDWR)
+ shutdown = True
+ except Exception, e:
+ log.warning(_("Could not shut down socket"))
+ time.sleep(1)
- while threading.active_count() > 25:
- log.debug(
- _("Number of threads currently running: %d") % (
- threading.active_count()
- ),
- level=8
- )
-
- time.sleep(1)
-
- log.debug(
- _("Continuing with %d threads currently running") % (
- threading.active_count()
- ),
- level=8
- )
+ s.close()
- # TODO: Apply throttling
- log.debug(_("Creating thread for message in %s") % (filename), level=8)
+ time.sleep(1)
- thread = threading.Thread(target=self.thread_run, args=[ filename ])
- thread.start()
+ s.listen(5)
- def thread_run(self, filename, *args, **kw):
- while threading.active_count() > 25:
- log.debug(
- _("Number of threads currently running: %d") % (
- threading.active_count()
- ),
- level=8
- )
+ # Mind you to include the trailing slash
+ pickup_path = '/var/spool/pykolab/wallace/'
+ for root, directory, files in os.walk(pickup_path):
+ for filename in files:
+ filepath = os.path.join(root, filename)
- time.sleep(10)
+ if not root == pickup_path:
+ module = os.path.dirname(root).replace(pickup_path, '')
- log.debug(
- _("Continuing with %d threads currently running") % (
- threading.active_count()
- ),
- level=8
- )
+ # Compare uppercase status (specifically, DEFER) with
+ # lowercase (plugin names).
+ #
+ # The messages in DEFER are supposed to be picked up by
+ # another thread, whereas the messages in other directories
+ # are pending being handled by their respective plugins.
+ #
+ # TODO: Handle messages in spool directories for which a
+ # plugin had been enabled, but is not enabled any longer.
+ #
- log.debug(
- _("Running thread %s for message file %s") % (
- threading.current_thread().name,
- filename
- ),
- level=8
- )
+ if module.lower() == "defer":
+ # Wallace was unable to deliver to re-injection smtpd.
+ # Skip it, another thread is picking up the deferred
+ # messages.
+ continue
- if kw.has_key('module'):
- log.debug(
- _("This message was already in module %s, delegating " + \
- "specifically to that module") % (
- kw['module']
- ),
- level=8
- )
-
- if kw.has_key('stage'):
- log.debug(
- _("It was also in a certain stage: %s, letting " + \
- "module %s know that") % (
- kw['stage'],
- kw['module']
- ),
- level=8
- )
+ stage = root.replace(pickup_path, '').split('/')
+ if len(stage) < 2:
+ stage = None
+ else:
+ stage = stage[1]
- log.debug(_("Executing module %s") % (kw['module']), level=8)
+ if stage.lower() == "hold":
+ continue
- modules.execute(kw['module'], filename, stage=kw['stage'])
+ # Do not handle messages in a defer state.
+ if stage.lower() == "defer":
+ continue
- return
+ self.pool.apply_async(pickup_message, (filepath, (self.modules), {'module': module, 'stage': stage}))
- log.debug(_("Executing module %s") % (kw['module']), level=8)
- modules.execute(kw['module'], filename, stage=kw['stage'])
+ continue
- return
+ self.pool.apply_async(pickup_message, (filepath, (self.modules)))
- wallace_modules = conf.get_list('wallace', 'modules')
- if wallace_modules == None:
- wallace_modules = ['resources']
- elif not 'resources' in wallace_modules:
- wallace_modules.append('resources')
+ try:
+ while 1:
+ pair = s.accept()
+ log.info(_("Accepted connection"))
+ if not pair == None:
+ connection, address = pair
+ #print "Accepted connection from %r" % (address)
+ channel = SMTPChannel(self, connection, address)
+ asyncore.loop()
+ except Exception, errmsg:
+ traceback.print_exc()
+ s.shutdown(1)
+ s.close()
- for module in wallace_modules:
- log.debug(_("Executing module %s") % (module), level=8)
- modules.execute(module, filename)
+ def process_message(self, peer, mailfrom, rcpttos, data):
+ """
+ We have retrieved the message. This should be as fast as possible,
+ and not ever block.
+ """
+ inheaders = 1
+
+ (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/")
+ os.write(fp, data)
+ os.close(fp)
+
+ self.pool.apply_async(pickup_message, (filename, (self.modules)))
+
+ return
+
+ def reload_config(self, *args, **kw):
+ pass
+
+ def remove_pid(self, *args, **kw):
+ if os.access(conf.pidfile, os.R_OK):
+ os.remove(conf.pidfile)
+ raise SystemExit
def run(self):
"""
- Run the SASL authentication daemon.
+ Run the Wallace daemon.
"""
exitcode = 0
@@ -320,219 +358,6 @@ class WallaceDaemon(object):
sys.exit(exitcode)
- def pickup_defer(self):
- wallace_modules = conf.get_list('wallace', 'modules')
-
- if wallace_modules == None:
- wallace_modules = []
-
- base_path = '/var/spool/pykolab/wallace/'
-
- while 1:
- file_count = 0
-
- log.debug(_("Picking up deferred messages for wallace"), level=8)
-
- defer_path = os.path.join(base_path, 'DEFER')
-
- if os.path.isdir(defer_path):
- for root, directory, files in os.walk(defer_path):
- for filename in files:
- filepath = os.path.join(root, filename)
-
- file_count += 1
-
- for module in wallace_modules:
- modules.execute(module, filepath)
-
- time.sleep(1)
-
- time.sleep(1)
-
- for module in wallace_modules:
- log.debug(
- _("Picking up deferred messages for module %s") % (
- module
- ),
- level=8
- )
-
- module_defer_path = os.path.join(base_path, module, 'DEFER')
-
- if os.path.isdir(module_defer_path):
- for root, directory, files in os.walk(module_defer_path):
- for filename in files:
- filepath = os.path.join(root, filename)
-
- file_count += 1
-
- modules.execute(module, filepath)
-
- time.sleep(1)
-
- time.sleep(1)
-
- # Sleep for 300 seconds before reprocessing the deferred queues.
- # TODO: Consider using queue_run_delay from Postfix, which is where
- # the default value of 300 seconds comes from.
- log.debug(_("Sleeping for 300 seconds"), level=8)
- time.sleep(300)
-
- def do_wallace(self):
- import binascii
- import socket
- import struct
-
- #s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- ## TODO: The wallace socket path could be a setting.
- #try:
- #os.remove('/var/run/kolab/wallace')
- #except:
- ## TODO: Do the "could not remove, could not start dance"
- #pass
-
- bound = False
- while not bound:
- try:
- s.bind((conf.wallace_bind_address, conf.wallace_port))
- bound = True
- except Exception, e:
- log.warning(
- _("Could not bind to socket on port %d on bind " + \
- "address %s") % (
- conf.wallace_port,
- conf.wallace_bind_address
- )
- )
-
- try:
- s.shutdown(1)
- except Exception, e:
- log.warning(_("Could not shut down socket"))
-
- s.close()
-
- time.sleep(1)
-
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- #os.chmod('/var/run/kolab/wallace', 0777)
- #os.chgrp('/var/run/wallace/mux', 'kolab')
- #os.chown('/var/run/wallace/mux', 'kolab')
-
- s.listen(5)
-
- # Mind you to include the trailing slash
- pickup_path = '/var/spool/pykolab/wallace/'
- for root, directory, files in os.walk(pickup_path):
- for filename in files:
- filepath = os.path.join(root, filename)
-
- if not root == pickup_path:
- module = os.path.dirname(root).replace(pickup_path, '')
-
- # Compare uppercase status (specifically, DEFER) with
- # lowercase (plugin names).
- #
- # The messages in DEFER are supposed to be picked up by
- # another thread, whereas the messages in other directories
- # are pending being handled by their respective plugins.
- #
- # TODO: Handle messages in spool directories for which a
- # plugin had been enabled, but is not enabled any longer.
- #
-
- if module.lower() == "defer":
- # Wallace was unable to deliver to re-injection smtpd.
- # Skip it, another thread is picking up the deferred
- # messages.
- continue
-
- stage = root.replace(pickup_path, '').split('/')
- if len(stage) < 2:
- stage = None
- else:
- stage = stage[1]
-
- if stage.lower() == "hold":
- continue
-
- # Do not handle messages in a defer state.
- if stage.lower() == "defer":
- continue
-
- log.debug(
- _("Number of threads currently running: %d") % (
- threading.active_count()
- ),
- level=8
- )
-
- thread = threading.Thread(
- target = self.thread_run,
- args = [ filepath ],
- kwargs = {
- "module": '%s' % (module),
- "stage": '%s' % (stage)
- }
- )
-
- thread.start()
- time.sleep(0.5)
-
- continue
-
- log.debug(
- _("Picking up spooled email file %s") % (
- filepath
- ),
- level=8
- )
-
- log.debug(
- _("Number of threads currently running: %d") % (
- threading.active_count()
- ),
- level=8
- )
-
- thread = threading.Thread(
- target=self.thread_run,
- args=[ filepath ]
- )
-
- thread.start()
- time.sleep(0.5)
-
- pid = os.fork()
-
- if pid == 0:
- self.pickup_defer()
- else:
-
- try:
- while 1:
- pair = s.accept()
- log.info(_("Accepted connection"))
- if not pair == None:
- connection, address = pair
- #print "Accepted connection from %r" % (address)
- channel = SMTPChannel(self, connection, address)
- asyncore.loop()
- except Exception, errmsg:
- traceback.print_exc()
- s.shutdown(1)
- s.close()
-
- def reload_config(self, *args, **kw):
- pass
-
- def remove_pid(self, *args, **kw):
- if os.access(conf.pidfile, os.R_OK):
- os.remove(conf.pidfile)
- raise SystemExit
-
def set_signal_handlers(self):
import signal
signal.signal(signal.SIGHUP, self.reload_config)