summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Bruederli <bruederli@kolabsys.com>2014-07-06 22:09:42 -0400
committerThomas Bruederli <bruederli@kolabsys.com>2014-07-06 22:09:42 -0400
commit223871e43e7ff6cd3c4dcd49e5c362a1fdf912df (patch)
tree4d1fe11cd8651bfaa14f3af81883315ab31180bd
parent4a76d06a534417920f76fae229c7130a12d2965f (diff)
downloadpykolab-223871e43e7ff6cd3c4dcd49e5c362a1fdf912df.tar.gz
Refactored some iTip functions into a dedicated module for shared use
-rw-r--r--pykolab/itip/__init__.py225
-rw-r--r--tests/unit/test-011-wallace_resources.py22
-rw-r--r--wallace/module_resources.py190
3 files changed, 245 insertions, 192 deletions
diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py
new file mode 100644
index 0000000..04b2d55
--- /dev/null
+++ b/pykolab/itip/__init__.py
@@ -0,0 +1,225 @@
+import icalendar
+import pykolab
+
+from pykolab.xml import to_dt
+from pykolab.xml import event_from_ical
+from pykolab.translate import _
+
+log = pykolab.getLogger('pykolab.wallace')
+
+
+def events_from_message(message, methods=None):
+ return objects_from_message(message, "VEVENT", methods)
+
+def todos_from_message(message, methods=None):
+ return objects_from_message(message, "VTODO", methods)
+
+
+def objects_from_message(message, objname, methods=None):
+ """
+ Obtain the iTip payload from email.message <message>
+ """
+ # Placeholder for any itip_objects found in the message.
+ itip_objects = []
+ seen_uids = []
+
+ # iTip methods we are actually interested in. Other methods will be ignored.
+ if methods is None:
+ methods = [ "REQUEST", "CANCEL" ]
+
+ # Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A
+ # MIME body part containing content information that conforms to this
+ # document MUST have (...)" but does not state whether an iTip message must
+ # therefore also be multipart.
+
+ # Check each part
+ for part in message.walk():
+
+ # The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4)
+ # But in real word, other mime-types are used as well
+ if part.get_content_type() in [ "text/calendar", "text/x-vcalendar", "application/ics" ]:
+ if not str(part.get_param('method')).upper() in methods:
+ log.info(_("Method %r not really interesting for us.") % (part.get_param('method')))
+ continue
+
+ # Get the itip_payload
+ itip_payload = part.get_payload(decode=True)
+
+ log.debug(_("Raw iTip payload: %s") % (itip_payload), level=9)
+
+ # Python iCalendar prior to 3.0 uses "from_string".
+ if hasattr(icalendar.Calendar, 'from_ical'):
+ cal = icalendar.Calendar.from_ical(itip_payload)
+ elif hasattr(icalendar.Calendar, 'from_string'):
+ cal = icalendar.Calendar.from_string(itip_payload)
+
+ # If we can't read it, we're out
+ else:
+ log.error(_("Could not read iTip from message."))
+ return []
+
+ for c in cal.walk():
+ if c.name == objname:
+ itip = {}
+
+ if c['uid'] in seen_uids:
+ log.debug(_("Duplicate iTip object: %s") % (c['uid']), level=9)
+ continue
+
+ # From the event, take the following properties:
+ #
+ # - method
+ # - uid
+ # - sequence
+ # - start
+ # - end (if any)
+ # - duration (if any)
+ # - organizer
+ # - attendees (if any)
+ # - resources (if any)
+ #
+
+ itip['uid'] = str(c['uid'])
+ itip['method'] = str(cal['method']).upper()
+ itip['sequence'] = int(c['sequence']) if c.has_key('sequence') else 0
+
+ if c.has_key('dtstart'):
+ itip['start'] = c['dtstart'].dt
+ else:
+ log.error(_("iTip event without a start"))
+ continue
+
+ if c.has_key('dtend'):
+ itip['end'] = c['dtend'].dt
+
+ if c.has_key('duration'):
+ itip['duration'] = c['duration'].dt
+ itip['end'] = itip['start'] + c['duration'].dt
+
+ itip['organizer'] = c['organizer']
+
+ itip['attendees'] = c['attendee']
+
+ if c.has_key('resources'):
+ itip['resources'] = c['resources']
+
+ itip['raw'] = itip_payload
+
+ try:
+ # TODO: distinguish event and todo here
+ itip['xml'] = event_from_ical(c.to_ical())
+ except Exception, e:
+ log.error("event_from_ical() exception: %r" % (e))
+ continue
+
+ itip_objects.append(itip)
+
+ seen_uids.append(c['uid'])
+
+ # end if c.name == "VEVENT"
+
+ # end for c in cal.walk()
+
+ # end if part.get_content_type() == "text/calendar"
+
+ # end for part in message.walk()
+
+ if not len(itip_objects) and not message.is_multipart():
+ log.debug(_("Message is not an iTip message (non-multipart message)"), level=5)
+
+ return itip_objects
+
+
+def check_event_conflict(kolab_event, itip_event):
+ """
+ Determine whether the given kolab event conflicts with the given itip event
+ """
+ conflict = False
+
+ # don't consider conflict with myself
+ if kolab_event.uid == itip_event['uid']:
+ return conflict
+
+ _es = to_dt(kolab_event.get_start())
+ _ee = to_dt(kolab_event.get_end())
+
+ # naive loops to check for collisions in (recurring) events
+ # TODO: compare recurrence rules directly (e.g. matching time slot or weekday or monthday)
+ while not conflict and _es is not None:
+ _is = to_dt(itip_event['start'])
+ _ie = to_dt(itip_event['end'])
+
+ while not conflict and _is is not None:
+ # log.debug("* Comparing event dates at %s/%s with %s/%s" % (_es, _ee, _is, _ie), level=9)
+ conflict = check_date_conflict(_es, _ee, _is, _ie)
+ _is = to_dt(itip_event['xml'].get_next_occurence(_is)) if kolab_event.is_recurring() else None
+ _ie = to_dt(itip_event['xml'].get_occurence_end_date(_is))
+
+ _es = to_dt(kolab_event.get_next_occurence(_es)) if kolab_event.is_recurring() else None
+ _ee = to_dt(kolab_event.get_occurence_end_date(_es))
+
+ return conflict
+
+
+def check_date_conflict(_es, _ee, _is, _ie):
+ """
+ Check the given event start/end dates for conflicts
+ """
+ conflict = False
+
+ # TODO: add margin for all-day dates (+13h; -12h)
+
+ if _es < _is:
+ if _es <= _ie:
+ if _ee <= _is:
+ conflict = False
+ else:
+ conflict = True
+ else:
+ conflict = True
+ elif _es == _is:
+ conflict = True
+ else: # _es > _is
+ if _es <= _ie:
+ conflict = True
+ else:
+ conflict = False
+
+ return conflict
+
+
+def send_reply(from_address, itip_events, response_text, subject=None):
+ """
+ Send the given iCal events as a valid iTip REPLY to the organizer.
+ """
+
+ import smtplib
+ smtp = smtplib.SMTP("localhost", 10027)
+
+ conf = pykolab.getConf()
+
+ if conf.debuglevel > 8:
+ smtp.set_debuglevel(True)
+
+ if isinstance(itip_events, dict):
+ itip_events = [ itip_events ]
+
+ for itip_event in itip_events:
+ attendee = itip_event['xml'].get_attendee_by_email(from_address)
+ participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee)
+
+ event_summary = itip_event['xml'].get_summary()
+ message_text = response_text % { 'summary':event_summary, 'status':_(participant_status), 'name':attendee.get_name() }
+
+ if subject is not None:
+ subject = subject % { 'summary':event_summary, 'status':_(participant_status), 'name':attendee.get_name() }
+
+ message = itip_event['xml'].to_message_itip(from_address,
+ method="REPLY",
+ participant_status=participant_status,
+ message_text=message_text,
+ subject=subject
+ )
+ smtp.sendmail(message['From'], message['To'], message.as_string())
+
+ smtp.quit()
diff --git a/tests/unit/test-011-wallace_resources.py b/tests/unit/test-011-wallace_resources.py
index 62bfd27..bb586f8 100644
--- a/tests/unit/test-011-wallace_resources.py
+++ b/tests/unit/test-011-wallace_resources.py
@@ -302,29 +302,29 @@ class TestWallaceResources(unittest.TestCase):
return None
def test_001_itip_events_from_message(self):
- itips1 = module_resources.itip_events_from_message(message_from_string(itip_multipart))
+ itips1 = pykolab.itip.events_from_message(message_from_string(itip_multipart))
self.assertEqual(len(itips1), 1, "Multipart iTip message with text/calendar")
self.assertEqual(itips1[0]['method'], "REQUEST", "iTip request method property")
- itips2 = module_resources.itip_events_from_message(message_from_string(itip_non_multipart))
+ itips2 = pykolab.itip.events_from_message(message_from_string(itip_non_multipart))
self.assertEqual(len(itips2), 1, "Detect non-multipart iTip messages")
- itips3 = module_resources.itip_events_from_message(message_from_string(itip_application_ics))
+ itips3 = pykolab.itip.events_from_message(message_from_string(itip_application_ics))
self.assertEqual(len(itips3), 1, "Multipart iTip message with application/ics attachment")
- itips4 = module_resources.itip_events_from_message(message_from_string(itip_google_multipart))
+ itips4 = pykolab.itip.events_from_message(message_from_string(itip_google_multipart))
self.assertEqual(len(itips4), 1, "Multipart iTip message from Google")
- itips5 = module_resources.itip_events_from_message(message_from_string(itip_empty))
+ itips5 = pykolab.itip.events_from_message(message_from_string(itip_empty))
self.assertEqual(len(itips5), 0, "Simple plain text message")
# invalid itip blocks
- self.assertRaises(Exception, module_resources.itip_events_from_message, message_from_string(itip_multipart.replace("BEGIN:VEVENT", "")))
+ self.assertRaises(Exception, pykolab.itip.events_from_message, message_from_string(itip_multipart.replace("BEGIN:VEVENT", "")))
- itips6 = module_resources.itip_events_from_message(message_from_string(itip_multipart.replace("DTSTART;", "X-DTSTART;")))
+ itips6 = pykolab.itip.events_from_message(message_from_string(itip_multipart.replace("DTSTART;", "X-DTSTART;")))
self.assertEqual(len(itips6), 0, "Event with not DTSTART")
- itips7 = module_resources.itip_events_from_message(message_from_string(itip_non_multipart.replace("METHOD:REQUEST", "METHOD:PUBLISH").replace("method=REQUEST", "method=PUBLISH")))
+ itips7 = pykolab.itip.events_from_message(message_from_string(itip_non_multipart.replace("METHOD:REQUEST", "METHOD:PUBLISH").replace("method=REQUEST", "method=PUBLISH")))
self.assertEqual(len(itips7), 0, "Invalid METHOD")
@@ -337,7 +337,7 @@ class TestWallaceResources(unittest.TestCase):
def test_003_resource_records_from_itip_events(self):
message = message_from_string(itip_multipart)
- itips = module_resources.itip_events_from_message(message)
+ itips = pykolab.itip.events_from_message(message)
res = module_resources.resource_records_from_itip_events(itips)
self.assertEqual(len(res), 2, "Return all attendee resources");
@@ -365,7 +365,7 @@ class TestWallaceResources(unittest.TestCase):
def test_005_send_response_accept(self):
- itip_event = module_resources.itip_events_from_message(message_from_string(itip_non_multipart))
+ itip_event = pykolab.itip.events_from_message(message_from_string(itip_non_multipart))
module_resources.send_response("resource-collection-car@example.org", itip_event)
self.assertEqual(len(self.smtplog), 1);
@@ -384,7 +384,7 @@ class TestWallaceResources(unittest.TestCase):
def test_006_send_response_delegate(self):
# delegate resource-collection-car@example.org => resource-car-audi-a4@example.org
- itip_event = module_resources.itip_events_from_message(message_from_string(itip_non_multipart))[0]
+ itip_event = pykolab.itip.events_from_message(message_from_string(itip_non_multipart))[0]
itip_event['xml'].delegate('resource-collection-car@example.org', 'resource-car-audi-a4@example.org')
itip_event['xml'].set_attendee_participant_status(itip_event['xml'].get_attendee('resource-car-audi-a4@example.org'), "ACCEPTED")
diff --git a/wallace/module_resources.py b/wallace/module_resources.py
index 303252b..f398120 100644
--- a/wallace/module_resources.py
+++ b/wallace/module_resources.py
@@ -40,9 +40,10 @@ import kolabformat
from pykolab.auth import Auth
from pykolab.conf import Conf
from pykolab.imap import IMAP
-from pykolab.xml import event_from_ical
from pykolab.xml import event_from_string
from pykolab.xml import to_dt
+from pykolab.itip import events_from_message
+from pykolab.itip import check_event_conflict
from pykolab.translate import _
log = pykolab.getLogger('pykolab.wallace')
@@ -150,7 +151,7 @@ def execute(*args, **kw):
# An iTip message may contain multiple events. Later on, test if the message
# is an iTip message by checking the length of this list.
try:
- itip_events = itip_events_from_message(message)
+ itip_events = events_from_message(message, ['REQUEST', 'CANCEL'])
except Exception, e:
log.error(_("Failed to parse iTip events from message: %r" % (e)))
itip_events = []
@@ -473,33 +474,11 @@ def read_resource_calendar(resource_rec, itip_events):
event = pykolab.xml.event_from_string(payload)
for itip in itip_events:
- _es = to_dt(event.get_start())
- _ee = to_dt(event.get_end())
-
- conflict = False
-
- # naive loops to check for collisions in (recurring) events
- # TODO: compare recurrence rules directly (e.g. matching time slot or weekday or monthday)
- while not conflict and _es is not None:
- _is = to_dt(itip['start'])
- _ie = to_dt(itip['end'])
-
- while not conflict and _is is not None:
- log.debug("* Comparing event dates at %s/%s with %s/%s" % (_es, _ee, _is, _ie), level=9)
- conflict = check_date_conflict(_es, _ee, _is, _ie)
- _is = to_dt(itip['xml'].get_next_occurence(_is)) if event.is_recurring() else None
- _ie = to_dt(itip['xml'].get_occurence_end_date(_is))
-
- _es = to_dt(event.get_next_occurence(_es)) if event.is_recurring() else None
- _ee = to_dt(event.get_occurence_end_date(_es))
+ conflict = check_event_conflict(event, itip)
if event.get_uid() == itip['uid']:
resource_rec['existing_events'].append(itip['uid'])
- # don't register conflict for updates
- if itip['sequence'] > 0 and itip['sequence'] >= event.get_sequence():
- conflict = False
-
if conflict:
log.info(
_("Event %r conflicts with event %r") % (
@@ -513,29 +492,6 @@ def read_resource_calendar(resource_rec, itip_events):
return num_messages
-def check_date_conflict(_es, _ee, _is, _ie):
- conflict = False
-
- # TODO: add margin for all-day dates (+13h; -12h)
-
- if _es < _is:
- if _es <= _ie:
- if _ee <= _is:
- conflict = False
- else:
- conflict = True
- else:
- conflict = True
- elif _es == _is:
- conflict = True
- else: # _es > _is
- if _es <= _ie:
- conflict = True
- else:
- conflict = False
-
- return conflict
-
def accept_reservation_request(itip_event, resource, delegator=None):
"""
@@ -617,118 +573,6 @@ def delete_resource_event(uid, resource):
imap.imap.m.expunge()
-def itip_events_from_message(message):
- """
- Obtain the iTip payload from email.message <message>
- """
- # Placeholder for any itip_events found in the message.
- itip_events = []
- seen_uids = []
-
- # iTip methods we are actually interested in. Other methods will be ignored.
- itip_methods = [ "REQUEST", "CANCEL" ]
-
- # Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A
- # MIME body part containing content information that conforms to this
- # document MUST have (...)" but does not state whether an iTip message must
- # therefore also be multipart.
-
- # Check each part
- for part in message.walk():
-
- # The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4)
- # But in real word, other mime-types are used as well
- if part.get_content_type() in [ "text/calendar", "text/x-vcalendar", "application/ics" ]:
- if not str(part.get_param('method')).upper() in itip_methods:
- log.error(_("Method %r not really interesting for us.") % (part.get_param('method')))
- continue
-
- # Get the itip_payload
- itip_payload = part.get_payload(decode=True)
-
- log.debug(_("Raw iTip payload: %s") % (itip_payload), level=9)
-
- # Python iCalendar prior to 3.0 uses "from_string".
- if hasattr(icalendar.Calendar, 'from_ical'):
- cal = icalendar.Calendar.from_ical(itip_payload)
- elif hasattr(icalendar.Calendar, 'from_string'):
- cal = icalendar.Calendar.from_string(itip_payload)
-
- # If we can't read it, we're out
- else:
- log.error(_("Could not read iTip from message."))
- return []
-
- for c in cal.walk():
- if c.name == "VEVENT":
- itip = {}
-
- if c['uid'] in seen_uids:
- log.debug(_("Duplicate iTip event: %s") % (c['uid']), level=9)
- continue
-
- # From the event, take the following properties:
- #
- # - method
- # - uid
- # - sequence
- # - start
- # - end (if any)
- # - duration (if any)
- # - organizer
- # - attendees (if any)
- # - resources (if any)
- #
-
- itip['uid'] = str(c['uid'])
- itip['method'] = str(cal['method']).upper()
- itip['sequence'] = int(c['sequence']) if c.has_key('sequence') else 0
-
- if c.has_key('dtstart'):
- itip['start'] = c['dtstart'].dt
- else:
- log.error(_("iTip event without a start"))
- continue
-
- if c.has_key('dtend'):
- itip['end'] = c['dtend'].dt
-
- if c.has_key('duration'):
- itip['duration'] = c['duration'].dt
- itip['end'] = itip['start'] + c['duration'].dt
-
- itip['organizer'] = c['organizer']
-
- itip['attendees'] = c['attendee']
-
- if c.has_key('resources'):
- itip['resources'] = c['resources']
-
- itip['raw'] = itip_payload
-
- try:
- itip['xml'] = event_from_ical(c.to_ical())
- except Exception, e:
- log.error("event_from_ical() exception: %r" % (e))
- continue
-
- itip_events.append(itip)
-
- seen_uids.append(c['uid'])
-
- # end if c.name == "VEVENT"
-
- # end for c in cal.walk()
-
- # end if part.get_content_type() == "text/calendar"
-
- # end for part in message.walk()
-
- if not len(itip_events) and not message.is_multipart():
- log.debug(_("Message is not an iTip message (non-multipart message)"), level=5)
-
- return itip_events
-
def reject(filepath):
new_filepath = os.path.join(
mybasepath,
@@ -986,12 +830,6 @@ def send_response(from_address, itip_events, owner=None):
resource, this will send an additional DELEGATED response message.
"""
- import smtplib
- smtp = smtplib.SMTP("localhost", 10027)
-
- if conf.debuglevel > 8:
- smtp.set_debuglevel(True)
-
if isinstance(itip_events, dict):
itip_events = [ itip_events ]
@@ -1000,6 +838,7 @@ def send_response(from_address, itip_events, owner=None):
participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee)
message_text = reservation_response_text(participant_status, owner)
+ subject_template = _("Reservation Request for %(summary)s was %(status)s")
if participant_status == "DELEGATED":
# Extra actions to take
@@ -1007,32 +846,21 @@ def send_response(from_address, itip_events, owner=None):
delegatee = [a for a in itip_event['xml'].get_attendees() if from_address in [b.email() for b in a.get_delegated_from()]][0]
delegatee_status = itip_event['xml'].get_ical_attendee_participant_status(delegatee)
- message = itip_event['xml'].to_message_itip(delegatee.get_email(),
- method="REPLY",
- participant_status=delegatee_status,
- message_text=reservation_response_text(delegatee_status, owner)
- )
- smtp.sendmail(message['From'], message['To'], message.as_string())
+ pykolab.itip.send_reply(delegatee.get_email(), itip_event, reservation_response_text(delegatee_status, owner),
+ subject=subject_template)
# restore list of attendees after to_message_itip()
itip_event['xml']._attendees = [ delegator, delegatee ]
itip_event['xml'].event.setAttendees(itip_event['xml']._attendees)
- participant_status = "DELEGATED"
message_text = _("""
*** This is an automated response, please do not reply! ***
Your reservation was delegated to "%s" which is available for the requested time.
""") % (delegatee.get_name())
- message = itip_event['xml'].to_message_itip(from_address,
- method="REPLY",
- participant_status=participant_status,
- message_text=message_text
- )
- smtp.sendmail(message['From'], message['To'], message.as_string())
-
- smtp.quit()
+ pykolab.itip.send_reply(from_address, itip_event, message_text,
+ subject=subject_template)
def reservation_response_text(status, owner):