summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Bruederli <bruederli@kolabsys.com>2014-07-09 16:52:32 -0400
committerThomas Bruederli <bruederli@kolabsys.com>2014-07-09 16:52:32 -0400
commit3e492389ad7f2d628134b3471bc2e4e054b7d204 (patch)
treed2cc561958b9455f4ee54ac62dcae10e6162ce07
parent7977586fcff0eb783ff4f17c2aa0bd2a1de262aa (diff)
downloadpykolab-3e492389ad7f2d628134b3471bc2e4e054b7d204.tar.gz
Send iTip replies through wallace again; use a locking mechanism to sequencially process partstat updates from (automated) replies
-rw-r--r--pykolab/itip/__init__.py2
-rw-r--r--tests/functional/test_wallace/test_007_invitationpolicy.py8
-rw-r--r--tests/unit/test-012-wallace_invitationpolicy.py28
-rw-r--r--wallace/module_invitationpolicy.py104
4 files changed, 118 insertions, 24 deletions
diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py
index 43646df..8cf6435 100644
--- a/pykolab/itip/__init__.py
+++ b/pykolab/itip/__init__.py
@@ -224,7 +224,7 @@ def send_reply(from_address, itip_events, response_text, subject=None):
log.error(_("Failed to compose iTip reply message: %r") % (e))
return
- smtp = smtplib.SMTP("localhost", 10027)
+ smtp = smtplib.SMTP("localhost", 10026) # replies go through wallace again
if conf.debuglevel > 8:
smtp.set_debuglevel(True)
diff --git a/tests/functional/test_wallace/test_007_invitationpolicy.py b/tests/functional/test_wallace/test_007_invitationpolicy.py
index 2b669ff..b12b785 100644
--- a/tests/functional/test_wallace/test_007_invitationpolicy.py
+++ b/tests/functional/test_wallace/test_007_invitationpolicy.py
@@ -623,13 +623,7 @@ class TestWallaceInvitationpolicy(unittest.TestCase):
self.send_itip_invitation(self.jane['mail'], start, template=event_itip)
self.send_itip_invitation(self.jack['mail'], start, template=event_itip)
- # send replies from jack and jane
- # FIXME: replies should not be necessary if auto-replies get through wallace as well
- self.send_itip_reply(uid, self.jane['mail'], self.john['mail'], start=start, partstat='ACCEPTED')
- time.sleep(10) # FIXME: implement locking in wallace
- self.send_itip_reply(uid, self.jack['mail'], self.john['mail'], start=start, partstat='TENTATIVE')
-
- # wait for replies to be processed and propagated
+ # wait for replies from jack and jane to be processed and propagated
time.sleep(10)
event = self.check_user_calendar_event(self.john['kolabtargetfolder'], uid)
self.assertIsInstance(event, pykolab.xml.Event)
diff --git a/tests/unit/test-012-wallace_invitationpolicy.py b/tests/unit/test-012-wallace_invitationpolicy.py
index 650879b..0b64f6a 100644
--- a/tests/unit/test-012-wallace_invitationpolicy.py
+++ b/tests/unit/test-012-wallace_invitationpolicy.py
@@ -1,6 +1,7 @@
+import os
import pykolab
import logging
-import datetime
+import time
from icalendar import Calendar
from email import message
@@ -72,11 +73,10 @@ class TestWallaceInvitationpolicy(unittest.TestCase):
def setUp(self):
# monkey-patch the pykolab.auth module to check API calls
# without actually connecting to LDAP
- #self.patch(pykolab.auth.Auth, "connect", self._mock_nop)
- #self.patch(pykolab.auth.Auth, "disconnect", self._mock_nop)
- #self.patch(pykolab.auth.Auth, "find_user_dn", self._mock_find_user_dn)
- #self.patch(pykolab.auth.Auth, "get_entry_attributes", self._mock_get_entry_attributes)
- #self.patch(pykolab.auth.Auth, "search_entry_by_attribute", self._mock_search_entry_by_attribute)
+ self.patch(pykolab.auth.Auth, "connect", self._mock_nop)
+ self.patch(pykolab.auth.Auth, "disconnect", self._mock_nop)
+ self.patch(pykolab.auth.Auth, "find_user_dn", self._mock_find_user_dn)
+ self.patch(pykolab.auth.Auth, "get_entry_attributes", self._mock_get_entry_attributes)
# intercept calls to smtplib.SMTP.sendmail()
import smtplib
@@ -127,3 +127,19 @@ class TestWallaceInvitationpolicy(unittest.TestCase):
user = { 'kolabinvitationpolicy': ['ACT_ACCEPT:example.org', 'ACT_MANUAL:others'] }
self.assertEqual(MIP.get_matching_invitation_policies(user, 'somedomain.net'), [MIP.ACT_MANUAL])
+
+ def test_004_write_locks(self):
+ user = { 'cn': 'John Doe', 'mail': "doe@example.org" }
+
+ lock_key = MIP.get_lock_key(user, '1234567890-abcdef')
+ lock_file = os.path.join(MIP.mybasepath, 'locks', lock_key + '.lock')
+ MIP.set_write_lock(lock_key)
+
+ time.sleep(1)
+ self.assertTrue(os.path.isfile(lock_file))
+ self.assertFalse(MIP.set_write_lock(lock_key, False))
+
+ MIP.remove_write_lock(lock_key)
+ self.assertFalse(os.path.isfile(lock_file))
+
+ \ No newline at end of file
diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py
index ec3ad44..b7f59de 100644
--- a/wallace/module_invitationpolicy.py
+++ b/wallace/module_invitationpolicy.py
@@ -23,6 +23,7 @@ import tempfile
import time
from urlparse import urlparse
import urllib
+import md5
from email import message_from_string
from email.parser import Parser
@@ -92,6 +93,7 @@ mybasepath = '/var/spool/pykolab/wallace/invitationpolicy/'
auth = None
imap = None
+write_locks = []
def __init__():
modules.register('invitationpolicy', execute, description=description())
@@ -123,7 +125,7 @@ def description():
return """Invitation policy execution module."""
def cleanup():
- global auth, imap
+ global auth, imap, write_locks
log.debug("cleanup(): %r, %r" % (auth, imap), level=9)
@@ -134,13 +136,17 @@ def cleanup():
imap.disconnect()
del imap
+ # remove remaining write locks
+ for key in write_locks:
+ remove_write_lock(key, False)
+
def execute(*args, **kw):
global auth, imap
if not os.path.isdir(mybasepath):
os.makedirs(mybasepath)
- for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER' ]:
+ for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER', 'locks']:
if not os.path.isdir(os.path.join(mybasepath, stage)):
os.makedirs(os.path.join(mybasepath, stage))
@@ -149,9 +155,14 @@ def execute(*args, **kw):
auth = Auth()
imap = IMAP()
- # TODO: Test for correct call.
filepath = args[0]
+ # ignore calls on lock files
+ if '/locks/' in filepath or kw.has_key('stage') and kw['stage'] == 'locks':
+ return False
+
+ log.debug("Invitation policy executing for %r, %r" % (filepath, '/locks/' in filepath), level=8)
+
if kw.has_key('stage'):
log.debug(_("Issuing callback after processing to stage %s") % (kw['stage']), level=8)
@@ -276,6 +287,9 @@ def execute(*args, **kw):
if done is not None:
break
+ # remove possible write lock from this iteration
+ remove_write_lock(get_lock_key(receiving_user, itip_event['uid']))
+
else:
log.debug(_("Ignoring '%s' iTip method") % (itip_event['method']), level=8)
@@ -316,7 +330,7 @@ def process_itip_request(itip_event, policy, recipient_email, sender_email, rece
condition_fulfilled = True
# find existing event in user's calendar
- existing = find_existing_event(itip_event['uid'], receiving_user)
+ existing = find_existing_event(itip_event['uid'], receiving_user, True)
# compare sequence number to determine a (re-)scheduling request
if existing is not None:
@@ -407,7 +421,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
# find existing event in user's calendar
# TODO: set/check lock to avoid concurrent wallace processes trying to update the same event simultaneously
- existing = find_existing_event(itip_event['uid'], receiving_user)
+ existing = find_existing_event(itip_event['uid'], receiving_user, True)
if existing:
# compare sequence number to avoid outdated replies?
@@ -415,6 +429,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
log.info(_("The iTip reply sequence (%r) doesn't match the referred event version (%r). Forwarding to Inbox.") % (
itip_event['sequence'], existing.get_sequence()
))
+ remove_write_lock(existing._lock_key)
return MESSAGE_FORWARD
log.debug(_("Auto-updating event %r on iTip REPLY") % (existing.uid), level=8)
@@ -424,6 +439,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
log.error("Could not find corresponding attende in organizer's event: %r" % (e))
# TODO: accept new participant if ACT_ACCEPT ?
+ remove_write_lock(existing._lock_key)
return MESSAGE_FORWARD
# update the organizer's copy of the event
@@ -457,7 +473,7 @@ def process_itip_cancel(itip_event, policy, recipient_email, sender_email, recei
# auto-update the local copy with STATUS=CANCELLED
if policy & ACT_UPDATE:
# find existing event in user's calendar
- existing = find_existing_event(itip_event['uid'], receiving_user)
+ existing = find_existing_event(itip_event['uid'], receiving_user, True)
if existing:
existing.set_status('CANCELLED')
@@ -615,12 +631,18 @@ def list_user_calendars(user_rec):
return calendars
-def find_existing_event(uid, user_rec):
+def find_existing_event(uid, user_rec, lock=False):
"""
Search user's calendar folders for the given event (by UID)
"""
global imap
+ lock_key = None
+
+ if lock:
+ lock_key = get_lock_key(user_rec, uid)
+ set_write_lock(lock_key)
+
event = None
for folder in list_user_calendars(user_rec):
log.debug(_("Searching folder %r for event %r") % (folder, uid), level=8)
@@ -633,6 +655,7 @@ def find_existing_event(uid, user_rec):
try:
event = event_from_message(message_from_string(data[0][1]))
setattr(event, '_imap_folder', folder)
+ setattr(event, '_lock_key', lock_key)
except Exception, e:
log.error(_("Failed to parse event from message %s/%s: %r") % (folder, num, e))
continue
@@ -640,6 +663,9 @@ def find_existing_event(uid, user_rec):
if event and event.uid == uid:
return event
+ if lock_key is not None:
+ remove_write_lock(lock_key)
+
return event
@@ -691,15 +717,73 @@ def check_availability(itip_event, receiving_user):
return not conflict
+def set_write_lock(key, wait=True):
+ """
+ Set a write-lock for the given key and wait if such a lock already exists
+ """
+ if not os.path.isdir(mybasepath):
+ os.makedirs(mybasepath)
+ if not os.path.isdir(os.path.join(mybasepath, 'locks')):
+ os.makedirs(os.path.join(mybasepath, 'locks'))
+
+ file = os.path.join(mybasepath, 'locks', key + '.lock')
+ locked = os.path.getmtime(file) if os.path.isfile(file) else 0
+ expired = time.time() - 300
+
+ # wait if file lock is in place
+ while locked and locked > expired:
+ if not wait:
+ return False
+
+ log.debug(_("%r is locked, waiting...") % (key), level=9)
+ time.sleep(0.5)
+ locked = os.path.getmtime(file) if os.path.isfile(file) else 0
+
+ # touch the file
+ if os.path.isfile(file):
+ os.utime(file, None)
+ else:
+ open(file, 'w').close()
+
+ # register active lock
+ write_locks.append(key)
+
+ return True
+
+
+def remove_write_lock(key, update=True):
+ """
+ Remove the lock file for the given key
+ """
+ global write_locks
+
+ if key is not None:
+ file = os.path.join(mybasepath, 'locks', key + '.lock')
+ if os.path.isfile(file):
+ os.remove(file)
+ if update:
+ write_locks = [k for k in write_locks if not k == key]
+
+
+def get_lock_key(user, uid):
+ return md5.new("%s/%s" % (user['mail'], uid)).hexdigest()
+
+
def update_event(event, user_rec):
"""
Update the given event in IMAP (i.e. delete + append)
"""
+ success = False
+
if hasattr(event, '_imap_folder'):
delete_event(event)
- return store_event(event, user_rec, event._imap_folder)
+ success = store_event(event, user_rec, event._imap_folder)
- return False
+ # remove write lock for this event
+ if hasattr(event, '_lock_key') and event._lock_key is not None:
+ remove_write_lock(event._lock_key)
+
+ return success
def store_event(event, user_rec, targetfolder=None):
@@ -829,7 +913,7 @@ def propagate_changes_to_attendees_calendars(event):
log.debug(_("Update attendee copy of %r") % (attendee_user_dn), level=9)
attendee_user = auth.get_entry_attributes(None, attendee_user_dn, ['*'])
- attendee_event = find_existing_event(event.uid, attendee_user) # does IMAP authenticate
+ attendee_event = find_existing_event(event.uid, attendee_user, True) # does IMAP authenticate
if attendee_event:
attendee_event.event.setAttendees(event.get_attendees())
success = update_event(attendee_event, attendee_user)