diff options
-rw-r--r-- | pykolab/itip/__init__.py | 225 | ||||
-rw-r--r-- | tests/unit/test-011-wallace_resources.py | 22 | ||||
-rw-r--r-- | wallace/module_resources.py | 190 |
3 files changed, 245 insertions, 192 deletions
diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py new file mode 100644 index 0000000..04b2d55 --- /dev/null +++ b/pykolab/itip/__init__.py @@ -0,0 +1,225 @@ +import icalendar +import pykolab + +from pykolab.xml import to_dt +from pykolab.xml import event_from_ical +from pykolab.translate import _ + +log = pykolab.getLogger('pykolab.wallace') + + +def events_from_message(message, methods=None): + return objects_from_message(message, "VEVENT", methods) + +def todos_from_message(message, methods=None): + return objects_from_message(message, "VTODO", methods) + + +def objects_from_message(message, objname, methods=None): + """ + Obtain the iTip payload from email.message <message> + """ + # Placeholder for any itip_objects found in the message. + itip_objects = [] + seen_uids = [] + + # iTip methods we are actually interested in. Other methods will be ignored. + if methods is None: + methods = [ "REQUEST", "CANCEL" ] + + # Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A + # MIME body part containing content information that conforms to this + # document MUST have (...)" but does not state whether an iTip message must + # therefore also be multipart. + + # Check each part + for part in message.walk(): + + # The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4) + # But in real word, other mime-types are used as well + if part.get_content_type() in [ "text/calendar", "text/x-vcalendar", "application/ics" ]: + if not str(part.get_param('method')).upper() in methods: + log.info(_("Method %r not really interesting for us.") % (part.get_param('method'))) + continue + + # Get the itip_payload + itip_payload = part.get_payload(decode=True) + + log.debug(_("Raw iTip payload: %s") % (itip_payload), level=9) + + # Python iCalendar prior to 3.0 uses "from_string". + if hasattr(icalendar.Calendar, 'from_ical'): + cal = icalendar.Calendar.from_ical(itip_payload) + elif hasattr(icalendar.Calendar, 'from_string'): + cal = icalendar.Calendar.from_string(itip_payload) + + # If we can't read it, we're out + else: + log.error(_("Could not read iTip from message.")) + return [] + + for c in cal.walk(): + if c.name == objname: + itip = {} + + if c['uid'] in seen_uids: + log.debug(_("Duplicate iTip object: %s") % (c['uid']), level=9) + continue + + # From the event, take the following properties: + # + # - method + # - uid + # - sequence + # - start + # - end (if any) + # - duration (if any) + # - organizer + # - attendees (if any) + # - resources (if any) + # + + itip['uid'] = str(c['uid']) + itip['method'] = str(cal['method']).upper() + itip['sequence'] = int(c['sequence']) if c.has_key('sequence') else 0 + + if c.has_key('dtstart'): + itip['start'] = c['dtstart'].dt + else: + log.error(_("iTip event without a start")) + continue + + if c.has_key('dtend'): + itip['end'] = c['dtend'].dt + + if c.has_key('duration'): + itip['duration'] = c['duration'].dt + itip['end'] = itip['start'] + c['duration'].dt + + itip['organizer'] = c['organizer'] + + itip['attendees'] = c['attendee'] + + if c.has_key('resources'): + itip['resources'] = c['resources'] + + itip['raw'] = itip_payload + + try: + # TODO: distinguish event and todo here + itip['xml'] = event_from_ical(c.to_ical()) + except Exception, e: + log.error("event_from_ical() exception: %r" % (e)) + continue + + itip_objects.append(itip) + + seen_uids.append(c['uid']) + + # end if c.name == "VEVENT" + + # end for c in cal.walk() + + # end if part.get_content_type() == "text/calendar" + + # end for part in message.walk() + + if not len(itip_objects) and not message.is_multipart(): + log.debug(_("Message is not an iTip message (non-multipart message)"), level=5) + + return itip_objects + + +def check_event_conflict(kolab_event, itip_event): + """ + Determine whether the given kolab event conflicts with the given itip event + """ + conflict = False + + # don't consider conflict with myself + if kolab_event.uid == itip_event['uid']: + return conflict + + _es = to_dt(kolab_event.get_start()) + _ee = to_dt(kolab_event.get_end()) + + # naive loops to check for collisions in (recurring) events + # TODO: compare recurrence rules directly (e.g. matching time slot or weekday or monthday) + while not conflict and _es is not None: + _is = to_dt(itip_event['start']) + _ie = to_dt(itip_event['end']) + + while not conflict and _is is not None: + # log.debug("* Comparing event dates at %s/%s with %s/%s" % (_es, _ee, _is, _ie), level=9) + conflict = check_date_conflict(_es, _ee, _is, _ie) + _is = to_dt(itip_event['xml'].get_next_occurence(_is)) if kolab_event.is_recurring() else None + _ie = to_dt(itip_event['xml'].get_occurence_end_date(_is)) + + _es = to_dt(kolab_event.get_next_occurence(_es)) if kolab_event.is_recurring() else None + _ee = to_dt(kolab_event.get_occurence_end_date(_es)) + + return conflict + + +def check_date_conflict(_es, _ee, _is, _ie): + """ + Check the given event start/end dates for conflicts + """ + conflict = False + + # TODO: add margin for all-day dates (+13h; -12h) + + if _es < _is: + if _es <= _ie: + if _ee <= _is: + conflict = False + else: + conflict = True + else: + conflict = True + elif _es == _is: + conflict = True + else: # _es > _is + if _es <= _ie: + conflict = True + else: + conflict = False + + return conflict + + +def send_reply(from_address, itip_events, response_text, subject=None): + """ + Send the given iCal events as a valid iTip REPLY to the organizer. + """ + + import smtplib + smtp = smtplib.SMTP("localhost", 10027) + + conf = pykolab.getConf() + + if conf.debuglevel > 8: + smtp.set_debuglevel(True) + + if isinstance(itip_events, dict): + itip_events = [ itip_events ] + + for itip_event in itip_events: + attendee = itip_event['xml'].get_attendee_by_email(from_address) + participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee) + + event_summary = itip_event['xml'].get_summary() + message_text = response_text % { 'summary':event_summary, 'status':_(participant_status), 'name':attendee.get_name() } + + if subject is not None: + subject = subject % { 'summary':event_summary, 'status':_(participant_status), 'name':attendee.get_name() } + + message = itip_event['xml'].to_message_itip(from_address, + method="REPLY", + participant_status=participant_status, + message_text=message_text, + subject=subject + ) + smtp.sendmail(message['From'], message['To'], message.as_string()) + + smtp.quit() diff --git a/tests/unit/test-011-wallace_resources.py b/tests/unit/test-011-wallace_resources.py index 62bfd27..bb586f8 100644 --- a/tests/unit/test-011-wallace_resources.py +++ b/tests/unit/test-011-wallace_resources.py @@ -302,29 +302,29 @@ class TestWallaceResources(unittest.TestCase): return None def test_001_itip_events_from_message(self): - itips1 = module_resources.itip_events_from_message(message_from_string(itip_multipart)) + itips1 = pykolab.itip.events_from_message(message_from_string(itip_multipart)) self.assertEqual(len(itips1), 1, "Multipart iTip message with text/calendar") self.assertEqual(itips1[0]['method'], "REQUEST", "iTip request method property") - itips2 = module_resources.itip_events_from_message(message_from_string(itip_non_multipart)) + itips2 = pykolab.itip.events_from_message(message_from_string(itip_non_multipart)) self.assertEqual(len(itips2), 1, "Detect non-multipart iTip messages") - itips3 = module_resources.itip_events_from_message(message_from_string(itip_application_ics)) + itips3 = pykolab.itip.events_from_message(message_from_string(itip_application_ics)) self.assertEqual(len(itips3), 1, "Multipart iTip message with application/ics attachment") - itips4 = module_resources.itip_events_from_message(message_from_string(itip_google_multipart)) + itips4 = pykolab.itip.events_from_message(message_from_string(itip_google_multipart)) self.assertEqual(len(itips4), 1, "Multipart iTip message from Google") - itips5 = module_resources.itip_events_from_message(message_from_string(itip_empty)) + itips5 = pykolab.itip.events_from_message(message_from_string(itip_empty)) self.assertEqual(len(itips5), 0, "Simple plain text message") # invalid itip blocks - self.assertRaises(Exception, module_resources.itip_events_from_message, message_from_string(itip_multipart.replace("BEGIN:VEVENT", ""))) + self.assertRaises(Exception, pykolab.itip.events_from_message, message_from_string(itip_multipart.replace("BEGIN:VEVENT", ""))) - itips6 = module_resources.itip_events_from_message(message_from_string(itip_multipart.replace("DTSTART;", "X-DTSTART;"))) + itips6 = pykolab.itip.events_from_message(message_from_string(itip_multipart.replace("DTSTART;", "X-DTSTART;"))) self.assertEqual(len(itips6), 0, "Event with not DTSTART") - itips7 = module_resources.itip_events_from_message(message_from_string(itip_non_multipart.replace("METHOD:REQUEST", "METHOD:PUBLISH").replace("method=REQUEST", "method=PUBLISH"))) + itips7 = pykolab.itip.events_from_message(message_from_string(itip_non_multipart.replace("METHOD:REQUEST", "METHOD:PUBLISH").replace("method=REQUEST", "method=PUBLISH"))) self.assertEqual(len(itips7), 0, "Invalid METHOD") @@ -337,7 +337,7 @@ class TestWallaceResources(unittest.TestCase): def test_003_resource_records_from_itip_events(self): message = message_from_string(itip_multipart) - itips = module_resources.itip_events_from_message(message) + itips = pykolab.itip.events_from_message(message) res = module_resources.resource_records_from_itip_events(itips) self.assertEqual(len(res), 2, "Return all attendee resources"); @@ -365,7 +365,7 @@ class TestWallaceResources(unittest.TestCase): def test_005_send_response_accept(self): - itip_event = module_resources.itip_events_from_message(message_from_string(itip_non_multipart)) + itip_event = pykolab.itip.events_from_message(message_from_string(itip_non_multipart)) module_resources.send_response("resource-collection-car@example.org", itip_event) self.assertEqual(len(self.smtplog), 1); @@ -384,7 +384,7 @@ class TestWallaceResources(unittest.TestCase): def test_006_send_response_delegate(self): # delegate resource-collection-car@example.org => resource-car-audi-a4@example.org - itip_event = module_resources.itip_events_from_message(message_from_string(itip_non_multipart))[0] + itip_event = pykolab.itip.events_from_message(message_from_string(itip_non_multipart))[0] itip_event['xml'].delegate('resource-collection-car@example.org', 'resource-car-audi-a4@example.org') itip_event['xml'].set_attendee_participant_status(itip_event['xml'].get_attendee('resource-car-audi-a4@example.org'), "ACCEPTED") diff --git a/wallace/module_resources.py b/wallace/module_resources.py index 303252b..f398120 100644 --- a/wallace/module_resources.py +++ b/wallace/module_resources.py @@ -40,9 +40,10 @@ import kolabformat from pykolab.auth import Auth from pykolab.conf import Conf from pykolab.imap import IMAP -from pykolab.xml import event_from_ical from pykolab.xml import event_from_string from pykolab.xml import to_dt +from pykolab.itip import events_from_message +from pykolab.itip import check_event_conflict from pykolab.translate import _ log = pykolab.getLogger('pykolab.wallace') @@ -150,7 +151,7 @@ def execute(*args, **kw): # An iTip message may contain multiple events. Later on, test if the message # is an iTip message by checking the length of this list. try: - itip_events = itip_events_from_message(message) + itip_events = events_from_message(message, ['REQUEST', 'CANCEL']) except Exception, e: log.error(_("Failed to parse iTip events from message: %r" % (e))) itip_events = [] @@ -473,33 +474,11 @@ def read_resource_calendar(resource_rec, itip_events): event = pykolab.xml.event_from_string(payload) for itip in itip_events: - _es = to_dt(event.get_start()) - _ee = to_dt(event.get_end()) - - conflict = False - - # naive loops to check for collisions in (recurring) events - # TODO: compare recurrence rules directly (e.g. matching time slot or weekday or monthday) - while not conflict and _es is not None: - _is = to_dt(itip['start']) - _ie = to_dt(itip['end']) - - while not conflict and _is is not None: - log.debug("* Comparing event dates at %s/%s with %s/%s" % (_es, _ee, _is, _ie), level=9) - conflict = check_date_conflict(_es, _ee, _is, _ie) - _is = to_dt(itip['xml'].get_next_occurence(_is)) if event.is_recurring() else None - _ie = to_dt(itip['xml'].get_occurence_end_date(_is)) - - _es = to_dt(event.get_next_occurence(_es)) if event.is_recurring() else None - _ee = to_dt(event.get_occurence_end_date(_es)) + conflict = check_event_conflict(event, itip) if event.get_uid() == itip['uid']: resource_rec['existing_events'].append(itip['uid']) - # don't register conflict for updates - if itip['sequence'] > 0 and itip['sequence'] >= event.get_sequence(): - conflict = False - if conflict: log.info( _("Event %r conflicts with event %r") % ( @@ -513,29 +492,6 @@ def read_resource_calendar(resource_rec, itip_events): return num_messages -def check_date_conflict(_es, _ee, _is, _ie): - conflict = False - - # TODO: add margin for all-day dates (+13h; -12h) - - if _es < _is: - if _es <= _ie: - if _ee <= _is: - conflict = False - else: - conflict = True - else: - conflict = True - elif _es == _is: - conflict = True - else: # _es > _is - if _es <= _ie: - conflict = True - else: - conflict = False - - return conflict - def accept_reservation_request(itip_event, resource, delegator=None): """ @@ -617,118 +573,6 @@ def delete_resource_event(uid, resource): imap.imap.m.expunge() -def itip_events_from_message(message): - """ - Obtain the iTip payload from email.message <message> - """ - # Placeholder for any itip_events found in the message. - itip_events = [] - seen_uids = [] - - # iTip methods we are actually interested in. Other methods will be ignored. - itip_methods = [ "REQUEST", "CANCEL" ] - - # Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A - # MIME body part containing content information that conforms to this - # document MUST have (...)" but does not state whether an iTip message must - # therefore also be multipart. - - # Check each part - for part in message.walk(): - - # The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4) - # But in real word, other mime-types are used as well - if part.get_content_type() in [ "text/calendar", "text/x-vcalendar", "application/ics" ]: - if not str(part.get_param('method')).upper() in itip_methods: - log.error(_("Method %r not really interesting for us.") % (part.get_param('method'))) - continue - - # Get the itip_payload - itip_payload = part.get_payload(decode=True) - - log.debug(_("Raw iTip payload: %s") % (itip_payload), level=9) - - # Python iCalendar prior to 3.0 uses "from_string". - if hasattr(icalendar.Calendar, 'from_ical'): - cal = icalendar.Calendar.from_ical(itip_payload) - elif hasattr(icalendar.Calendar, 'from_string'): - cal = icalendar.Calendar.from_string(itip_payload) - - # If we can't read it, we're out - else: - log.error(_("Could not read iTip from message.")) - return [] - - for c in cal.walk(): - if c.name == "VEVENT": - itip = {} - - if c['uid'] in seen_uids: - log.debug(_("Duplicate iTip event: %s") % (c['uid']), level=9) - continue - - # From the event, take the following properties: - # - # - method - # - uid - # - sequence - # - start - # - end (if any) - # - duration (if any) - # - organizer - # - attendees (if any) - # - resources (if any) - # - - itip['uid'] = str(c['uid']) - itip['method'] = str(cal['method']).upper() - itip['sequence'] = int(c['sequence']) if c.has_key('sequence') else 0 - - if c.has_key('dtstart'): - itip['start'] = c['dtstart'].dt - else: - log.error(_("iTip event without a start")) - continue - - if c.has_key('dtend'): - itip['end'] = c['dtend'].dt - - if c.has_key('duration'): - itip['duration'] = c['duration'].dt - itip['end'] = itip['start'] + c['duration'].dt - - itip['organizer'] = c['organizer'] - - itip['attendees'] = c['attendee'] - - if c.has_key('resources'): - itip['resources'] = c['resources'] - - itip['raw'] = itip_payload - - try: - itip['xml'] = event_from_ical(c.to_ical()) - except Exception, e: - log.error("event_from_ical() exception: %r" % (e)) - continue - - itip_events.append(itip) - - seen_uids.append(c['uid']) - - # end if c.name == "VEVENT" - - # end for c in cal.walk() - - # end if part.get_content_type() == "text/calendar" - - # end for part in message.walk() - - if not len(itip_events) and not message.is_multipart(): - log.debug(_("Message is not an iTip message (non-multipart message)"), level=5) - - return itip_events - def reject(filepath): new_filepath = os.path.join( mybasepath, @@ -986,12 +830,6 @@ def send_response(from_address, itip_events, owner=None): resource, this will send an additional DELEGATED response message. """ - import smtplib - smtp = smtplib.SMTP("localhost", 10027) - - if conf.debuglevel > 8: - smtp.set_debuglevel(True) - if isinstance(itip_events, dict): itip_events = [ itip_events ] @@ -1000,6 +838,7 @@ def send_response(from_address, itip_events, owner=None): participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee) message_text = reservation_response_text(participant_status, owner) + subject_template = _("Reservation Request for %(summary)s was %(status)s") if participant_status == "DELEGATED": # Extra actions to take @@ -1007,32 +846,21 @@ def send_response(from_address, itip_events, owner=None): delegatee = [a for a in itip_event['xml'].get_attendees() if from_address in [b.email() for b in a.get_delegated_from()]][0] delegatee_status = itip_event['xml'].get_ical_attendee_participant_status(delegatee) - message = itip_event['xml'].to_message_itip(delegatee.get_email(), - method="REPLY", - participant_status=delegatee_status, - message_text=reservation_response_text(delegatee_status, owner) - ) - smtp.sendmail(message['From'], message['To'], message.as_string()) + pykolab.itip.send_reply(delegatee.get_email(), itip_event, reservation_response_text(delegatee_status, owner), + subject=subject_template) # restore list of attendees after to_message_itip() itip_event['xml']._attendees = [ delegator, delegatee ] itip_event['xml'].event.setAttendees(itip_event['xml']._attendees) - participant_status = "DELEGATED" message_text = _(""" *** This is an automated response, please do not reply! *** Your reservation was delegated to "%s" which is available for the requested time. """) % (delegatee.get_name()) - message = itip_event['xml'].to_message_itip(from_address, - method="REPLY", - participant_status=participant_status, - message_text=message_text - ) - smtp.sendmail(message['From'], message['To'], message.as_string()) - - smtp.quit() + pykolab.itip.send_reply(from_address, itip_event, message_text, + subject=subject_template) def reservation_response_text(status, owner): |