summaryrefslogtreecommitdiffstats
path: root/wallace
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2012-03-09 10:38:13 +0000
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2012-03-09 10:38:13 +0000
commit107f91157704dbc5b6b7054cf420da5ef651027f (patch)
tree70be96f61a844b145d92ffbad1b58519cf768b35 /wallace
parent61714a5e0d99ba1bc08eeac7f0ead106d72abae3 (diff)
downloadpykolab-107f91157704dbc5b6b7054cf420da5ef651027f.tar.gz
Correct some pylint conventions
Diffstat (limited to 'wallace')
-rw-r--r--wallace/__init__.py126
-rw-r--r--wallace/module_optout.py32
-rw-r--r--wallace/modules.py204
3 files changed, 295 insertions, 67 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py
index 5897ac7..86d933f 100644
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -87,39 +87,80 @@ class WallaceDaemon(object):
os.close(fp)
while threading.active_count() > 25:
- log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8)
- time.sleep(10)
+ log.debug(
+ _("Number of threads currently running: %d") % (
+ threading.active_count()
+ ),
+ level=8
+ )
- log.debug(_("Continuing with %d threads currently running") %(threading.active_count()), level=8)
+ time.sleep(1)
+
+ log.debug(
+ _("Continuing with %d threads currently running") % (
+ threading.active_count()
+ ),
+ level=8
+ )
# TODO: Apply throttling
- log.debug(_("Creating thread for message in %s") %(filename), level=8)
+ log.debug(_("Creating thread for message in %s") % (filename), level=8)
thread = threading.Thread(target=self.thread_run, args=[ filename ])
thread.start()
def thread_run(self, filename, *args, **kw):
while threading.active_count() > 25:
- log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8)
+ log.debug(
+ _("Number of threads currently running: %d") % (
+ threading.active_count()
+ ),
+ level=8
+ )
+
time.sleep(10)
- log.debug(_("Continuing with %d threads currently running") %(threading.active_count()), level=8)
+ log.debug(
+ _("Continuing with %d threads currently running") % (
+ threading.active_count()
+ ),
+ level=8
+ )
- log.debug(_("Running thread %s for message file %s") %(threading.current_thread().name,filename), level=8)
+ log.debug(
+ _("Running thread %s for message file %s") % (
+ threading.current_thread().name,
+ filename
+ ),
+ level=8
+ )
if kw.has_key('module'):
- log.debug(_("This message was already in module %s, delegating specifically to that module") %(kw['module']), level=8)
+ log.debug(
+ _("This message was already in module %s, delegating " + \
+ "specifically to that module") % (
+ kw['module']
+ ),
+ level=8
+ )
if kw.has_key('stage'):
- log.debug(_("It was also in a certain stage: %s, letting module %s know that") %(kw['stage'],kw['module']), level=8)
+ log.debug(
+ _("It was also in a certain stage: %s, letting " + \
+ "module %s know that") % (
+ kw['stage'],
+ kw['module']
+ ),
+ level=8
+ )
- log.debug(_("Executing module %s") %(kw['module']), level=8)
+ log.debug(_("Executing module %s") % (kw['module']), level=8)
modules.execute(kw['module'], filename, stage=kw['stage'])
return
- log.debug(_("Executing module %s") %(kw['module']), level=8)
+ log.debug(_("Executing module %s") % (kw['module']), level=8)
modules.execute(kw['module'], filename, stage=kw['stage'])
return
@@ -129,7 +170,7 @@ class WallaceDaemon(object):
wallace_modules = []
for module in wallace_modules:
- log.debug(_("Executing module %s") %(module), level=8)
+ log.debug(_("Executing module %s") % (module), level=8)
modules.execute(module, filename)
def run(self):
@@ -158,7 +199,9 @@ class WallaceDaemon(object):
except AttributeError, e:
exitcode = 1
traceback.print_exc()
- print >> sys.stderr, _("Traceback occurred, please report a bug at http://bugzilla.kolabsys.com")
+ print >> sys.stderr, _("Traceback occurred, please report a " + \
+ "bug at http://bugzilla.kolabsys.com")
+
except TypeError, e:
exitcode = 1
traceback.print_exc()
@@ -166,7 +209,9 @@ class WallaceDaemon(object):
except:
exitcode = 2
traceback.print_exc()
- print >> sys.stderr, _("Traceback occurred, please report a bug at http://bugzilla.kolabsys.com")
+ print >> sys.stderr, _("Traceback occurred, please report a " + \
+ "bug at http://bugzilla.kolabsys.com")
+
sys.exit(exitcode)
def pickup_defer(self):
@@ -199,7 +244,12 @@ class WallaceDaemon(object):
time.sleep(1)
for module in wallace_modules:
- log.debug(_("Picking up deferred messages for module %s") %(module), level=8)
+ log.debug(
+ _("Picking up deferred messages for module %s") % (
+ module
+ ),
+ level=8
+ )
module_defer_path = os.path.join(base_path, module, 'DEFER')
@@ -222,8 +272,8 @@ class WallaceDaemon(object):
log.debug(_("Sleeping for 1 second"), level=8)
time.sleep(1)
else:
- log.debug(_("Sleeping for 10 seconds"), level=8)
- time.sleep(10)
+ log.debug(_("Sleeping for 1800 seconds"), level=8)
+ time.sleep(1800)
def do_wallace(self):
@@ -249,7 +299,7 @@ class WallaceDaemon(object):
except Exception, e:
log.warning(
_("Could not bind to socket on port %d on bind " + \
- "address %s") %(
+ "address %s") % (
conf.wallace_port,
conf.wallace_bind_address
)
@@ -280,8 +330,8 @@ class WallaceDaemon(object):
if not root == pickup_path:
module = os.path.dirname(root).replace(pickup_path, '')
- # Compare uppercase status (specifically, DEFER) with lowercase
- # (plugin names).
+ # Compare uppercase status (specifically, DEFER) with
+ # lowercase (plugin names).
#
# The messages in DEFER are supposed to be picked up by
# another thread, whereas the messages in other directories
@@ -309,13 +359,19 @@ class WallaceDaemon(object):
if stage.lower() == "defer":
continue
- log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8)
+ log.debug(
+ _("Number of threads currently running: %d") % (
+ threading.active_count()
+ ),
+ level=8
+ )
+
thread = threading.Thread(
target = self.thread_run,
args = [ filepath ],
kwargs = {
- "module": '%s' %(module),
- "stage": '%s' %(stage)
+ "module": '%s' % (module),
+ "stage": '%s' % (stage)
}
)
@@ -324,9 +380,25 @@ class WallaceDaemon(object):
continue
- log.debug(_("Picking up spooled email file %s") %(filepath), level=8)
- log.debug(_("Number of threads currently running: %d") %(threading.active_count()), level=8)
- thread = threading.Thread(target=self.thread_run, args=[ filepath ])
+ log.debug(
+ _("Picking up spooled email file %s") % (
+ filepath
+ ),
+ level=8
+ )
+
+ log.debug(
+ _("Number of threads currently running: %d") % (
+ threading.active_count()
+ ),
+ level=8
+ )
+
+ thread = threading.Thread(
+ target=self.thread_run,
+ args=[ filepath ]
+ )
+
thread.start()
time.sleep(0.5)
@@ -342,7 +414,7 @@ class WallaceDaemon(object):
log.info(_("Accepted connection"))
if not pair == None:
connection, address = pair
- #print "Accepted connection from %r" %(address)
+ #print "Accepted connection from %r" % (address)
channel = SMTPChannel(self, connection, address)
asyncore.loop()
except Exception, e:
diff --git a/wallace/module_optout.py b/wallace/module_optout.py
index 3e2fd09..79cafb7 100644
--- a/wallace/module_optout.py
+++ b/wallace/module_optout.py
@@ -49,16 +49,16 @@ def execute(*args, **kw):
filepath = args[0]
if kw.has_key('stage'):
- log.debug(_("Issuing callback after processing to stage %s") %(kw['stage']), level=8)
- log.debug(_("Testing cb_action_%s()") %(kw['stage']), level=8)
- if hasattr(modules, 'cb_action_%s' %(kw['stage'])):
- log.debug(_("Attempting to execute cb_action_%s()") %(kw['stage']), level=8)
- exec('modules.cb_action_%s(%r, %r)' %(kw['stage'],'optout',filepath))
+ log.debug(_("Issuing callback after processing to stage %s") % (kw['stage']), level=8)
+ log.debug(_("Testing cb_action_%s()") % (kw['stage']), level=8)
+ if hasattr(modules, 'cb_action_%s' % (kw['stage'])):
+ log.debug(_("Attempting to execute cb_action_%s()") % (kw['stage']), level=8)
+ exec('modules.cb_action_%s(%r, %r)' % (kw['stage'],'optout',filepath))
return
#modules.next_module('optout')
- log.debug(_("Consulting opt-out service for %r, %r") %(args, kw), level=8)
+ log.debug(_("Consulting opt-out service for %r, %r") % (args, kw), level=8)
import email
message = email.message_from_file(open(filepath, 'r'))
@@ -86,7 +86,7 @@ def execute(*args, **kw):
for recipient in recipients[recipient_type]:
log.debug(
_("Running opt-out consult from envelope sender '%s " + \
- "<%s>' to recipient %s <%s>") %(
+ "<%s>' to recipient %s <%s>") % (
envelope_sender[0][0],
envelope_sender[0][1],
recipient[0],
@@ -143,23 +143,23 @@ def execute(*args, **kw):
if use_this:
# TODO: Do not set items with an empty list.
- (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/optout/%s" %(answer))
+ (fp, filename) = tempfile.mkstemp(dir="/var/spool/pykolab/wallace/optout/%s" % (answer))
os.write(fp, _message.__str__())
os.close(fp)
# Callback with new filename
- if hasattr(modules, 'cb_action_%s' %(answer)):
- log.debug(_("Attempting to execute cb_action_%s(%r, %r)") %(answer, 'optout', filename), level=8)
- exec('modules.cb_action_%s(%r, %r)' %(answer,'optout', filename))
+ if hasattr(modules, 'cb_action_%s' % (answer)):
+ log.debug(_("Attempting to execute cb_action_%s(%r, %r)") % (answer, 'optout', filename), level=8)
+ exec('modules.cb_action_%s(%r, %r)' % (answer,'optout', filename))
os.unlink(filepath)
- #print "Moving filepath %s to new_filepath %s" %(filepath, new_filepath)
+ #print "Moving filepath %s to new_filepath %s" % (filepath, new_filepath)
#os.rename(filepath, new_filepath)
- #if hasattr(modules, 'cb_action_%s' %(optout_answer)):
- #log.debug(_("Attempting to execute cb_action_%s()") %(optout_answer), level=8)
- #exec('modules.cb_action_%s(%r, %r)' %(optout_answer,'optout', new_filepath))
+ #if hasattr(modules, 'cb_action_%s' % (optout_answer)):
+ #log.debug(_("Attempting to execute cb_action_%s()") % (optout_answer), level=8)
+ #exec('modules.cb_action_%s(%r, %r)' % (optout_answer,'optout', new_filepath))
#return
def request(params=None):
@@ -170,7 +170,7 @@ def request(params=None):
try:
f = urllib.urlopen(optout_url, params)
except Exception, e:
- log.error(_("Could not send request to optout_url %s") %(optout_url))
+ log.error(_("Could not send request to optout_url %s") % (optout_url))
return "DEFER"
response = f.read()
diff --git a/wallace/modules.py b/wallace/modules.py
index 374f440..464cf8e 100644
--- a/wallace/modules.py
+++ b/wallace/modules.py
@@ -19,8 +19,20 @@
import os
import sys
+import time
+
+import email
+from email.mime.base import MIMEBase
+from email.mime.message import MIMEMessage
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email.utils import COMMASPACE
+from email.utils import formatdate
+
+import smtplib
import pykolab
+from pykolab import constants
from pykolab.translate import _
log = pykolab.getLogger('pykolab.wallace')
@@ -40,9 +52,9 @@ def __init__():
if filename.startswith('module_') and filename.endswith('.py'):
module_name = filename.replace('.py','')
name = module_name.replace('module_', '')
- #print "exec(\"from %s import __init__ as %s_register\"" %(module_name,name)
- exec("from %s import __init__ as %s_register" %(module_name,name))
- exec("%s_register()" %(name))
+ #print "exec(\"from %s import __init__ as %s_register\"" % (module_name,name)
+ exec("from %s import __init__ as %s_register" % (module_name, name))
+ exec("%s_register()" % (name))
for dirname in dirnames:
register_group(modules_path, dirname)
@@ -70,21 +82,21 @@ def list_modules(*args, **kw):
if __modules[_module].has_key('function'):
# This is a top-level module
if not __modules[_module]['description'] == None:
- print "%-25s - %s" %(_module.replace('_','-'),__modules[_module]['description'])
+ print "%-25s - %s" % (_module.replace('_','-'),__modules[_module]['description'])
else:
- print "%-25s" %(_module.replace('_','-'))
+ print "%-25s" % (_module.replace('_','-'))
for _module in _modules:
if not __modules[_module].has_key('function'):
# This is a nested module
- print "\n" + _("Module Group: %s") %(_module) + "\n"
+ print "\n" + _("Module Group: %s") % (_module) + "\n"
___modules = __modules[_module].keys()
___modules.sort()
for __module in ___modules:
if not __modules[_module][__module]['description'] == None:
- print "%-4s%-21s - %s" %('',__module.replace('_','-'),__modules[_module][__module]['description'])
+ print "%-4s%-21s - %s" % ('',__module.replace('_','-'),__modules[_module][__module]['description'])
else:
- print "%-4s%-21s" %('',__module.replace('_','-'))
+ print "%-4s%-21s" % ('',__module.replace('_','-'))
def execute(name, *args, **kw):
if not modules.has_key(name):
@@ -99,22 +111,166 @@ def execute(name, *args, **kw):
modules[name]['function'](*args, **kw)
def cb_action_HOLD(module, filepath):
- log.info(_("Holding message in queue for manual review (%s by %s)") %(filepath, module))
- ## Actually just unlink the file for now
- #os.unlink(filepath)
+ log.info(_("Holding message in queue for manual review (%s by %s)") % (filepath, module))
def cb_action_DEFER(module, filepath):
- log.info(_("Deferring message in %s (by module %s)") %(filepath, module))
+ log.info(_("Deferring message in %s (by module %s)") % (filepath, module))
+ message = email.message_from_file(open(filepath, 'r'))
+
+ internal_time = email.Utils.parsedate_tz(message.__getitem__('Date'))
+ internal_time = time.mktime(internal_time[:9]) + internal_time[9]
+
+ now_time = time.time()
+
+ delta = now_time - internal_time
+
+ log.debug(_("The time when the message was sent: %r") % (internal_time), level=8)
+ log.debug(_("The time now: %r") % (now_time), level=8)
+ log.debug(_("The time delta: %r") % (delta), level=8)
+
+ if delta > 432000:
+ # TODO: Send NDR back to user
+ log.debug(_("Message in file %s older then 5 days, deleting") % (filepath), level=8)
+ os.unlink(filepath)
+
+ # Alternative method is file age.
+ #Date sent(/var/spool/pykolab/wallace/optout/DEFER/tmpIv7pDl): 'Thu, 08 Mar 2012 11:51:03 +0000'
+ #(2012, 3, 8, 11, 51, 3, 0, 1, -1)
+ # YYYY M D H m s weekday, yearday
+
+ #log.debug(datetime.datetime(*), level=8)
+
+ #import os
+ #stat = os.stat(filepath)
+
+ #fileage = datetime.datetime.fromtimestamp(stat.st_mtime)
+ #now = datetime.datetime.now()
+ #delta = now - fileage
+
+ #print "file:", filepath, "fileage:", fileage, "now:", now, "delta(seconds):", delta.seconds
+
+ #if delta.seconds > 1800:
+ ## TODO: Send NDR back to user
+ #log.debug(_("Message in file %s older then 1800 seconds, deleting") % (filepath), level=8)
+ #os.unlink(filepath)
def cb_action_REJECT(module, filepath):
- log.info(_("Rejecting message in %s (by module %s)") %(filepath, module))
- # Send NDR, unlink file
- os.unlink(filepath)
+ log.info(_("Rejecting message in %s (by module %s)") % (filepath, module))
+
+ message = email.message_from_file(open(filepath, 'r'))
+ envelope_sender = email.utils.getaddresses(message.get_all('From', []))
+
+ recipients = email.utils.getaddresses(message.get_all('To', [])) + \
+ email.utils.getaddresses(message.get_all('Cc', []))
+
+ _recipients = []
+
+ for recipient in recipients:
+ if not recipient[0] == '':
+ _recipients.append('%s <%s>' % (recipient[0], recipient[1]))
+ else:
+ _recipients.append('%s' % (recipient[1]))
+
+ # TODO: Find the preferredLanguage for the envelope_sender user.
+ ndr_message_subject = "Undelivered Mail Returned to Sender"
+ ndr_message_text = _("""This is the email system Wallace at %s.
+
+I'm sorry to inform you we could not deliver the attached message
+to the following recipients:
+
+- %s
+
+Your message is being delivered to any other recipients you may have
+sent your message to. There is no need to resend the message to those
+recipients.
+""") % (
+ constants.fqdn,
+ "\n- ".join(_recipients)
+ )
+
+ diagnostics = _("""X-Wallace-Module: %s
+X-Wallace-Result: REJECT
+""") % (
+ module
+ )
+
+ msg = MIMEMultipart("report")
+ msg['From'] = "MAILER-DAEMON@%s" % (constants.fqdn)
+ msg['To'] = email.utils.formataddr(envelope_sender)
+ msg['Date'] = formatdate(localtime=True)
+ msg['Subject'] = ndr_message_subject
+
+ msg.preamble = "This is a MIME-encapsulated message."
+
+ part = MIMEText(ndr_message_text)
+ part.add_header("Content-Description", "Notification")
+ msg.attach(part)
+
+ _diag_message = email.message.Message()
+ _diag_message.set_payload(diagnostics)
+ part = MIMEMessage(_diag_message, "delivery-status")
+ part.add_header("Content-Description", "Delivery Report")
+ msg.attach(part)
+
+ part = MIMEMessage(message)
+ part.add_header("Content-Description", "Undelivered Message")
+ msg.attach(part)
+
+ smtp = smtplib.SMTP("localhost", 10027)
+
+ try:
+ smtp.sendmail(
+ "MAILER-DAEMON@%s" % (constants.fqdn),
+ [email.utils.formataddr(envelope_sender)],
+ msg.as_string()
+ )
+ except smtplib.SMTPDataError, errmsg:
+ # DEFER
+ pass
+ except smtplib.SMTPHeloError, errmsg:
+ # DEFER
+ pass
+ except smtplib.SMTPRecipientsRefused, errmsg:
+ # REJECT, send NDR
+ pass
+ except smtplib.SMTPSenderRefused, errmsg:
+ # REJECT, send NDR
+ pass
+ finally:
+ os.unlink(filepath)
def cb_action_ACCEPT(module, filepath):
- log.info(_("Accepting message in %s (by module %s)") %(filepath, module))
- # Deliver for final delivery (use re-injection smtpd), unlink file
- os.unlink(filepath)
+ log.info(_("Accepting message in %s (by module %s)") % (filepath, module))
+ message = email.message_from_file(open(filepath, 'r'))
+ envelope_sender = email.utils.getaddresses(message.get_all('From', []))
+
+ recipients = email.utils.getaddresses(message.get_all('To', [])) + \
+ email.utils.getaddresses(message.get_all('Cc', []))
+
+ smtp = smtplib.SMTP("localhost", 10027)
+
+ try:
+ smtp.sendmail(
+ email.utils.formataddr(envelope_sender),
+ COMMASPACE.join(
+ [email.utils.formataddr(recipient) for recipient in recipients]
+ ),
+ message.as_string()
+ )
+ except smtplib.SMTPDataError, errmsg:
+ # DEFER
+ pass
+ except smtplib.SMTPHeloError, errmsg:
+ # DEFER
+ pass
+ except smtplib.SMTPRecipientsRefused, errmsg:
+ # DEFER
+ pass
+ except smtplib.SMTPSenderRefused, errmsg:
+ # DEFER
+ pass
+ finally:
+ os.unlink(filepath)
def register_group(dirname, module):
modules_base_path = os.path.join(os.path.dirname(__file__), module)
@@ -129,13 +285,13 @@ def register_group(dirname, module):
if filename.startswith('module_') and filename.endswith('.py'):
module_name = filename.replace('.py','')
name = module_name.replace('module_', '')
- #print "exec(\"from %s.%s import __init__ as %s_%s_register\"" %(module,module_name,module,name)
- exec("from %s.%s import __init__ as %s_%s_register" %(module,module_name,module,name))
- exec("%s_%s_register()" %(module,name))
+ #print "exec(\"from %s.%s import __init__ as %s_%s_register\"" % (module,module_name,module,name)
+ exec("from %s.%s import __init__ as %s_%s_register" % (module,module_name,module,name))
+ exec("%s_%s_register()" % (module,name))
def register(name, func, group=None, description=None, aliases=[]):
if not group == None:
- module = "%s_%s" %(group,name)
+ module = "%s_%s" % (group,name)
else:
module = name
@@ -143,7 +299,7 @@ def register(name, func, group=None, description=None, aliases=[]):
aliases = [aliases]
if modules.has_key(module):
- log.fatal(_("Module '%s' already registered") %(module))
+ log.fatal(_("Module '%s' already registered") % (module))
sys.exit(1)
if callable(func):
@@ -165,6 +321,6 @@ def register(name, func, group=None, description=None, aliases=[]):
for alias in aliases:
modules[alias] = {
'function': func,
- 'description': _("Alias for %s") %(name)
+ 'description': _("Alias for %s") % (name)
}