diff options
author | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2012-03-02 15:47:14 +0000 |
---|---|---|
committer | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2012-03-02 15:47:14 +0000 |
commit | 219e2a96d15e867b220cc92ee91a03ede0984cfc (patch) | |
tree | 47e076cbfc4c94295856609fad9d81d59143fcad | |
parent | 2d7c188c7bea6932918d74715c32b2575358dd4a (diff) | |
download | pykolab-219e2a96d15e867b220cc92ee91a03ede0984cfc.tar.gz |
Add the bare bones of Wallace with the optout module
-rwxr-xr-x | test-wallace.py | 93 | ||||
-rw-r--r-- | wallace/__init__.py | 328 | ||||
-rw-r--r-- | wallace/module_optout.py | 150 | ||||
-rw-r--r-- | wallace/modules.py | 170 |
4 files changed, 741 insertions, 0 deletions
diff --git a/test-wallace.py b/test-wallace.py new file mode 100755 index 0000000..84d3488 --- /dev/null +++ b/test-wallace.py @@ -0,0 +1,93 @@ +#!/usr/bin/python +# +# Copyright 2010-2012 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import smtplib +import socket +import sys + +# For development purposes +sys.path.extend(['.', '..']) + +from email.MIMEMultipart import MIMEMultipart +from email.MIMEBase import MIMEBase +from email.MIMEText import MIMEText +from email.Utils import COMMASPACE, formatdate +from email import Encoders + +def send_mail(send_from, send_to, send_with=None): + smtp = smtplib.SMTP("localhost", 8025) + smtp.set_debuglevel(True) + subject = "This is a Kolab load test mail" + text = """Hi there, + +I am a Kolab Groupware test email, generated by a script that makes +me send lots of email to lots of people using one account and a bunch +of delegation blah. + +Your response, though completely unnecessary, would be appreciated, because +being a fictitious character doesn't do my address book of friends any good. + +Kind regards, + +Lucy Meier. +""" + + msg = MIMEMultipart() + msg['From'] = send_from + msg['To'] = COMMASPACE.join(send_to) + msg['Date'] = formatdate(localtime=True) + msg['Subject'] = subject + + msg.attach( MIMEText(text) ) + + #msg.attach( MIMEBase('application', open('/boot/initrd-plymouth.img').read()) ) + + smtp.sendmail(send_from, send_to, msg.as_string()) + +if __name__ == "__main__": + #send_to = [ + #'Jeroen van Meeuwen <jeroen.vanmeeuwen@klab.cc>', + #'Aleksander Machniak <aleksander.machniak@klab.cc>', + #'Georg Greve <georg.greve@klab.cc>', + #'Paul Adams <paul.adams@klab.cc>', + #'Thomas Broderli <thomas.broderli@klab.cc>', + #'Christoph Wickert <christoph.wickert@klab.cc>', + #'Lucy Meier <lucy.meier@klab.cc>', + #] + + + #send_mail( + #'Jeroen van Meeuwen <jeroen.vanmeeuwen@klab.cc>', + #send_to + #) + + #send_mail( + #'Lucy Meier on behalf of Paul Adams <paul.adams@test90.kolabsys.com>', + #send_to + #) + + #send_mail( + #'Lucy Meier on behalf of Georg Greve <georg.greve@test90.kolabsys.com>', + #send_to + #) + + send_to = [ 'Jeroen van Meeuwen <jeroen.vanmeeuwen@klab.cc>' ] + + send_mail('Jeroen van Meeuwen <vanmeeuwen@kolabsys.com>', send_to) diff --git a/wallace/__init__.py b/wallace/__init__.py new file mode 100644 index 0000000..b9df940 --- /dev/null +++ b/wallace/__init__.py @@ -0,0 +1,328 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2012 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import asyncore +import os +from smtpd import SMTPChannel +import sys +import tempfile +import threading +import time +import traceback + +import pykolab +from pykolab.translate import _ + +log = pykolab.getLogger('pykolab.wallace') +conf = pykolab.getConf() + +class WallaceDaemon(object): + def __init__(self): + daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) + + daemon_group.add_option( + "--fork", + dest = "fork_mode", + action = "store_true", + default = False, + help = _("Fork to the background.") + ) + + conf.finalize_conf() + + import modules + modules.__init__() + + def process_message(self, peer, mailfrom, rcpttos, data): + """ + We have retrieved the message. + + - Dispatch to virus-scanning and anti-spam filtering? + - Apply access policies; + - Maximum number of recipients, + - kolabAllowSMTPSender, + - kolabAllowSMTPRecipient, + - Rule-based matching against white- and/or blacklist + - ... + - Accounting + - Data Loss Prevention + """ + inheaders = 1 + + (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/") + + os.write(fp, data) + os.close(fp) + + while threading.active_count() > 25: + log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) + time.sleep(10) + + log.debug(_("Continuing with %d threads currently running") %(threading.active_count()), level=8) + + # TODO: Apply throttling + log.debug(_("Creating thread for message in %s") %(filename), level=8) + + thread = threading.Thread(target=self.thread_run, args=[ filename ]) + thread.start() + + def thread_run(self, filename, *args, **kw): + while threading.active_count() > 25: + log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) + time.sleep(10) + + log.debug(_("Continuing with %d threads currently running") %(threading.active_count()), level=8) + + log.debug(_("Running thread %s for message file %s") %(threading.current_thread().name,filename), level=8) + + if kw.has_key('module'): + log.debug(_("This message was already in module %s, delegating specifically to that module") %(kw['module']), level=8) + + if kw.has_key('stage'): + log.debug(_("It was also in a certain stage: %s, letting module %s know that") %(kw['stage'],kw['module']), level=8) + + log.debug(_("Executing module %s") %(kw['module']), level=8) + + modules.execute(kw['module'], filename, stage=kw['stage']) + + return + + log.debug(_("Executing module %s") %(kw['module']), level=8) + modules.execute(kw['module'], filename, stage=kw['stage']) + + return + + wallace_modules = conf.get_list('wallace', 'modules') + if wallace_modules == None: + wallace_modules = [] + + for module in wallace_modules: + log.debug(_("Executing module %s") %(module), level=8) + modules.execute(module, filename) + + def run(self): + """ + Run the SASL authentication daemon. + """ + + exitcode = 0 + + try: + pid = 1 + if conf.fork_mode: + self.thread_count += 1 + pid = os.fork() + + if pid == 0: + log.remove_stdout_handler() + + self.do_wallace() + + except SystemExit, e: + exitcode = e + except KeyboardInterrupt: + exitcode = 1 + log.info(_("Interrupted by user")) + except AttributeError, e: + exitcode = 1 + traceback.print_exc() + print >> sys.stderr, _("Traceback occurred, please report a bug at http://bugzilla.kolabsys.com") + except TypeError, e: + exitcode = 1 + traceback.print_exc() + log.error(_("Type Error: %s") % e) + except: + exitcode = 2 + traceback.print_exc() + print >> sys.stderr, _("Traceback occurred, please report a bug at http://bugzilla.kolabsys.com") + sys.exit(exitcode) + + def pickup_defer(self): + wallace_modules = conf.get_list('wallace', 'modules') + + if wallace_modules == None: + wallace_modules = [] + + base_path = '/var/spool/pykolab/wallace/' + + while 1: + file_count = 0 + + log.debug(_("Picking up deferred messages for wallace"), level=8) + + defer_path = os.path.join(base_path, 'DEFER') + + if os.path.isdir(defer_path): + for root, directory, files in os.walk(defer_path): + for filename in files: + filepath = os.path.join(root, filename) + + file_count += 1 + + for module in wallace_modules: + modules.execute(module, filepath) + + time.sleep(1) + + time.sleep(1) + + for module in wallace_modules: + log.debug(_("Picking up deferred messages for module %s") %(module), level=8) + + module_defer_path = os.path.join(base_path, module, 'DEFER') + + if os.path.isdir(module_defer_path): + for root, directory, files in os.walk(module_defer_path): + for filename in files: + filepath = os.path.join(root, filename) + + file_count += 1 + + modules.execute(module, filepath) + + time.sleep(1) + + time.sleep(1) + + # Sleep longer if last time around we didn't find any deferred + # message files + if file_count > 0: + log.debug(_("Sleeping for 1 second"), level=8) + time.sleep(1) + else: + log.debug(_("Sleeping for 10 seconds"), level=8) + time.sleep(10) + + + def do_wallace(self): + import binascii + import socket + import struct + + #s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + ## TODO: The wallace socket path could be a setting. + #try: + #os.remove('/var/run/kolab/wallace') + #except: + ## TODO: Do the "could not remove, could not start dance" + #pass + + bound = False + while not bound: + try: + s.bind(('localhost', 8025)) + bound = True + except Exception, e: + log.warning(_("Could not bind to socket on port 8025")) + try: + s.shutdown(1) + except Exception, e: + log.warning(_("Could not shut down socket")) + + s.close() + + time.sleep(1) + + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + #os.chmod('/var/run/kolab/wallace', 0777) + #os.chgrp('/var/run/wallace/mux', 'kolab') + #os.chown('/var/run/wallace/mux', 'kolab') + + s.listen(5) + + # Mind you to include the trailing slash + pickup_path = '/var/spool/pykolab/wallace/' + for root, directory, files in os.walk(pickup_path): + for filename in files: + filepath = os.path.join(root, filename) + + if not root == pickup_path: + module = os.path.dirname(root).replace(pickup_path, '') + + # Compare uppercase status (specifically, DEFER) with lowercase + # (plugin names). + # + # The messages in DEFER are supposed to be picked up by + # another thread, whereas the messages in other directories + # are pending being handled by their respective plugins. + # + # TODO: Handle messages in spool directories for which a + # plugin had been enabled, but is not enabled any longer. + # + + if module.lower() == "defer": + # Wallace was unable to deliver to re-injection smtpd. + # Skip it, another thread is picking up the defers. + continue + + stage = root.replace(pickup_path, '').split('/') + if len(stage) < 2: + stage = None + else: + stage = stage[1] + + if stage.lower() == "hold": + continue + + # Do not handle messages in a defer state. + if stage.lower() == "defer": + continue + + log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) + thread = threading.Thread( + target = self.thread_run, + args = [ filepath ], + kwargs = { + "module": '%s' %(module), + "stage": '%s' %(stage) + } + ) + + thread.start() + time.sleep(0.5) + + continue + + log.debug(_("Picking up spooled email file %s") %(filepath), level=8) + log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) + thread = threading.Thread(target=self.thread_run, args=[ filepath ]) + thread.start() + time.sleep(0.5) + + pid = os.fork() + + if pid == 0: + self.pickup_defer() + else: + + try: + while 1: + pair = s.accept() + log.info(_("Accepted connection")) + if not pair == None: + connection, address = pair + #print "Accepted connection from %r" %(address) + channel = SMTPChannel(self, connection, address) + asyncore.loop() + except Exception, e: + traceback.print_exc() + s.shutdown(1) + s.close() diff --git a/wallace/module_optout.py b/wallace/module_optout.py new file mode 100644 index 0000000..d4bbf33 --- /dev/null +++ b/wallace/module_optout.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2012 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import os +import random +import tempfile +import time + +import modules + +import pykolab + +from pykolab.translate import _ + +log = pykolab.getLogger('pykolab.wallace') +conf = pykolab.getConf() + +mybasepath = '/var/spool/pykolab/wallace/optout/' + +def __init__(): + if not os.path.isdir(mybasepath): + os.makedirs(mybasepath) + + modules.register('optout', execute, description=description()) + +def description(): + return """Consult the opt-out service.""" + +def execute(*args, **kw): + filepath = args[0] + + if kw.has_key('stage'): + log.debug(_("Issuing callback after processing to stage %s") %(kw['stage']), level=8) + log.debug(_("Testing cb_action_%s()") %(kw['stage']), level=8) + if hasattr(modules, 'cb_action_%s' %(kw['stage'])): + log.debug(_("Attempting to execute cb_action_%s()") %(kw['stage']), level=8) + exec('modules.cb_action_%s(%r, %r)' %(kw['stage'],'optout',filepath)) + return + + #modules.next_module('optout') + + log.debug(_("Consulting opt-out service for %r, %r") %(args, kw), level=8) + + + import email + message = email.message_from_file(open(filepath, 'r')) + envelope_sender = email.utils.getaddresses(message.get_all('From', [])) + + recipients = { + "To": email.utils.getaddresses(message.get_all('To', [])), + "Cc": email.utils.getaddresses(message.get_all('Cc', [])) + # TODO: Are those all recipient addresses? + } + + # optout answers are ACCEPT, REJECT, HOLD or DEFER + answers = [ 'ACCEPT', 'REJECT', 'HOLD', 'DEFER' ] + + # Initialize our results placeholders. + _recipients = {} + + for answer in answers: + _recipients[answer] = { + "To": [], + "Cc": [] + } + + for recipient_type in recipients.keys(): + for recipient in recipients[recipient_type]: + log.debug( + _("Running opt-out consult from envelope sender '%s " + \ + "<%s>' to recipient %s <%s>") %( + envelope_sender[0][0], + envelope_sender[0][1], + recipient[0], + recipient[1] + ), + level=8 + ) + + optout_answer = answers[random.randint(0,(len(answers)-1))] + # Let's pretend it takes two seconds to get an answer, shall we? + time.sleep(2) + + _recipients[optout_answer][recipient_type].append(recipient) + + #print _recipients + + ## + ## TODO + ## + ## If one of them all is DEFER, DEFER the entire message and discard the + ## other results. + ## + + for answer in answers: + if not os.path.isdir(os.path.join(mybasepath, answer)): + os.makedirs(os.path.join(mybasepath, answer)) + + # Consider using a new mktemp()-like call + new_filepath = os.path.join(mybasepath, answer, os.path.basename(filepath)) + + # Write out a message file representing the new contents for the message + # use email.formataddr(recipient) + _message = email.message_from_file(open(filepath, 'r')) + + use_this = False + + for recipient_type in _recipients[answer].keys(): + _message.__delitem__(recipient_type) + if not len(_recipients[answer][recipient_type]) == 0: + _message.__setitem__(recipient_type, ',\n'.join([email.utils.formataddr(x) for x in _recipients[answer][recipient_type]])) + + use_this = True + + if use_this: + # TODO: Do not set items with an empty list. + (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/optout/%s" %(answer)) + os.write(fp, _message.__str__()) + os.close(fp) + + # Callback with new filename + if hasattr(modules, 'cb_action_%s' %(answer)): + log.debug(_("Attempting to execute cb_action_%s()") %(answer), level=8) + exec('modules.cb_action_%s(%r, %r)' %(answer,'optout', filename)) + + os.unlink(filepath) + + #print "Moving filepath %s to new_filepath %s" %(filepath, new_filepath) + #os.rename(filepath, new_filepath) + + #if hasattr(modules, 'cb_action_%s' %(optout_answer)): + #log.debug(_("Attempting to execute cb_action_%s()") %(optout_answer), level=8) + #exec('modules.cb_action_%s(%r, %r)' %(optout_answer,'optout', new_filepath)) + #return diff --git a/wallace/modules.py b/wallace/modules.py new file mode 100644 index 0000000..374f440 --- /dev/null +++ b/wallace/modules.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2012 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import os +import sys + +import pykolab +from pykolab.translate import _ + +log = pykolab.getLogger('pykolab.wallace') +conf = pykolab.getConf() + +modules = {} + +def __init__(): + # We only want the base path + modules_base_path = os.path.dirname(__file__) + + for modules_path, dirnames, filenames in os.walk(modules_base_path): + if not modules_path == modules_base_path: + continue + + for filename in filenames: + if filename.startswith('module_') and filename.endswith('.py'): + module_name = filename.replace('.py','') + name = module_name.replace('module_', '') + #print "exec(\"from %s import __init__ as %s_register\"" %(module_name,name) + exec("from %s import __init__ as %s_register" %(module_name,name)) + exec("%s_register()" %(name)) + + for dirname in dirnames: + register_group(modules_path, dirname) + +def list_modules(*args, **kw): + """ + List modules + """ + + __modules = {} + + for module in modules.keys(): + if isinstance(module, tuple): + module_group, module = module + __modules[module_group] = { + module: modules[(module_group,module)] + } + else: + __modules[module] = modules[module] + + _modules = __modules.keys() + _modules.sort() + + for _module in _modules: + if __modules[_module].has_key('function'): + # This is a top-level module + if not __modules[_module]['description'] == None: + print "%-25s - %s" %(_module.replace('_','-'),__modules[_module]['description']) + else: + print "%-25s" %(_module.replace('_','-')) + + for _module in _modules: + if not __modules[_module].has_key('function'): + # This is a nested module + print "\n" + _("Module Group: %s") %(_module) + "\n" + ___modules = __modules[_module].keys() + ___modules.sort() + for __module in ___modules: + if not __modules[_module][__module]['description'] == None: + print "%-4s%-21s - %s" %('',__module.replace('_','-'),__modules[_module][__module]['description']) + else: + print "%-4s%-21s" %('',__module.replace('_','-')) + +def execute(name, *args, **kw): + if not modules.has_key(name): + log.error(_("No such module.")) + sys.exit(1) + + if not modules[name].has_key('function') and \ + not modules[name].has_key('group'): + log.error(_("No such module.")) + sys.exit(1) + + modules[name]['function'](*args, **kw) + +def cb_action_HOLD(module, filepath): + log.info(_("Holding message in queue for manual review (%s by %s)") %(filepath, module)) + ## Actually just unlink the file for now + #os.unlink(filepath) + +def cb_action_DEFER(module, filepath): + log.info(_("Deferring message in %s (by module %s)") %(filepath, module)) + +def cb_action_REJECT(module, filepath): + log.info(_("Rejecting message in %s (by module %s)") %(filepath, module)) + # Send NDR, unlink file + os.unlink(filepath) + +def cb_action_ACCEPT(module, filepath): + log.info(_("Accepting message in %s (by module %s)") %(filepath, module)) + # Deliver for final delivery (use re-injection smtpd), unlink file + os.unlink(filepath) + +def register_group(dirname, module): + modules_base_path = os.path.join(os.path.dirname(__file__), module) + + modules[module] = {} + + for modules_path, dirnames, filenames in os.walk(modules_base_path): + if not modules_path == modules_base_path: + continue + + for filename in filenames: + if filename.startswith('module_') and filename.endswith('.py'): + module_name = filename.replace('.py','') + name = module_name.replace('module_', '') + #print "exec(\"from %s.%s import __init__ as %s_%s_register\"" %(module,module_name,module,name) + exec("from %s.%s import __init__ as %s_%s_register" %(module,module_name,module,name)) + exec("%s_%s_register()" %(module,name)) + +def register(name, func, group=None, description=None, aliases=[]): + if not group == None: + module = "%s_%s" %(group,name) + else: + module = name + + if isinstance(aliases, basestring): + aliases = [aliases] + + if modules.has_key(module): + log.fatal(_("Module '%s' already registered") %(module)) + sys.exit(1) + + if callable(func): + if group == None: + modules[name] = { + 'function': func, + 'description': description + } + else: + modules[group][name] = { + 'function': func, + 'description': description + } + + modules[module] = modules[group][name] + modules[module]['group'] = group + modules[module]['name'] = name + + for alias in aliases: + modules[alias] = { + 'function': func, + 'description': _("Alias for %s") %(name) + } + |