diff options
author | Thomas Bruederli <bruederli@kolabsys.com> | 2014-10-23 17:20:48 -0400 |
---|---|---|
committer | Thomas Bruederli <bruederli@kolabsys.com> | 2014-10-23 17:20:48 -0400 |
commit | 1ad076886f2ca1f61438005c1547a7229c0ef287 (patch) | |
tree | bc0b17a1aee4d32ea5782b8bede6a159d771b79d | |
parent | b3e71834dfbdbe51c730580aa92dae3ea6797516 (diff) | |
download | pykolab-1ad076886f2ca1f61438005c1547a7229c0ef287.tar.gz |
Run archival jobs in another Wallace child process (#3843)
-rw-r--r-- | wallace/__init__.py | 27 | ||||
-rw-r--r-- | wallace/module_resources.py | 104 | ||||
-rw-r--r-- | wallace/modules.py | 11 |
3 files changed, 139 insertions, 3 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py index 6698666..8d43be2 100644 --- a/wallace/__init__.py +++ b/wallace/__init__.py @@ -87,6 +87,23 @@ def pickup_message(filepath, *args, **kw): if continue_with_accept: cb_action_ACCEPT('wallace', filepath) +def modules_heartbeat(wallace_modules): + lastrun = 0 + + while True: + try: + for module in wallace_modules: + try: + modules.heartbeat(module, lastrun) + except: + log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc()))) + + lastrun = int(time.time()) + time.sleep(60) + except (SystemExit, KeyboardInterrupt), e: + log.info("Terminating heartbeat process") + break + def worker_process(*args, **kw): log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) @@ -255,6 +272,11 @@ class WallaceDaemon(object): self.pool.apply_async(pickup_message, (filepath, (self.modules))) self.current_connections -= 1 + # start background process to run periodic jobs in active modules + self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules]) + self.heartbeat.daemon = True + self.heartbeat.start() + try: while 1: while self.current_connections >= self.max_connections: @@ -273,6 +295,9 @@ class WallaceDaemon(object): s.shutdown(1) s.close() + # shut down hearbeat process + self.heartbeat.terminate() + def data_header(self, mailfrom, rcpttos): COMMASPACE = ', ' return "X-Kolab-From: " + mailfrom + "\r\n" + \ @@ -305,6 +330,8 @@ class WallaceDaemon(object): pass def remove_pid(self, *args, **kw): + if hasattr(self, 'heartbeat'): + self.heartbeat.terminate() if os.access(conf.pidfile, os.R_OK): os.remove(conf.pidfile) raise SystemExit diff --git a/wallace/module_resources.py b/wallace/module_resources.py index 3d16844..9376749 100644 --- a/wallace/module_resources.py +++ b/wallace/module_resources.py @@ -25,6 +25,7 @@ import random import tempfile import time from urlparse import urlparse +from dateutil.tz import tzlocal import base64 import uuid import re @@ -70,7 +71,7 @@ auth = None imap = None def __init__(): - modules.register('resources', execute, description=description()) + modules.register('resources', execute, description=description(), heartbeat=heartbeat) def accept(filepath): new_filepath = os.path.join( @@ -391,6 +392,105 @@ def execute(*args, **kw): os.unlink(filepath) +def heartbeat(lastrun): + global imap + + # run archival job every hour only + now = int(time.time()) + if lastrun == 0 or now - heartbeat._lastrun < 3600: + return + + log.debug(_("module_resources.heartbeat(%d)") % (heartbeat._lastrun), level=8) + + # get a list of resource records from LDAP + auth = Auth() + auth.connect() + + resource_dns = auth.find_resource('*') + + # filter by resource_base_dn + resource_base_dn = conf.get('ldap', 'resource_base_dn', None) + if resource_base_dn is not None: + resource_dns = [dn for dn in resource_dns if resource_base_dn in dn] + + if len(resource_dns) > 0: + imap = IMAP() + imap.connect() + + for resource_dn in resource_dns: + resource_attrs = auth.get_entry_attributes(None, resource_dn, ['kolabtargetfolder']) + if resource_attrs.has_key('kolabtargetfolder'): + try: + expunge_resource_calendar(resource_attrs['kolabtargetfolder']) + except Exception, e: + log.error(_("Expunge resource calendar for %s (%s) failed: %r") % ( + resource_dn, resource_attrs['kolabtargetfolder'], e + )) + + imap.disconnect() + + auth.disconnect() + + heartbeat._lastrun = now + +heartbeat._lastrun = 0 + + +def expunge_resource_calendar(mailbox): + """ + Cleanup routine to remove events older than 100 days from the given resource calendar + """ + global imap + + log.debug( + _("Expunge events in resource folder %r") % (mailbox), + level=8 + ) + + now = datetime.datetime.now(tzlocal()) + expire_date = now - datetime.timedelta(days=100) + + # might raise an exception, let that bubble + targetfolder = imap.folder_quote(mailbox) + imap.set_acl(targetfolder, conf.get(conf.get('kolab', 'imap_backend'), 'admin_login'), "lrswipkxtecda") + imap.imap.m.select(targetfolder) + + typ, data = imap.imap.m.search(None, 'UNDELETED') + + for num in data[0].split(): + log.debug( + _("Fetching message ID %r from folder %r") % (num, mailbox), + level=9 + ) + + typ, data = imap.imap.m.fetch(num, '(RFC822)') + + event_message = message_from_string(data[0][1]) + + try: + event = event_from_message(message_from_string(data[0][1])) + except Exception, e: + log.error(_("Failed to parse event from message %s/%s: %r") % (mailbox, num, e)) + continue + + if event: + dt_end = to_dt(event.get_end()) + + # consider recurring events and get real end date + if event.is_recurring(): + dt_end = event.get_last_occurrence() + if dt_end is None: + # skip if recurring forever + continue + + if dt_end and dt_end < expire_date: + age = now - dt_end + log.debug(_("Flag event %s from message %s/%s as deleted (age = %d days)") % (event.uid, mailbox, num, age.days), level=8) + imap.imap.m.store(num, '+FLAGS', '\\Deleted') + + imap.imap.m.expunge() + + def check_availability(itip_events, resource_dns, resources, receiving_attendee=None): """ For each resource, determine if any of the events in question are in conflict. @@ -540,7 +640,7 @@ def read_resource_calendar(resource_rec, itip_events): # might raise an exception, let that bubble imap.imap.m.select(imap.folder_quote(mailbox)) - typ, data = imap.imap.m.search(None, 'ALL') + typ, data = imap.imap.m.search(None, 'UNDELETED') num_messages = len(data[0].split()) diff --git a/wallace/modules.py b/wallace/modules.py index d4432d0..632a6fc 100644 --- a/wallace/modules.py +++ b/wallace/modules.py @@ -115,6 +115,13 @@ def execute(name, *args, **kw): return modules[name]['function'](*args, **kw) +def heartbeat(name, *args, **kw): + if not modules.has_key(name): + log.warning(_("No such module %r in modules %r (1).") % (name, modules)) + + if modules[name].has_key('heartbeat'): + return modules[name]['heartbeat'](*args, **kw) + def cb_action_HOLD(module, filepath): log.info(_("Holding message in queue for manual review (%s by %s)") % (filepath, module)) @@ -334,7 +341,7 @@ def register_group(dirname, module): exec("%s_%s_register()" % (module,name)) -def register(name, func, group=None, description=None, aliases=[]): +def register(name, func, group=None, description=None, aliases=[], heartbeat=None): if not group == None: module = "%s_%s" % (group,name) else: @@ -369,3 +376,5 @@ def register(name, func, group=None, description=None, aliases=[]): 'description': _("Alias for %s") % (name) } + if callable(heartbeat): + modules[module]['heartbeat'] = heartbeat |