summaryrefslogtreecommitdiffstats
path: root/pykolab/imap/cyrus.py
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2016-03-01 11:05:09 +0100
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2016-03-01 11:05:09 +0100
commitb52ba41c6ec774d381fda81cb0e72a68e7056644 (patch)
tree3769260bf302b3a1d897d8e16630c03a03502491 /pykolab/imap/cyrus.py
parent93baeadf7ee5744522a89e836740ff9e708c96f5 (diff)
downloadpykolab-b52ba41c6ec774d381fda81cb0e72a68e7056644.tar.gz
Fix PEP compliance
Diffstat (limited to 'pykolab/imap/cyrus.py')
-rw-r--r--pykolab/imap/cyrus.py241
1 files changed, 190 insertions, 51 deletions
diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py
index 4dd9284..b5f42cf 100644
--- a/pykolab/imap/cyrus.py
+++ b/pykolab/imap/cyrus.py
@@ -70,14 +70,19 @@ class Cyrus(cyruslib.CYRUS):
self.server = hostname
- self.uri = "%s://%s:%s" % (scheme,hostname,port)
+ self.uri = "%s://%s:%s" % (scheme, hostname, port)
while 1:
try:
cyruslib.CYRUS.__init__(self, self.uri)
break
except cyruslib.CYRUSError:
- log.warning(_("Could not connect to Cyrus IMAP server %r") % (self.uri))
+ log.warning(
+ _("Could not connect to Cyrus IMAP server %r") % (
+ self.uri
+ )
+ )
+
time.sleep(10)
if conf.debuglevel > 8:
@@ -135,16 +140,27 @@ class Cyrus(cyruslib.CYRUS):
cyruslib.CYRUS.login(self, *args, **kw)
self.separator = self.SEP
- log.debug(_("Continuing with separator: %r") % (self.separator), level=8)
+ log.debug(
+ _("Continuing with separator: %r") % (self.separator),
+ level=8
+ )
+
self.murder = False
for capability in self.m.capabilities:
if capability.startswith("MUPDATE="):
- log.debug(_("Detected we are running in a Murder topology"), level=8)
+ log.debug(
+ _("Detected we are running in a Murder topology"),
+ level=8
+ )
+
self.murder = True
if not self.murder:
- log.debug(_("This system is not part of a murder topology"), level=8)
+ log.debug(
+ _("This system is not part of a murder topology"),
+ level=8
+ )
def find_mailfolder_server(self, mailfolder):
annotations = {}
@@ -154,7 +170,12 @@ class Cyrus(cyruslib.CYRUS):
prefix = _mailfolder['path_parts'].pop(0)
mbox = _mailfolder['path_parts'].pop(0)
if not _mailfolder['domain'] == None:
- mailfolder = "%s%s%s@%s" % (prefix,self.separator,mbox,_mailfolder['domain'])
+ mailfolder = "%s%s%s@%s" % (
+ prefix,
+ self.separator,
+ mbox,
+ _mailfolder['domain']
+ )
# TODO: Workaround for undelete
if len(self.lm(mailfolder)) < 1:
@@ -165,14 +186,20 @@ class Cyrus(cyruslib.CYRUS):
if not self.murder:
return self.server
- log.debug(_("Checking actual backend server for folder %s through annotations") % (mailfolder), level=8)
+ log.debug(
+ _("Checking actual backend server for folder %s " +
+ "through annotations") % (
+ mailfolder
+ ),
+ level=8
+ )
if self.mbox.has_key(mailfolder):
log.debug(
_(
- "Possibly reproducing the find " + \
- "mailfolder server answer from " + \
- "previously detected and stored " + \
+ "Possibly reproducing the find " +
+ "mailfolder server answer from " +
+ "previously detected and stored " +
"annotation value: %r"
) % (
self.mbox[mailfolder]
@@ -185,26 +212,53 @@ class Cyrus(cyruslib.CYRUS):
max_tries = 20
num_try = 0
+
+ annotation_path = "/shared/vendor/cmu/cyrus-imapd/server"
+
while 1:
num_try += 1
- annotations = self._getannotation(mailfolder, "/vendor/cmu/cyrus-imapd/server")
+ annotations = self._getannotation(
+ mailfolder,
+ annotation_path
+ )
if annotations.has_key(mailfolder):
break
if max_tries <= num_try:
- log.error(_("Could not get the annotations after %s tries.") % (num_try))
- annotations = { mailfolder: { '/shared/vendor/cmu/cyrus-imapd/server': self.server }}
+ log.error(
+ _("Could not get the annotations after %s tries.") % (
+ num_try
+ )
+ )
+
+ annotations = {
+ mailfolder: {
+ annotation_path: self.server
+ }
+ }
+
break
- log.warning(_("No annotations for %s: %r") % (mailfolder,annotations))
+ log.warning(
+ _("No annotations for %s: %r") % (
+ mailfolder,
+ annotations
+ )
+ )
time.sleep(1)
- server = annotations[mailfolder]['/shared/vendor/cmu/cyrus-imapd/server']
+ server = annotations[mailfolder][annotation_path]
self.mbox[mailfolder] = server
- log.debug(_("Server for INBOX folder %s is %s") % (mailfolder,server), level=8)
+ log.debug(
+ _("Server for INBOX folder %s is %s") % (
+ mailfolder,
+ server
+ ),
+ level=8
+ )
return server
@@ -221,14 +275,24 @@ class Cyrus(cyruslib.CYRUS):
Login to the actual backend server.
"""
server = self.find_mailfolder_server(mailfolder)
- #print "server:", server
self.connect(self.uri.replace(self.server,server))
- log.debug(_("Setting quota for folder %s to %s") % (mailfolder,quota), level=8)
+ log.debug(
+ _("Setting quota for folder %s to %s") % (
+ mailfolder,
+ quota
+ ),
+ level=8
+ )
+
try:
self.m.setquota(mailfolder, quota)
except:
- log.error(_("Could not set quota for mailfolder %s") % (mailfolder))
+ log.error(
+ _("Could not set quota for mailfolder %s") % (
+ mailfolder
+ )
+ )
def _rename(self, from_mailfolder, to_mailfolder, partition=None):
"""
@@ -238,11 +302,29 @@ class Cyrus(cyruslib.CYRUS):
self.connect(self.uri.replace(self.server,server))
if not partition == None:
- log.debug(_("Moving INBOX folder %s to %s on partition %s") % (from_mailfolder,to_mailfolder, partition), level=8)
+ log.debug(
+ _("Moving INBOX folder %s to %s on partition %s") % (
+ from_mailfolder,
+ to_mailfolder,
+ partition
+ ),
+ level=8
+ )
+
else:
- log.debug(_("Moving INBOX folder %s to %s") % (from_mailfolder,to_mailfolder), level=8)
+ log.debug(
+ _("Moving INBOX folder %s to %s") % (
+ from_mailfolder,
+ to_mailfolder
+ ),
+ level=8
+ )
- self.m.rename(self.folder_utf7(from_mailfolder), self.folder_utf7(to_mailfolder), '"%s"' % (partition))
+ self.m.rename(
+ self.folder_utf7(from_mailfolder),
+ self.folder_utf7(to_mailfolder),
+ '"%s"' % (partition)
+ )
def _getannotation(self, *args, **kw):
return self.getannotation(*args, **kw)
@@ -256,16 +338,36 @@ class Cyrus(cyruslib.CYRUS):
except:
server = self.server
- log.debug(_("Setting annotation %s on folder %s") % (annotation,mailfolder), level=8)
+ log.debug(
+ _("Setting annotation %s on folder %s") % (
+ annotation,
+ mailfolder
+ ),
+ level=8
+ )
try:
self.setannotation(mailfolder, annotation, value, shared)
- except cyruslib.CYRUSError, e:
- log.error(_("Could not set annotation %r on mail folder %r: %r") % (annotation,mailfolder,e))
+ except cyruslib.CYRUSError, errmsg:
+ log.error(
+ _("Could not set annotation %r on mail folder %r: %r") % (
+ annotation,
+ mailfolder,
+ errmsg
+ )
+ )
def _xfer(self, mailfolder, current_server, new_server):
self.connect(self.uri.replace(self.server,current_server))
- log.debug(_("Transferring folder %s from %s to %s") % (mailfolder, current_server, new_server), level=8)
+ log.debug(
+ _("Transferring folder %s from %s to %s") % (
+ mailfolder,
+ current_server,
+ new_server
+ ),
+ level=8
+ )
+
self.xfer(mailfolder, new_server)
def undelete_mailfolder(self, mailfolder, to_mailfolder=None, recursive=True):
@@ -307,25 +409,42 @@ class Cyrus(cyruslib.CYRUS):
mbox = undelete_mbox['path_parts'].pop(0)
if to_mailfolder == None:
- target_folder = self.separator.join([prefix,mbox])
+ target_folder = self.separator.join([prefix, mbox])
else:
target_folder = self.separator.join(target_mbox['path_parts'])
if not to_mailfolder == None:
- target_folder = "%s%s%s" % (target_folder,self.separator,mbox)
+ target_folder = "%s%s%s" % (
+ target_folder,
+ self.separator,
+ mbox
+ )
if not len(undelete_mbox['path_parts']) == 0:
- target_folder = "%s%s%s" % (target_folder,self.separator,self.separator.join(undelete_mbox['path_parts']))
+ target_folder = "%s%s%s" % (
+ target_folder,
+ self.separator,
+ self.separator.join(undelete_mbox['path_parts'])
+ )
if target_folder in target_folders:
- target_folder = "%s%s%s" % (target_folder,self.separator,undelete_mbox['hex_timestamp'])
+ target_folder = "%s%s%s" % (
+ target_folder,
+ self.separator,
+ undelete_mbox['hex_timestamp']
+ )
target_folders.append(target_folder)
if not target_mbox['domain'] == None:
target_folder = "%s@%s" % (target_folder,target_mbox['domain'])
- log.info(_("Undeleting %s to %s") % (undelete_folder,target_folder))
+ log.info(
+ _("Undeleting %s to %s") % (
+ undelete_folder,
+ target_folder
+ )
+ )
target_server = self.find_mailfolder_server(target_folder)
@@ -336,9 +455,18 @@ class Cyrus(cyruslib.CYRUS):
self.rename(undelete_folder,target_folder)
else:
if not target_server == self.server:
- print >> sys.stdout, _("Would have transferred %s from %s to %s") % (undelete_folder, self.server, target_server)
-
- print >> sys.stdout, _("Would have renamed %s to %s") % (undelete_folder, target_folder)
+ print >> sys.stdout, \
+ _("Would have transferred %s from %s to %s") % (
+ undelete_folder,
+ self.server,
+ target_server
+ )
+
+ print >> sys.stdout, \
+ _("Would have renamed %s to %s") % (
+ undelete_folder,
+ target_folder
+ )
def parse_mailfolder(self, mailfolder):
"""
@@ -369,7 +497,8 @@ class Cyrus(cyruslib.CYRUS):
# This prevents "DELETED/user/userid/Sent", but not
# "DELETED/user/userid/FFFFFF" from being specified.
try:
- epoch = int(mbox['path_parts'][(len(mbox['path_parts'])-1)], 16)
+ hexstamp = mbox['path_parts'][(len(mbox['path_parts'])-1)]
+ epoch = int(hexstamp, 16)
try:
timestamp = time.asctime(time.gmtime(epoch))
except:
@@ -386,21 +515,27 @@ class Cyrus(cyruslib.CYRUS):
}
if not mbox['domain'] == None:
- verify_folder_search = "%s@%s" % (verify_folder_search, mbox['domain'])
+ verify_folder_search = "%s@%s" % (
+ verify_folder_search,
+ mbox['domain']
+ )
if ' ' in verify_folder_search:
- folders = self.lm('"%s"' % self.folder_utf7(verify_folder_search))
+ folders = self.lm(
+ '"%s"' % self.folder_utf7(verify_folder_search)
+ )
+
else:
folders = self.lm(self.folder_utf7(verify_folder_search))
- # NOTE: Case also covered is valid hexadecimal folders; won't be the
- # actual check as intended, but doesn't give you anyone else's data
- # unless... See the following:
+ # NOTE: Case also covered is valid hexadecimal folders; won't be
+ # the actual check as intended, but doesn't give you anyone else's
+ # data unless... See the following:
#
# TODO: Case not covered is usernames that are hexadecimal.
#
- # We could probably attempt to convert the int(hex) into a time.gmtime(),
- # but it still would not cover all cases.
+ # We could probably attempt to convert the int(hex) into a
+ # time.gmtime(), but it still would not cover all cases.
#
# If no folders were found... well... then there you go.
@@ -414,20 +549,24 @@ class Cyrus(cyruslib.CYRUS):
def _find_deleted_folder(self, mbox):
"""
- Give me the parts that are in an original mailfolder name and I'll find
- the deleted folder name.
+ Give me the parts that are in an original mailfolder name and I'll
+ find the deleted folder name.
TODO: It finds virtdomain folders for non-virtdomain searches.
"""
- deleted_folder_search = "%(deleted_prefix)s%(separator)s%(mailfolder)s%(separator)s*" % {
- # TODO: The prefix used is configurable
- 'deleted_prefix': "DELETED",
- 'mailfolder': self.separator.join(mbox['path_parts']),
- 'separator': self.separator,
- }
+ deleted_folder_search = "%(deleted_prefix)s%(separator)s" + \
+ "%(mailfolder)s%(separator)s*" % {
+ # TODO: The prefix used is configurable
+ 'deleted_prefix': "DELETED",
+ 'mailfolder': self.separator.join(mbox['path_parts']),
+ 'separator': self.separator,
+ }
if not mbox['domain'] == None:
- deleted_folder_search = "%s@%s" % (deleted_folder_search,mbox['domain'])
+ deleted_folder_search = "%s@%s" % (
+ deleted_folder_search,
+ mbox['domain']
+ )
folders = self.lm(deleted_folder_search)