summaryrefslogtreecommitdiffstats
path: root/wallace
diff options
context:
space:
mode:
authorThomas Bruederli <bruederli@kolabsys.com>2014-07-09 16:52:32 -0400
committerThomas Bruederli <bruederli@kolabsys.com>2014-07-09 16:52:32 -0400
commit3e492389ad7f2d628134b3471bc2e4e054b7d204 (patch)
treed2cc561958b9455f4ee54ac62dcae10e6162ce07 /wallace
parent7977586fcff0eb783ff4f17c2aa0bd2a1de262aa (diff)
downloadpykolab-3e492389ad7f2d628134b3471bc2e4e054b7d204.tar.gz
Send iTip replies through wallace again; use a locking mechanism to sequencially process partstat updates from (automated) replies
Diffstat (limited to 'wallace')
-rw-r--r--wallace/module_invitationpolicy.py104
1 files changed, 94 insertions, 10 deletions
diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py
index ec3ad44..b7f59de 100644
--- a/wallace/module_invitationpolicy.py
+++ b/wallace/module_invitationpolicy.py
@@ -23,6 +23,7 @@ import tempfile
import time
from urlparse import urlparse
import urllib
+import md5
from email import message_from_string
from email.parser import Parser
@@ -92,6 +93,7 @@ mybasepath = '/var/spool/pykolab/wallace/invitationpolicy/'
auth = None
imap = None
+write_locks = []
def __init__():
modules.register('invitationpolicy', execute, description=description())
@@ -123,7 +125,7 @@ def description():
return """Invitation policy execution module."""
def cleanup():
- global auth, imap
+ global auth, imap, write_locks
log.debug("cleanup(): %r, %r" % (auth, imap), level=9)
@@ -134,13 +136,17 @@ def cleanup():
imap.disconnect()
del imap
+ # remove remaining write locks
+ for key in write_locks:
+ remove_write_lock(key, False)
+
def execute(*args, **kw):
global auth, imap
if not os.path.isdir(mybasepath):
os.makedirs(mybasepath)
- for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER' ]:
+ for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER', 'locks']:
if not os.path.isdir(os.path.join(mybasepath, stage)):
os.makedirs(os.path.join(mybasepath, stage))
@@ -149,9 +155,14 @@ def execute(*args, **kw):
auth = Auth()
imap = IMAP()
- # TODO: Test for correct call.
filepath = args[0]
+ # ignore calls on lock files
+ if '/locks/' in filepath or kw.has_key('stage') and kw['stage'] == 'locks':
+ return False
+
+ log.debug("Invitation policy executing for %r, %r" % (filepath, '/locks/' in filepath), level=8)
+
if kw.has_key('stage'):
log.debug(_("Issuing callback after processing to stage %s") % (kw['stage']), level=8)
@@ -276,6 +287,9 @@ def execute(*args, **kw):
if done is not None:
break
+ # remove possible write lock from this iteration
+ remove_write_lock(get_lock_key(receiving_user, itip_event['uid']))
+
else:
log.debug(_("Ignoring '%s' iTip method") % (itip_event['method']), level=8)
@@ -316,7 +330,7 @@ def process_itip_request(itip_event, policy, recipient_email, sender_email, rece
condition_fulfilled = True
# find existing event in user's calendar
- existing = find_existing_event(itip_event['uid'], receiving_user)
+ existing = find_existing_event(itip_event['uid'], receiving_user, True)
# compare sequence number to determine a (re-)scheduling request
if existing is not None:
@@ -407,7 +421,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
# find existing event in user's calendar
# TODO: set/check lock to avoid concurrent wallace processes trying to update the same event simultaneously
- existing = find_existing_event(itip_event['uid'], receiving_user)
+ existing = find_existing_event(itip_event['uid'], receiving_user, True)
if existing:
# compare sequence number to avoid outdated replies?
@@ -415,6 +429,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
log.info(_("The iTip reply sequence (%r) doesn't match the referred event version (%r). Forwarding to Inbox.") % (
itip_event['sequence'], existing.get_sequence()
))
+ remove_write_lock(existing._lock_key)
return MESSAGE_FORWARD
log.debug(_("Auto-updating event %r on iTip REPLY") % (existing.uid), level=8)
@@ -424,6 +439,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
log.error("Could not find corresponding attende in organizer's event: %r" % (e))
# TODO: accept new participant if ACT_ACCEPT ?
+ remove_write_lock(existing._lock_key)
return MESSAGE_FORWARD
# update the organizer's copy of the event
@@ -457,7 +473,7 @@ def process_itip_cancel(itip_event, policy, recipient_email, sender_email, recei
# auto-update the local copy with STATUS=CANCELLED
if policy & ACT_UPDATE:
# find existing event in user's calendar
- existing = find_existing_event(itip_event['uid'], receiving_user)
+ existing = find_existing_event(itip_event['uid'], receiving_user, True)
if existing:
existing.set_status('CANCELLED')
@@ -615,12 +631,18 @@ def list_user_calendars(user_rec):
return calendars
-def find_existing_event(uid, user_rec):
+def find_existing_event(uid, user_rec, lock=False):
"""
Search user's calendar folders for the given event (by UID)
"""
global imap
+ lock_key = None
+
+ if lock:
+ lock_key = get_lock_key(user_rec, uid)
+ set_write_lock(lock_key)
+
event = None
for folder in list_user_calendars(user_rec):
log.debug(_("Searching folder %r for event %r") % (folder, uid), level=8)
@@ -633,6 +655,7 @@ def find_existing_event(uid, user_rec):
try:
event = event_from_message(message_from_string(data[0][1]))
setattr(event, '_imap_folder', folder)
+ setattr(event, '_lock_key', lock_key)
except Exception, e:
log.error(_("Failed to parse event from message %s/%s: %r") % (folder, num, e))
continue
@@ -640,6 +663,9 @@ def find_existing_event(uid, user_rec):
if event and event.uid == uid:
return event
+ if lock_key is not None:
+ remove_write_lock(lock_key)
+
return event
@@ -691,15 +717,73 @@ def check_availability(itip_event, receiving_user):
return not conflict
+def set_write_lock(key, wait=True):
+ """
+ Set a write-lock for the given key and wait if such a lock already exists
+ """
+ if not os.path.isdir(mybasepath):
+ os.makedirs(mybasepath)
+ if not os.path.isdir(os.path.join(mybasepath, 'locks')):
+ os.makedirs(os.path.join(mybasepath, 'locks'))
+
+ file = os.path.join(mybasepath, 'locks', key + '.lock')
+ locked = os.path.getmtime(file) if os.path.isfile(file) else 0
+ expired = time.time() - 300
+
+ # wait if file lock is in place
+ while locked and locked > expired:
+ if not wait:
+ return False
+
+ log.debug(_("%r is locked, waiting...") % (key), level=9)
+ time.sleep(0.5)
+ locked = os.path.getmtime(file) if os.path.isfile(file) else 0
+
+ # touch the file
+ if os.path.isfile(file):
+ os.utime(file, None)
+ else:
+ open(file, 'w').close()
+
+ # register active lock
+ write_locks.append(key)
+
+ return True
+
+
+def remove_write_lock(key, update=True):
+ """
+ Remove the lock file for the given key
+ """
+ global write_locks
+
+ if key is not None:
+ file = os.path.join(mybasepath, 'locks', key + '.lock')
+ if os.path.isfile(file):
+ os.remove(file)
+ if update:
+ write_locks = [k for k in write_locks if not k == key]
+
+
+def get_lock_key(user, uid):
+ return md5.new("%s/%s" % (user['mail'], uid)).hexdigest()
+
+
def update_event(event, user_rec):
"""
Update the given event in IMAP (i.e. delete + append)
"""
+ success = False
+
if hasattr(event, '_imap_folder'):
delete_event(event)
- return store_event(event, user_rec, event._imap_folder)
+ success = store_event(event, user_rec, event._imap_folder)
- return False
+ # remove write lock for this event
+ if hasattr(event, '_lock_key') and event._lock_key is not None:
+ remove_write_lock(event._lock_key)
+
+ return success
def store_event(event, user_rec, targetfolder=None):
@@ -829,7 +913,7 @@ def propagate_changes_to_attendees_calendars(event):
log.debug(_("Update attendee copy of %r") % (attendee_user_dn), level=9)
attendee_user = auth.get_entry_attributes(None, attendee_user_dn, ['*'])
- attendee_event = find_existing_event(event.uid, attendee_user) # does IMAP authenticate
+ attendee_event = find_existing_event(event.uid, attendee_user, True) # does IMAP authenticate
if attendee_event:
attendee_event.event.setAttendees(event.get_attendees())
success = update_event(attendee_event, attendee_user)