summaryrefslogtreecommitdiffstats
path: root/pykolab/itip/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'pykolab/itip/__init__.py')
-rw-r--r--pykolab/itip/__init__.py157
1 files changed, 111 insertions, 46 deletions
diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py
index 6258dfb..b959bc2 100644
--- a/pykolab/itip/__init__.py
+++ b/pykolab/itip/__init__.py
@@ -1,26 +1,33 @@
-import icalendar
-import pykolab
+import re
import traceback
+
+import icalendar
import kolabformat
-import re
+import pykolab
+from pykolab.translate import _
from pykolab.xml import to_dt
from pykolab.xml import event_from_ical
from pykolab.xml import todo_from_ical
from pykolab.xml import participant_status_label
-from pykolab.translate import _
+
from tzlocal import windows_tz
+# pylint: disable=invalid-name
log = pykolab.getLogger('pykolab.wallace')
def events_from_message(message, methods=None):
return objects_from_message(message, ["VEVENT"], methods)
+
def todos_from_message(message, methods=None):
return objects_from_message(message, ["VTODO"], methods)
-def objects_from_message(message, objnames, methods=None):
+
+# pylint: disable=too-many-branches
+# pylint: disable=too-many-statements
+def objects_from_message(message, objnames, methods=None): # noqa: C901
"""
Obtain the iTip payload from email.message <message>
"""
@@ -30,7 +37,7 @@ def objects_from_message(message, objnames, methods=None):
# iTip methods we are actually interested in. Other methods will be ignored.
if methods is None:
- methods = [ "REQUEST", "CANCEL" ]
+ methods = ["REQUEST", "CANCEL"]
# Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A
# MIME body part containing content information that conforms to this
@@ -38,19 +45,23 @@ def objects_from_message(message, objnames, methods=None):
# therefore also be multipart.
# Check each part
+ # pylint: disable=too-many-nested-blocks
for part in message.walk():
# The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4)
# But in real word, other mime-types are used as well
- if part.get_content_type() in [ "text/calendar", "text/x-vcalendar", "application/ics" ]:
- if not str(part.get_param('method')).upper() in methods:
- log.info(_("Method %r not really interesting for us.") % (part.get_param('method')))
+ if part.get_content_type() in ["text/calendar", "text/x-vcalendar", "application/ics"]:
+ if str(part.get_param('method')).upper() not in methods:
+ log.info("Method %r not really interesting for us." % (part.get_param('method')))
continue
# Get the itip_payload
itip_payload = part.get_payload(decode=True)
- log.debug(_("Raw iTip payload (%r): %r") % (part.get_param('charset'), itip_payload), level=8)
+ log.debug(
+ "Raw iTip payload (%r): %r" % (part.get_param('charset'), itip_payload),
+ level=8
+ )
# Convert unsupported timezones, etc.
itip_payload = _convert_itip_payload(itip_payload)
@@ -90,33 +101,37 @@ def objects_from_message(message, objnames, methods=None):
itip['type'] = 'task' if c.name == 'VTODO' else 'event'
itip['uid'] = str(c['uid'])
itip['method'] = str(cal['method']).upper()
- itip['sequence'] = int(c['sequence']) if c.has_key('sequence') else 0
- itip['recurrence-id'] = c['recurrence-id'].dt if c.has_key('recurrence-id') and hasattr(c['recurrence-id'], 'dt') else None
+ itip['sequence'] = int(c['sequence']) if 'sequence' in c else 0
- if c.has_key('dtstart'):
+ itip['recurrence-id'] = None
+ if 'recurrence-id' in c:
+ if hasattr(c['recurrence-id'], 'dt'):
+ itip['recurrence-id'] = c['recurrence-id'].dt
+
+ if 'dtstart' in c:
itip['start'] = c['dtstart'].dt
elif itip['type'] == 'event':
log.error(_("iTip event without a start"))
continue
- if c.has_key('dtend'):
+ if 'dtend' in c:
itip['end'] = c['dtend'].dt
- if c.has_key('duration'):
+ if 'duration' in c:
itip['duration'] = c['duration'].dt
itip['end'] = itip['start'] + c['duration'].dt
# Outlook can send itip replies with no organizer property
- if c.has_key('organizer'):
+ if 'organizer' in c:
itip['organizer'] = c['organizer']
- if c.has_key('attendee'):
+ if 'attendee' in c:
itip['attendees'] = c['attendee']
- if itip.has_key('attendees') and not isinstance(itip['attendees'], list):
+ if 'attendees' in itip and not isinstance(itip['attendees'], list):
itip['attendees'] = [c['attendee']]
- if c.has_key('resources'):
+ if 'resources' in c:
itip['resources'] = c['resources']
itip['raw'] = itip_payload
@@ -127,8 +142,13 @@ def objects_from_message(message, objnames, methods=None):
itip['xml'] = todo_from_ical(c, itip_payload)
else:
itip['xml'] = event_from_ical(c, itip_payload)
- except Exception, e:
- log.error("event|todo_from_ical() exception: %r; iCal: %s" % (e, itip_payload))
+
+ # pylint: disable=broad-except
+ except Exception as e:
+ log.error(
+ "event|todo_from_ical() exception: %r; iCal: %s" % (e, itip_payload)
+ )
+
continue
itip_objects.append(itip)
@@ -148,6 +168,7 @@ def objects_from_message(message, objnames, methods=None):
return itip_objects
+
def check_event_conflict(kolab_event, itip_event):
"""
Determine whether the given kolab event conflicts with the given itip event
@@ -166,14 +187,11 @@ def check_event_conflict(kolab_event, itip_event):
return conflict
_es = to_dt(kolab_event.get_start())
- _ee = to_dt(kolab_event.get_ical_dtend()) # use iCal style end date: next day for all-day events
- _ev = kolab_event
- _ei = 0
+ # use iCal style end date: next day for all-day events
+ _ee = to_dt(kolab_event.get_ical_dtend())
_is = to_dt(itip_event['start'])
_ie = to_dt(itip_event['end'])
- _iv = itip_event['xml']
- _ii = 0
# Escape looping through anything if neither of the events is recurring.
if not itip_event['xml'].is_recurring() and not kolab_event.is_recurring():
@@ -193,10 +211,17 @@ def check_event_conflict(kolab_event, itip_event):
# the older one.
if _ee < _is:
while _ee < _is and _es is not None and kolab_event.is_recurring():
- log.debug("Attempt to move forward kolab event recurrence from %s closer to %s" % (_ee, _is), level=8)
+ log.debug(
+ "Attempt to move forward kolab event recurrence from {} closer to {}".format(
+ _ee,
+ _is
+ ),
+ level=8
+ )
+
__es = to_dt(kolab_event.get_next_occurence(_es))
- if not __es is None:
+ if __es is not None and not __es == _es:
_es = __es
_ee = to_dt(kolab_event.get_occurence_end_date(_es))
else:
@@ -207,10 +232,17 @@ def check_event_conflict(kolab_event, itip_event):
# prime spot, this time with the iTip event.
elif _ie < _es:
while _ie < _es and _is is not None and itip_event['xml'].is_recurring():
- log.debug("Attempt to move forward itip event recurrence from %s closer to %s" % (_ie, _es), level=8)
+ log.debug(
+ "Attempt to move forward itip event recurrence from {} closer to {}".format(
+ _ie,
+ _es
+ ),
+ level=8
+ )
+
__is = to_dt(itip_event['xml'].get_next_occurence(_is))
- if not __is is None:
+ if __is is not None and not _is == __is:
_is = __is
_ie = to_dt(itip_event['xml'].get_occurence_end_date(_is))
else:
@@ -219,7 +251,12 @@ def check_event_conflict(kolab_event, itip_event):
# Now that we have some events somewhere in the same neighborhood...
conflict = check_date_conflict(_es, _ee, _is, _ie)
- log.debug("* Comparing itip at %s/%s with kolab at %s/%s: %r (%d)" % (_is, _ie, _es, _ee, conflict, loop), level=8)
+ log.debug(
+ "* Comparing itip at %s/%s with kolab at %s/%s: conflict - %r (occurence - %d)" % (
+ _is, _ie, _es, _ee, conflict, loop
+ ),
+ level=8
+ )
if not conflict:
if kolab_event.is_recurring() and itip_event['xml'].is_recurring():
@@ -237,9 +274,11 @@ def check_event_conflict(kolab_event, itip_event):
return conflict
+
def _is_transparent(event):
return event.get_transparency() or event.get_status() == kolabformat.StatusCancelled
+
def _convert_itip_payload(itip):
matchlist = re.findall("^((DTSTART|DTEND|DUE|EXDATE|COMPLETED)[:;][^\n]+)$", itip, re.MULTILINE)
@@ -267,6 +306,7 @@ def _convert_itip_payload(itip):
return itip
+
def check_date_conflict(_es, _ee, _is, _ie):
"""
Check the given event start/end dates for conflicts
@@ -285,7 +325,7 @@ def check_date_conflict(_es, _ee, _is, _ie):
conflict = True
elif _es == _is:
conflict = True
- else: # _es > _is
+ else: # _es > _is
if _es < _ie:
conflict = True
else:
@@ -304,29 +344,47 @@ def send_reply(from_address, itip_events, response_text, subject=None):
smtp = None
if isinstance(itip_events, dict):
- itip_events = [ itip_events ]
+ itip_events = [itip_events]
for itip_event in itip_events:
attendee = itip_event['xml'].get_attendee_by_email(from_address)
participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee)
- log.debug(_("Send iTip reply %s for %s %r") % (participant_status, itip_event['xml'].type, itip_event['xml'].uid), level=8)
+ log.debug(
+ "Send iTip reply {} for {} {}".format(
+ participant_status,
+ itip_event['xml'].type,
+ itip_event['xml'].uid
+ ),
+ level=8
+ )
event_summary = itip_event['xml'].get_summary()
- message_text = response_text % { 'summary':event_summary, 'status':participant_status_label(participant_status), 'name':attendee.get_name() }
+ message_text = response_text % {
+ 'summary': event_summary,
+ 'status': participant_status_label(participant_status),
+ 'name': attendee.get_name()
+ }
if subject is not None:
- subject = subject % { 'summary':event_summary, 'status':participant_status_label(participant_status), 'name':attendee.get_name() }
+ subject = subject % {
+ 'summary': event_summary,
+ 'status': participant_status_label(participant_status),
+ 'name': attendee.get_name()
+ }
try:
- message = itip_event['xml'].to_message_itip(from_address,
+ message = itip_event['xml'].to_message_itip(
+ from_address,
method="REPLY",
participant_status=participant_status,
message_text=message_text,
subject=subject
)
- except Exception, e:
- log.error(_("Failed to compose iTip reply message: %r: %s") % (e, traceback.format_exc()))
+
+ # pylint: disable=broad-except
+ except Exception as e:
+ log.error("Failed to compose iTip reply message: %r: %s" % (e, traceback.format_exc()))
return
smtp = smtplib.SMTP("localhost", 10026) # replies go through wallace again
@@ -336,7 +394,9 @@ def send_reply(from_address, itip_events, response_text, subject=None):
try:
smtp.sendmail(message['From'], message['To'], message.as_string())
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception as e:
log.error(_("SMTP sendmail error: %r") % (e))
if smtp:
@@ -353,22 +413,25 @@ def send_request(to_address, itip_events, request_text, subject=None, direct=Fal
smtp = None
if isinstance(itip_events, dict):
- itip_events = [ itip_events ]
+ itip_events = [itip_events]
for itip_event in itip_events:
event_summary = itip_event['xml'].get_summary()
- message_text = request_text % { 'summary':event_summary }
+ message_text = request_text % {'summary': event_summary}
if subject is not None:
- subject = subject % { 'summary':event_summary }
+ subject = subject % {'summary': event_summary}
try:
- message = itip_event['xml'].to_message_itip(None,
+ message = itip_event['xml'].to_message_itip(
+ None,
method="REQUEST",
message_text=message_text,
subject=subject
)
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception as e:
log.error(_("Failed to compose iTip request message: %r") % (e))
return
@@ -380,7 +443,9 @@ def send_request(to_address, itip_events, request_text, subject=None, direct=Fal
try:
smtp.sendmail(message['From'], to_address, message.as_string())
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception as e:
log.error(_("SMTP sendmail error: %r") % (e))
if smtp: