diff options
author | Thomas Bruederli <bruederli@kolabsys.com> | 2014-07-09 16:52:32 -0400 |
---|---|---|
committer | Thomas Bruederli <bruederli@kolabsys.com> | 2014-07-09 16:52:32 -0400 |
commit | 3e492389ad7f2d628134b3471bc2e4e054b7d204 (patch) | |
tree | d2cc561958b9455f4ee54ac62dcae10e6162ce07 | |
parent | 7977586fcff0eb783ff4f17c2aa0bd2a1de262aa (diff) | |
download | pykolab-3e492389ad7f2d628134b3471bc2e4e054b7d204.tar.gz |
Send iTip replies through wallace again; use a locking mechanism to sequencially process partstat updates from (automated) replies
-rw-r--r-- | pykolab/itip/__init__.py | 2 | ||||
-rw-r--r-- | tests/functional/test_wallace/test_007_invitationpolicy.py | 8 | ||||
-rw-r--r-- | tests/unit/test-012-wallace_invitationpolicy.py | 28 | ||||
-rw-r--r-- | wallace/module_invitationpolicy.py | 104 |
4 files changed, 118 insertions, 24 deletions
diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py index 43646df..8cf6435 100644 --- a/pykolab/itip/__init__.py +++ b/pykolab/itip/__init__.py @@ -224,7 +224,7 @@ def send_reply(from_address, itip_events, response_text, subject=None): log.error(_("Failed to compose iTip reply message: %r") % (e)) return - smtp = smtplib.SMTP("localhost", 10027) + smtp = smtplib.SMTP("localhost", 10026) # replies go through wallace again if conf.debuglevel > 8: smtp.set_debuglevel(True) diff --git a/tests/functional/test_wallace/test_007_invitationpolicy.py b/tests/functional/test_wallace/test_007_invitationpolicy.py index 2b669ff..b12b785 100644 --- a/tests/functional/test_wallace/test_007_invitationpolicy.py +++ b/tests/functional/test_wallace/test_007_invitationpolicy.py @@ -623,13 +623,7 @@ class TestWallaceInvitationpolicy(unittest.TestCase): self.send_itip_invitation(self.jane['mail'], start, template=event_itip) self.send_itip_invitation(self.jack['mail'], start, template=event_itip) - # send replies from jack and jane - # FIXME: replies should not be necessary if auto-replies get through wallace as well - self.send_itip_reply(uid, self.jane['mail'], self.john['mail'], start=start, partstat='ACCEPTED') - time.sleep(10) # FIXME: implement locking in wallace - self.send_itip_reply(uid, self.jack['mail'], self.john['mail'], start=start, partstat='TENTATIVE') - - # wait for replies to be processed and propagated + # wait for replies from jack and jane to be processed and propagated time.sleep(10) event = self.check_user_calendar_event(self.john['kolabtargetfolder'], uid) self.assertIsInstance(event, pykolab.xml.Event) diff --git a/tests/unit/test-012-wallace_invitationpolicy.py b/tests/unit/test-012-wallace_invitationpolicy.py index 650879b..0b64f6a 100644 --- a/tests/unit/test-012-wallace_invitationpolicy.py +++ b/tests/unit/test-012-wallace_invitationpolicy.py @@ -1,6 +1,7 @@ +import os import pykolab import logging -import datetime +import time from icalendar import Calendar from email import message @@ -72,11 +73,10 @@ class TestWallaceInvitationpolicy(unittest.TestCase): def setUp(self): # monkey-patch the pykolab.auth module to check API calls # without actually connecting to LDAP - #self.patch(pykolab.auth.Auth, "connect", self._mock_nop) - #self.patch(pykolab.auth.Auth, "disconnect", self._mock_nop) - #self.patch(pykolab.auth.Auth, "find_user_dn", self._mock_find_user_dn) - #self.patch(pykolab.auth.Auth, "get_entry_attributes", self._mock_get_entry_attributes) - #self.patch(pykolab.auth.Auth, "search_entry_by_attribute", self._mock_search_entry_by_attribute) + self.patch(pykolab.auth.Auth, "connect", self._mock_nop) + self.patch(pykolab.auth.Auth, "disconnect", self._mock_nop) + self.patch(pykolab.auth.Auth, "find_user_dn", self._mock_find_user_dn) + self.patch(pykolab.auth.Auth, "get_entry_attributes", self._mock_get_entry_attributes) # intercept calls to smtplib.SMTP.sendmail() import smtplib @@ -127,3 +127,19 @@ class TestWallaceInvitationpolicy(unittest.TestCase): user = { 'kolabinvitationpolicy': ['ACT_ACCEPT:example.org', 'ACT_MANUAL:others'] } self.assertEqual(MIP.get_matching_invitation_policies(user, 'somedomain.net'), [MIP.ACT_MANUAL]) + + def test_004_write_locks(self): + user = { 'cn': 'John Doe', 'mail': "doe@example.org" } + + lock_key = MIP.get_lock_key(user, '1234567890-abcdef') + lock_file = os.path.join(MIP.mybasepath, 'locks', lock_key + '.lock') + MIP.set_write_lock(lock_key) + + time.sleep(1) + self.assertTrue(os.path.isfile(lock_file)) + self.assertFalse(MIP.set_write_lock(lock_key, False)) + + MIP.remove_write_lock(lock_key) + self.assertFalse(os.path.isfile(lock_file)) + +
\ No newline at end of file diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py index ec3ad44..b7f59de 100644 --- a/wallace/module_invitationpolicy.py +++ b/wallace/module_invitationpolicy.py @@ -23,6 +23,7 @@ import tempfile import time from urlparse import urlparse import urllib +import md5 from email import message_from_string from email.parser import Parser @@ -92,6 +93,7 @@ mybasepath = '/var/spool/pykolab/wallace/invitationpolicy/' auth = None imap = None +write_locks = [] def __init__(): modules.register('invitationpolicy', execute, description=description()) @@ -123,7 +125,7 @@ def description(): return """Invitation policy execution module.""" def cleanup(): - global auth, imap + global auth, imap, write_locks log.debug("cleanup(): %r, %r" % (auth, imap), level=9) @@ -134,13 +136,17 @@ def cleanup(): imap.disconnect() del imap + # remove remaining write locks + for key in write_locks: + remove_write_lock(key, False) + def execute(*args, **kw): global auth, imap if not os.path.isdir(mybasepath): os.makedirs(mybasepath) - for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER' ]: + for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER', 'locks']: if not os.path.isdir(os.path.join(mybasepath, stage)): os.makedirs(os.path.join(mybasepath, stage)) @@ -149,9 +155,14 @@ def execute(*args, **kw): auth = Auth() imap = IMAP() - # TODO: Test for correct call. filepath = args[0] + # ignore calls on lock files + if '/locks/' in filepath or kw.has_key('stage') and kw['stage'] == 'locks': + return False + + log.debug("Invitation policy executing for %r, %r" % (filepath, '/locks/' in filepath), level=8) + if kw.has_key('stage'): log.debug(_("Issuing callback after processing to stage %s") % (kw['stage']), level=8) @@ -276,6 +287,9 @@ def execute(*args, **kw): if done is not None: break + # remove possible write lock from this iteration + remove_write_lock(get_lock_key(receiving_user, itip_event['uid'])) + else: log.debug(_("Ignoring '%s' iTip method") % (itip_event['method']), level=8) @@ -316,7 +330,7 @@ def process_itip_request(itip_event, policy, recipient_email, sender_email, rece condition_fulfilled = True # find existing event in user's calendar - existing = find_existing_event(itip_event['uid'], receiving_user) + existing = find_existing_event(itip_event['uid'], receiving_user, True) # compare sequence number to determine a (re-)scheduling request if existing is not None: @@ -407,7 +421,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv # find existing event in user's calendar # TODO: set/check lock to avoid concurrent wallace processes trying to update the same event simultaneously - existing = find_existing_event(itip_event['uid'], receiving_user) + existing = find_existing_event(itip_event['uid'], receiving_user, True) if existing: # compare sequence number to avoid outdated replies? @@ -415,6 +429,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv log.info(_("The iTip reply sequence (%r) doesn't match the referred event version (%r). Forwarding to Inbox.") % ( itip_event['sequence'], existing.get_sequence() )) + remove_write_lock(existing._lock_key) return MESSAGE_FORWARD log.debug(_("Auto-updating event %r on iTip REPLY") % (existing.uid), level=8) @@ -424,6 +439,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv log.error("Could not find corresponding attende in organizer's event: %r" % (e)) # TODO: accept new participant if ACT_ACCEPT ? + remove_write_lock(existing._lock_key) return MESSAGE_FORWARD # update the organizer's copy of the event @@ -457,7 +473,7 @@ def process_itip_cancel(itip_event, policy, recipient_email, sender_email, recei # auto-update the local copy with STATUS=CANCELLED if policy & ACT_UPDATE: # find existing event in user's calendar - existing = find_existing_event(itip_event['uid'], receiving_user) + existing = find_existing_event(itip_event['uid'], receiving_user, True) if existing: existing.set_status('CANCELLED') @@ -615,12 +631,18 @@ def list_user_calendars(user_rec): return calendars -def find_existing_event(uid, user_rec): +def find_existing_event(uid, user_rec, lock=False): """ Search user's calendar folders for the given event (by UID) """ global imap + lock_key = None + + if lock: + lock_key = get_lock_key(user_rec, uid) + set_write_lock(lock_key) + event = None for folder in list_user_calendars(user_rec): log.debug(_("Searching folder %r for event %r") % (folder, uid), level=8) @@ -633,6 +655,7 @@ def find_existing_event(uid, user_rec): try: event = event_from_message(message_from_string(data[0][1])) setattr(event, '_imap_folder', folder) + setattr(event, '_lock_key', lock_key) except Exception, e: log.error(_("Failed to parse event from message %s/%s: %r") % (folder, num, e)) continue @@ -640,6 +663,9 @@ def find_existing_event(uid, user_rec): if event and event.uid == uid: return event + if lock_key is not None: + remove_write_lock(lock_key) + return event @@ -691,15 +717,73 @@ def check_availability(itip_event, receiving_user): return not conflict +def set_write_lock(key, wait=True): + """ + Set a write-lock for the given key and wait if such a lock already exists + """ + if not os.path.isdir(mybasepath): + os.makedirs(mybasepath) + if not os.path.isdir(os.path.join(mybasepath, 'locks')): + os.makedirs(os.path.join(mybasepath, 'locks')) + + file = os.path.join(mybasepath, 'locks', key + '.lock') + locked = os.path.getmtime(file) if os.path.isfile(file) else 0 + expired = time.time() - 300 + + # wait if file lock is in place + while locked and locked > expired: + if not wait: + return False + + log.debug(_("%r is locked, waiting...") % (key), level=9) + time.sleep(0.5) + locked = os.path.getmtime(file) if os.path.isfile(file) else 0 + + # touch the file + if os.path.isfile(file): + os.utime(file, None) + else: + open(file, 'w').close() + + # register active lock + write_locks.append(key) + + return True + + +def remove_write_lock(key, update=True): + """ + Remove the lock file for the given key + """ + global write_locks + + if key is not None: + file = os.path.join(mybasepath, 'locks', key + '.lock') + if os.path.isfile(file): + os.remove(file) + if update: + write_locks = [k for k in write_locks if not k == key] + + +def get_lock_key(user, uid): + return md5.new("%s/%s" % (user['mail'], uid)).hexdigest() + + def update_event(event, user_rec): """ Update the given event in IMAP (i.e. delete + append) """ + success = False + if hasattr(event, '_imap_folder'): delete_event(event) - return store_event(event, user_rec, event._imap_folder) + success = store_event(event, user_rec, event._imap_folder) - return False + # remove write lock for this event + if hasattr(event, '_lock_key') and event._lock_key is not None: + remove_write_lock(event._lock_key) + + return success def store_event(event, user_rec, targetfolder=None): @@ -829,7 +913,7 @@ def propagate_changes_to_attendees_calendars(event): log.debug(_("Update attendee copy of %r") % (attendee_user_dn), level=9) attendee_user = auth.get_entry_attributes(None, attendee_user_dn, ['*']) - attendee_event = find_existing_event(event.uid, attendee_user) # does IMAP authenticate + attendee_event = find_existing_event(event.uid, attendee_user, True) # does IMAP authenticate if attendee_event: attendee_event.event.setAttendees(event.get_attendees()) success = update_event(attendee_event, attendee_user) |