diff options
-rw-r--r-- | wallace/__init__.py | 126 | ||||
-rw-r--r-- | wallace/module_optout.py | 32 | ||||
-rw-r--r-- | wallace/modules.py | 204 |
3 files changed, 295 insertions, 67 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py index 5897ac7..86d933f 100644 --- a/wallace/__init__.py +++ b/wallace/__init__.py @@ -87,39 +87,80 @@ class WallaceDaemon(object): os.close(fp) while threading.active_count() > 25: - log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) - time.sleep(10) + log.debug( + _("Number of threads currently running: %d") % ( + threading.active_count() + ), + level=8 + ) - log.debug(_("Continuing with %d threads currently running") %(threading.active_count()), level=8) + time.sleep(1) + + log.debug( + _("Continuing with %d threads currently running") % ( + threading.active_count() + ), + level=8 + ) # TODO: Apply throttling - log.debug(_("Creating thread for message in %s") %(filename), level=8) + log.debug(_("Creating thread for message in %s") % (filename), level=8) thread = threading.Thread(target=self.thread_run, args=[ filename ]) thread.start() def thread_run(self, filename, *args, **kw): while threading.active_count() > 25: - log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) + log.debug( + _("Number of threads currently running: %d") % ( + threading.active_count() + ), + level=8 + ) + time.sleep(10) - log.debug(_("Continuing with %d threads currently running") %(threading.active_count()), level=8) + log.debug( + _("Continuing with %d threads currently running") % ( + threading.active_count() + ), + level=8 + ) - log.debug(_("Running thread %s for message file %s") %(threading.current_thread().name,filename), level=8) + log.debug( + _("Running thread %s for message file %s") % ( + threading.current_thread().name, + filename + ), + level=8 + ) if kw.has_key('module'): - log.debug(_("This message was already in module %s, delegating specifically to that module") %(kw['module']), level=8) + log.debug( + _("This message was already in module %s, delegating " + \ + "specifically to that module") % ( + kw['module'] + ), + level=8 + ) if kw.has_key('stage'): - log.debug(_("It was also in a certain stage: %s, letting module %s know that") %(kw['stage'],kw['module']), level=8) + log.debug( + _("It was also in a certain stage: %s, letting " + \ + "module %s know that") % ( + kw['stage'], + kw['module'] + ), + level=8 + ) - log.debug(_("Executing module %s") %(kw['module']), level=8) + log.debug(_("Executing module %s") % (kw['module']), level=8) modules.execute(kw['module'], filename, stage=kw['stage']) return - log.debug(_("Executing module %s") %(kw['module']), level=8) + log.debug(_("Executing module %s") % (kw['module']), level=8) modules.execute(kw['module'], filename, stage=kw['stage']) return @@ -129,7 +170,7 @@ class WallaceDaemon(object): wallace_modules = [] for module in wallace_modules: - log.debug(_("Executing module %s") %(module), level=8) + log.debug(_("Executing module %s") % (module), level=8) modules.execute(module, filename) def run(self): @@ -158,7 +199,9 @@ class WallaceDaemon(object): except AttributeError, e: exitcode = 1 traceback.print_exc() - print >> sys.stderr, _("Traceback occurred, please report a bug at http://bugzilla.kolabsys.com") + print >> sys.stderr, _("Traceback occurred, please report a " + \ + "bug at http://bugzilla.kolabsys.com") + except TypeError, e: exitcode = 1 traceback.print_exc() @@ -166,7 +209,9 @@ class WallaceDaemon(object): except: exitcode = 2 traceback.print_exc() - print >> sys.stderr, _("Traceback occurred, please report a bug at http://bugzilla.kolabsys.com") + print >> sys.stderr, _("Traceback occurred, please report a " + \ + "bug at http://bugzilla.kolabsys.com") + sys.exit(exitcode) def pickup_defer(self): @@ -199,7 +244,12 @@ class WallaceDaemon(object): time.sleep(1) for module in wallace_modules: - log.debug(_("Picking up deferred messages for module %s") %(module), level=8) + log.debug( + _("Picking up deferred messages for module %s") % ( + module + ), + level=8 + ) module_defer_path = os.path.join(base_path, module, 'DEFER') @@ -222,8 +272,8 @@ class WallaceDaemon(object): log.debug(_("Sleeping for 1 second"), level=8) time.sleep(1) else: - log.debug(_("Sleeping for 10 seconds"), level=8) - time.sleep(10) + log.debug(_("Sleeping for 1800 seconds"), level=8) + time.sleep(1800) def do_wallace(self): @@ -249,7 +299,7 @@ class WallaceDaemon(object): except Exception, e: log.warning( _("Could not bind to socket on port %d on bind " + \ - "address %s") %( + "address %s") % ( conf.wallace_port, conf.wallace_bind_address ) @@ -280,8 +330,8 @@ class WallaceDaemon(object): if not root == pickup_path: module = os.path.dirname(root).replace(pickup_path, '') - # Compare uppercase status (specifically, DEFER) with lowercase - # (plugin names). + # Compare uppercase status (specifically, DEFER) with + # lowercase (plugin names). # # The messages in DEFER are supposed to be picked up by # another thread, whereas the messages in other directories @@ -309,13 +359,19 @@ class WallaceDaemon(object): if stage.lower() == "defer": continue - log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) + log.debug( + _("Number of threads currently running: %d") % ( + threading.active_count() + ), + level=8 + ) + thread = threading.Thread( target = self.thread_run, args = [ filepath ], kwargs = { - "module": '%s' %(module), - "stage": '%s' %(stage) + "module": '%s' % (module), + "stage": '%s' % (stage) } ) @@ -324,9 +380,25 @@ class WallaceDaemon(object): continue - log.debug(_("Picking up spooled email file %s") %(filepath), level=8) - log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8) - thread = threading.Thread(target=self.thread_run, args=[ filepath ]) + log.debug( + _("Picking up spooled email file %s") % ( + filepath + ), + level=8 + ) + + log.debug( + _("Number of threads currently running: %d") % ( + threading.active_count() + ), + level=8 + ) + + thread = threading.Thread( + target=self.thread_run, + args=[ filepath ] + ) + thread.start() time.sleep(0.5) @@ -342,7 +414,7 @@ class WallaceDaemon(object): log.info(_("Accepted connection")) if not pair == None: connection, address = pair - #print "Accepted connection from %r" %(address) + #print "Accepted connection from %r" % (address) channel = SMTPChannel(self, connection, address) asyncore.loop() except Exception, e: diff --git a/wallace/module_optout.py b/wallace/module_optout.py index 3e2fd09..79cafb7 100644 --- a/wallace/module_optout.py +++ b/wallace/module_optout.py @@ -49,16 +49,16 @@ def execute(*args, **kw): filepath = args[0] if kw.has_key('stage'): - log.debug(_("Issuing callback after processing to stage %s") %(kw['stage']), level=8) - log.debug(_("Testing cb_action_%s()") %(kw['stage']), level=8) - if hasattr(modules, 'cb_action_%s' %(kw['stage'])): - log.debug(_("Attempting to execute cb_action_%s()") %(kw['stage']), level=8) - exec('modules.cb_action_%s(%r, %r)' %(kw['stage'],'optout',filepath)) + log.debug(_("Issuing callback after processing to stage %s") % (kw['stage']), level=8) + log.debug(_("Testing cb_action_%s()") % (kw['stage']), level=8) + if hasattr(modules, 'cb_action_%s' % (kw['stage'])): + log.debug(_("Attempting to execute cb_action_%s()") % (kw['stage']), level=8) + exec('modules.cb_action_%s(%r, %r)' % (kw['stage'],'optout',filepath)) return #modules.next_module('optout') - log.debug(_("Consulting opt-out service for %r, %r") %(args, kw), level=8) + log.debug(_("Consulting opt-out service for %r, %r") % (args, kw), level=8) import email message = email.message_from_file(open(filepath, 'r')) @@ -86,7 +86,7 @@ def execute(*args, **kw): for recipient in recipients[recipient_type]: log.debug( _("Running opt-out consult from envelope sender '%s " + \ - "<%s>' to recipient %s <%s>") %( + "<%s>' to recipient %s <%s>") % ( envelope_sender[0][0], envelope_sender[0][1], recipient[0], @@ -143,23 +143,23 @@ def execute(*args, **kw): if use_this: # TODO: Do not set items with an empty list. - (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/optout/%s" %(answer)) + (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/optout/%s" % (answer)) os.write(fp, _message.__str__()) os.close(fp) # Callback with new filename - if hasattr(modules, 'cb_action_%s' %(answer)): - log.debug(_("Attempting to execute cb_action_%s(%r, %r)") %(answer, 'optout', filename), level=8) - exec('modules.cb_action_%s(%r, %r)' %(answer,'optout', filename)) + if hasattr(modules, 'cb_action_%s' % (answer)): + log.debug(_("Attempting to execute cb_action_%s(%r, %r)") % (answer, 'optout', filename), level=8) + exec('modules.cb_action_%s(%r, %r)' % (answer,'optout', filename)) os.unlink(filepath) - #print "Moving filepath %s to new_filepath %s" %(filepath, new_filepath) + #print "Moving filepath %s to new_filepath %s" % (filepath, new_filepath) #os.rename(filepath, new_filepath) - #if hasattr(modules, 'cb_action_%s' %(optout_answer)): - #log.debug(_("Attempting to execute cb_action_%s()") %(optout_answer), level=8) - #exec('modules.cb_action_%s(%r, %r)' %(optout_answer,'optout', new_filepath)) + #if hasattr(modules, 'cb_action_%s' % (optout_answer)): + #log.debug(_("Attempting to execute cb_action_%s()") % (optout_answer), level=8) + #exec('modules.cb_action_%s(%r, %r)' % (optout_answer,'optout', new_filepath)) #return def request(params=None): @@ -170,7 +170,7 @@ def request(params=None): try: f = urllib.urlopen(optout_url, params) except Exception, e: - log.error(_("Could not send request to optout_url %s") %(optout_url)) + log.error(_("Could not send request to optout_url %s") % (optout_url)) return "DEFER" response = f.read() diff --git a/wallace/modules.py b/wallace/modules.py index 374f440..464cf8e 100644 --- a/wallace/modules.py +++ b/wallace/modules.py @@ -19,8 +19,20 @@ import os import sys +import time + +import email +from email.mime.base import MIMEBase +from email.mime.message import MIMEMessage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.utils import COMMASPACE +from email.utils import formatdate + +import smtplib import pykolab +from pykolab import constants from pykolab.translate import _ log = pykolab.getLogger('pykolab.wallace') @@ -40,9 +52,9 @@ def __init__(): if filename.startswith('module_') and filename.endswith('.py'): module_name = filename.replace('.py','') name = module_name.replace('module_', '') - #print "exec(\"from %s import __init__ as %s_register\"" %(module_name,name) - exec("from %s import __init__ as %s_register" %(module_name,name)) - exec("%s_register()" %(name)) + #print "exec(\"from %s import __init__ as %s_register\"" % (module_name,name) + exec("from %s import __init__ as %s_register" % (module_name, name)) + exec("%s_register()" % (name)) for dirname in dirnames: register_group(modules_path, dirname) @@ -70,21 +82,21 @@ def list_modules(*args, **kw): if __modules[_module].has_key('function'): # This is a top-level module if not __modules[_module]['description'] == None: - print "%-25s - %s" %(_module.replace('_','-'),__modules[_module]['description']) + print "%-25s - %s" % (_module.replace('_','-'),__modules[_module]['description']) else: - print "%-25s" %(_module.replace('_','-')) + print "%-25s" % (_module.replace('_','-')) for _module in _modules: if not __modules[_module].has_key('function'): # This is a nested module - print "\n" + _("Module Group: %s") %(_module) + "\n" + print "\n" + _("Module Group: %s") % (_module) + "\n" ___modules = __modules[_module].keys() ___modules.sort() for __module in ___modules: if not __modules[_module][__module]['description'] == None: - print "%-4s%-21s - %s" %('',__module.replace('_','-'),__modules[_module][__module]['description']) + print "%-4s%-21s - %s" % ('',__module.replace('_','-'),__modules[_module][__module]['description']) else: - print "%-4s%-21s" %('',__module.replace('_','-')) + print "%-4s%-21s" % ('',__module.replace('_','-')) def execute(name, *args, **kw): if not modules.has_key(name): @@ -99,22 +111,166 @@ def execute(name, *args, **kw): modules[name]['function'](*args, **kw) def cb_action_HOLD(module, filepath): - log.info(_("Holding message in queue for manual review (%s by %s)") %(filepath, module)) - ## Actually just unlink the file for now - #os.unlink(filepath) + log.info(_("Holding message in queue for manual review (%s by %s)") % (filepath, module)) def cb_action_DEFER(module, filepath): - log.info(_("Deferring message in %s (by module %s)") %(filepath, module)) + log.info(_("Deferring message in %s (by module %s)") % (filepath, module)) + message = email.message_from_file(open(filepath, 'r')) + + internal_time = email.Utils.parsedate_tz(message.__getitem__('Date')) + internal_time = time.mktime(internal_time[:9]) + internal_time[9] + + now_time = time.time() + + delta = now_time - internal_time + + log.debug(_("The time when the message was sent: %r") % (internal_time), level=8) + log.debug(_("The time now: %r") % (now_time), level=8) + log.debug(_("The time delta: %r") % (delta), level=8) + + if delta > 432000: + # TODO: Send NDR back to user + log.debug(_("Message in file %s older then 5 days, deleting") % (filepath), level=8) + os.unlink(filepath) + + # Alternative method is file age. + #Date sent(/var/spool/pykolab/wallace/optout/DEFER/tmpIv7pDl): 'Thu, 08 Mar 2012 11:51:03 +0000' + #(2012, 3, 8, 11, 51, 3, 0, 1, -1) + # YYYY M D H m s weekday, yearday + + #log.debug(datetime.datetime(*), level=8) + + #import os + #stat = os.stat(filepath) + + #fileage = datetime.datetime.fromtimestamp(stat.st_mtime) + #now = datetime.datetime.now() + #delta = now - fileage + + #print "file:", filepath, "fileage:", fileage, "now:", now, "delta(seconds):", delta.seconds + + #if delta.seconds > 1800: + ## TODO: Send NDR back to user + #log.debug(_("Message in file %s older then 1800 seconds, deleting") % (filepath), level=8) + #os.unlink(filepath) def cb_action_REJECT(module, filepath): - log.info(_("Rejecting message in %s (by module %s)") %(filepath, module)) - # Send NDR, unlink file - os.unlink(filepath) + log.info(_("Rejecting message in %s (by module %s)") % (filepath, module)) + + message = email.message_from_file(open(filepath, 'r')) + envelope_sender = email.utils.getaddresses(message.get_all('From', [])) + + recipients = email.utils.getaddresses(message.get_all('To', [])) + \ + email.utils.getaddresses(message.get_all('Cc', [])) + + _recipients = [] + + for recipient in recipients: + if not recipient[0] == '': + _recipients.append('%s <%s>' % (recipient[0], recipient[1])) + else: + _recipients.append('%s' % (recipient[1])) + + # TODO: Find the preferredLanguage for the envelope_sender user. + ndr_message_subject = "Undelivered Mail Returned to Sender" + ndr_message_text = _("""This is the email system Wallace at %s. + +I'm sorry to inform you we could not deliver the attached message +to the following recipients: + +- %s + +Your message is being delivered to any other recipients you may have +sent your message to. There is no need to resend the message to those +recipients. +""") % ( + constants.fqdn, + "\n- ".join(_recipients) + ) + + diagnostics = _("""X-Wallace-Module: %s +X-Wallace-Result: REJECT +""") % ( + module + ) + + msg = MIMEMultipart("report") + msg['From'] = "MAILER-DAEMON@%s" % (constants.fqdn) + msg['To'] = email.utils.formataddr(envelope_sender) + msg['Date'] = formatdate(localtime=True) + msg['Subject'] = ndr_message_subject + + msg.preamble = "This is a MIME-encapsulated message." + + part = MIMEText(ndr_message_text) + part.add_header("Content-Description", "Notification") + msg.attach(part) + + _diag_message = email.message.Message() + _diag_message.set_payload(diagnostics) + part = MIMEMessage(_diag_message, "delivery-status") + part.add_header("Content-Description", "Delivery Report") + msg.attach(part) + + part = MIMEMessage(message) + part.add_header("Content-Description", "Undelivered Message") + msg.attach(part) + + smtp = smtplib.SMTP("localhost", 10027) + + try: + smtp.sendmail( + "MAILER-DAEMON@%s" % (constants.fqdn), + [email.utils.formataddr(envelope_sender)], + msg.as_string() + ) + except smtplib.SMTPDataError, errmsg: + # DEFER + pass + except smtplib.SMTPHeloError, errmsg: + # DEFER + pass + except smtplib.SMTPRecipientsRefused, errmsg: + # REJECT, send NDR + pass + except smtplib.SMTPSenderRefused, errmsg: + # REJECT, send NDR + pass + finally: + os.unlink(filepath) def cb_action_ACCEPT(module, filepath): - log.info(_("Accepting message in %s (by module %s)") %(filepath, module)) - # Deliver for final delivery (use re-injection smtpd), unlink file - os.unlink(filepath) + log.info(_("Accepting message in %s (by module %s)") % (filepath, module)) + message = email.message_from_file(open(filepath, 'r')) + envelope_sender = email.utils.getaddresses(message.get_all('From', [])) + + recipients = email.utils.getaddresses(message.get_all('To', [])) + \ + email.utils.getaddresses(message.get_all('Cc', [])) + + smtp = smtplib.SMTP("localhost", 10027) + + try: + smtp.sendmail( + email.utils.formataddr(envelope_sender), + COMMASPACE.join( + [email.utils.formataddr(recipient) for recipient in recipients] + ), + message.as_string() + ) + except smtplib.SMTPDataError, errmsg: + # DEFER + pass + except smtplib.SMTPHeloError, errmsg: + # DEFER + pass + except smtplib.SMTPRecipientsRefused, errmsg: + # DEFER + pass + except smtplib.SMTPSenderRefused, errmsg: + # DEFER + pass + finally: + os.unlink(filepath) def register_group(dirname, module): modules_base_path = os.path.join(os.path.dirname(__file__), module) @@ -129,13 +285,13 @@ def register_group(dirname, module): if filename.startswith('module_') and filename.endswith('.py'): module_name = filename.replace('.py','') name = module_name.replace('module_', '') - #print "exec(\"from %s.%s import __init__ as %s_%s_register\"" %(module,module_name,module,name) - exec("from %s.%s import __init__ as %s_%s_register" %(module,module_name,module,name)) - exec("%s_%s_register()" %(module,name)) + #print "exec(\"from %s.%s import __init__ as %s_%s_register\"" % (module,module_name,module,name) + exec("from %s.%s import __init__ as %s_%s_register" % (module,module_name,module,name)) + exec("%s_%s_register()" % (module,name)) def register(name, func, group=None, description=None, aliases=[]): if not group == None: - module = "%s_%s" %(group,name) + module = "%s_%s" % (group,name) else: module = name @@ -143,7 +299,7 @@ def register(name, func, group=None, description=None, aliases=[]): aliases = [aliases] if modules.has_key(module): - log.fatal(_("Module '%s' already registered") %(module)) + log.fatal(_("Module '%s' already registered") % (module)) sys.exit(1) if callable(func): @@ -165,6 +321,6 @@ def register(name, func, group=None, description=None, aliases=[]): for alias in aliases: modules[alias] = { 'function': func, - 'description': _("Alias for %s") %(name) + 'description': _("Alias for %s") % (name) } |