diff options
author | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2016-03-01 11:05:09 +0100 |
---|---|---|
committer | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2016-03-01 11:05:09 +0100 |
commit | b52ba41c6ec774d381fda81cb0e72a68e7056644 (patch) | |
tree | 3769260bf302b3a1d897d8e16630c03a03502491 /pykolab/imap/cyrus.py | |
parent | 93baeadf7ee5744522a89e836740ff9e708c96f5 (diff) | |
download | pykolab-b52ba41c6ec774d381fda81cb0e72a68e7056644.tar.gz |
Fix PEP compliance
Diffstat (limited to 'pykolab/imap/cyrus.py')
-rw-r--r-- | pykolab/imap/cyrus.py | 241 |
1 files changed, 190 insertions, 51 deletions
diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py index 4dd9284..b5f42cf 100644 --- a/pykolab/imap/cyrus.py +++ b/pykolab/imap/cyrus.py @@ -70,14 +70,19 @@ class Cyrus(cyruslib.CYRUS): self.server = hostname - self.uri = "%s://%s:%s" % (scheme,hostname,port) + self.uri = "%s://%s:%s" % (scheme, hostname, port) while 1: try: cyruslib.CYRUS.__init__(self, self.uri) break except cyruslib.CYRUSError: - log.warning(_("Could not connect to Cyrus IMAP server %r") % (self.uri)) + log.warning( + _("Could not connect to Cyrus IMAP server %r") % ( + self.uri + ) + ) + time.sleep(10) if conf.debuglevel > 8: @@ -135,16 +140,27 @@ class Cyrus(cyruslib.CYRUS): cyruslib.CYRUS.login(self, *args, **kw) self.separator = self.SEP - log.debug(_("Continuing with separator: %r") % (self.separator), level=8) + log.debug( + _("Continuing with separator: %r") % (self.separator), + level=8 + ) + self.murder = False for capability in self.m.capabilities: if capability.startswith("MUPDATE="): - log.debug(_("Detected we are running in a Murder topology"), level=8) + log.debug( + _("Detected we are running in a Murder topology"), + level=8 + ) + self.murder = True if not self.murder: - log.debug(_("This system is not part of a murder topology"), level=8) + log.debug( + _("This system is not part of a murder topology"), + level=8 + ) def find_mailfolder_server(self, mailfolder): annotations = {} @@ -154,7 +170,12 @@ class Cyrus(cyruslib.CYRUS): prefix = _mailfolder['path_parts'].pop(0) mbox = _mailfolder['path_parts'].pop(0) if not _mailfolder['domain'] == None: - mailfolder = "%s%s%s@%s" % (prefix,self.separator,mbox,_mailfolder['domain']) + mailfolder = "%s%s%s@%s" % ( + prefix, + self.separator, + mbox, + _mailfolder['domain'] + ) # TODO: Workaround for undelete if len(self.lm(mailfolder)) < 1: @@ -165,14 +186,20 @@ class Cyrus(cyruslib.CYRUS): if not self.murder: return self.server - log.debug(_("Checking actual backend server for folder %s through annotations") % (mailfolder), level=8) + log.debug( + _("Checking actual backend server for folder %s " + + "through annotations") % ( + mailfolder + ), + level=8 + ) if self.mbox.has_key(mailfolder): log.debug( _( - "Possibly reproducing the find " + \ - "mailfolder server answer from " + \ - "previously detected and stored " + \ + "Possibly reproducing the find " + + "mailfolder server answer from " + + "previously detected and stored " + "annotation value: %r" ) % ( self.mbox[mailfolder] @@ -185,26 +212,53 @@ class Cyrus(cyruslib.CYRUS): max_tries = 20 num_try = 0 + + annotation_path = "/shared/vendor/cmu/cyrus-imapd/server" + while 1: num_try += 1 - annotations = self._getannotation(mailfolder, "/vendor/cmu/cyrus-imapd/server") + annotations = self._getannotation( + mailfolder, + annotation_path + ) if annotations.has_key(mailfolder): break if max_tries <= num_try: - log.error(_("Could not get the annotations after %s tries.") % (num_try)) - annotations = { mailfolder: { '/shared/vendor/cmu/cyrus-imapd/server': self.server }} + log.error( + _("Could not get the annotations after %s tries.") % ( + num_try + ) + ) + + annotations = { + mailfolder: { + annotation_path: self.server + } + } + break - log.warning(_("No annotations for %s: %r") % (mailfolder,annotations)) + log.warning( + _("No annotations for %s: %r") % ( + mailfolder, + annotations + ) + ) time.sleep(1) - server = annotations[mailfolder]['/shared/vendor/cmu/cyrus-imapd/server'] + server = annotations[mailfolder][annotation_path] self.mbox[mailfolder] = server - log.debug(_("Server for INBOX folder %s is %s") % (mailfolder,server), level=8) + log.debug( + _("Server for INBOX folder %s is %s") % ( + mailfolder, + server + ), + level=8 + ) return server @@ -221,14 +275,24 @@ class Cyrus(cyruslib.CYRUS): Login to the actual backend server. """ server = self.find_mailfolder_server(mailfolder) - #print "server:", server self.connect(self.uri.replace(self.server,server)) - log.debug(_("Setting quota for folder %s to %s") % (mailfolder,quota), level=8) + log.debug( + _("Setting quota for folder %s to %s") % ( + mailfolder, + quota + ), + level=8 + ) + try: self.m.setquota(mailfolder, quota) except: - log.error(_("Could not set quota for mailfolder %s") % (mailfolder)) + log.error( + _("Could not set quota for mailfolder %s") % ( + mailfolder + ) + ) def _rename(self, from_mailfolder, to_mailfolder, partition=None): """ @@ -238,11 +302,29 @@ class Cyrus(cyruslib.CYRUS): self.connect(self.uri.replace(self.server,server)) if not partition == None: - log.debug(_("Moving INBOX folder %s to %s on partition %s") % (from_mailfolder,to_mailfolder, partition), level=8) + log.debug( + _("Moving INBOX folder %s to %s on partition %s") % ( + from_mailfolder, + to_mailfolder, + partition + ), + level=8 + ) + else: - log.debug(_("Moving INBOX folder %s to %s") % (from_mailfolder,to_mailfolder), level=8) + log.debug( + _("Moving INBOX folder %s to %s") % ( + from_mailfolder, + to_mailfolder + ), + level=8 + ) - self.m.rename(self.folder_utf7(from_mailfolder), self.folder_utf7(to_mailfolder), '"%s"' % (partition)) + self.m.rename( + self.folder_utf7(from_mailfolder), + self.folder_utf7(to_mailfolder), + '"%s"' % (partition) + ) def _getannotation(self, *args, **kw): return self.getannotation(*args, **kw) @@ -256,16 +338,36 @@ class Cyrus(cyruslib.CYRUS): except: server = self.server - log.debug(_("Setting annotation %s on folder %s") % (annotation,mailfolder), level=8) + log.debug( + _("Setting annotation %s on folder %s") % ( + annotation, + mailfolder + ), + level=8 + ) try: self.setannotation(mailfolder, annotation, value, shared) - except cyruslib.CYRUSError, e: - log.error(_("Could not set annotation %r on mail folder %r: %r") % (annotation,mailfolder,e)) + except cyruslib.CYRUSError, errmsg: + log.error( + _("Could not set annotation %r on mail folder %r: %r") % ( + annotation, + mailfolder, + errmsg + ) + ) def _xfer(self, mailfolder, current_server, new_server): self.connect(self.uri.replace(self.server,current_server)) - log.debug(_("Transferring folder %s from %s to %s") % (mailfolder, current_server, new_server), level=8) + log.debug( + _("Transferring folder %s from %s to %s") % ( + mailfolder, + current_server, + new_server + ), + level=8 + ) + self.xfer(mailfolder, new_server) def undelete_mailfolder(self, mailfolder, to_mailfolder=None, recursive=True): @@ -307,25 +409,42 @@ class Cyrus(cyruslib.CYRUS): mbox = undelete_mbox['path_parts'].pop(0) if to_mailfolder == None: - target_folder = self.separator.join([prefix,mbox]) + target_folder = self.separator.join([prefix, mbox]) else: target_folder = self.separator.join(target_mbox['path_parts']) if not to_mailfolder == None: - target_folder = "%s%s%s" % (target_folder,self.separator,mbox) + target_folder = "%s%s%s" % ( + target_folder, + self.separator, + mbox + ) if not len(undelete_mbox['path_parts']) == 0: - target_folder = "%s%s%s" % (target_folder,self.separator,self.separator.join(undelete_mbox['path_parts'])) + target_folder = "%s%s%s" % ( + target_folder, + self.separator, + self.separator.join(undelete_mbox['path_parts']) + ) if target_folder in target_folders: - target_folder = "%s%s%s" % (target_folder,self.separator,undelete_mbox['hex_timestamp']) + target_folder = "%s%s%s" % ( + target_folder, + self.separator, + undelete_mbox['hex_timestamp'] + ) target_folders.append(target_folder) if not target_mbox['domain'] == None: target_folder = "%s@%s" % (target_folder,target_mbox['domain']) - log.info(_("Undeleting %s to %s") % (undelete_folder,target_folder)) + log.info( + _("Undeleting %s to %s") % ( + undelete_folder, + target_folder + ) + ) target_server = self.find_mailfolder_server(target_folder) @@ -336,9 +455,18 @@ class Cyrus(cyruslib.CYRUS): self.rename(undelete_folder,target_folder) else: if not target_server == self.server: - print >> sys.stdout, _("Would have transferred %s from %s to %s") % (undelete_folder, self.server, target_server) - - print >> sys.stdout, _("Would have renamed %s to %s") % (undelete_folder, target_folder) + print >> sys.stdout, \ + _("Would have transferred %s from %s to %s") % ( + undelete_folder, + self.server, + target_server + ) + + print >> sys.stdout, \ + _("Would have renamed %s to %s") % ( + undelete_folder, + target_folder + ) def parse_mailfolder(self, mailfolder): """ @@ -369,7 +497,8 @@ class Cyrus(cyruslib.CYRUS): # This prevents "DELETED/user/userid/Sent", but not # "DELETED/user/userid/FFFFFF" from being specified. try: - epoch = int(mbox['path_parts'][(len(mbox['path_parts'])-1)], 16) + hexstamp = mbox['path_parts'][(len(mbox['path_parts'])-1)] + epoch = int(hexstamp, 16) try: timestamp = time.asctime(time.gmtime(epoch)) except: @@ -386,21 +515,27 @@ class Cyrus(cyruslib.CYRUS): } if not mbox['domain'] == None: - verify_folder_search = "%s@%s" % (verify_folder_search, mbox['domain']) + verify_folder_search = "%s@%s" % ( + verify_folder_search, + mbox['domain'] + ) if ' ' in verify_folder_search: - folders = self.lm('"%s"' % self.folder_utf7(verify_folder_search)) + folders = self.lm( + '"%s"' % self.folder_utf7(verify_folder_search) + ) + else: folders = self.lm(self.folder_utf7(verify_folder_search)) - # NOTE: Case also covered is valid hexadecimal folders; won't be the - # actual check as intended, but doesn't give you anyone else's data - # unless... See the following: + # NOTE: Case also covered is valid hexadecimal folders; won't be + # the actual check as intended, but doesn't give you anyone else's + # data unless... See the following: # # TODO: Case not covered is usernames that are hexadecimal. # - # We could probably attempt to convert the int(hex) into a time.gmtime(), - # but it still would not cover all cases. + # We could probably attempt to convert the int(hex) into a + # time.gmtime(), but it still would not cover all cases. # # If no folders were found... well... then there you go. @@ -414,20 +549,24 @@ class Cyrus(cyruslib.CYRUS): def _find_deleted_folder(self, mbox): """ - Give me the parts that are in an original mailfolder name and I'll find - the deleted folder name. + Give me the parts that are in an original mailfolder name and I'll + find the deleted folder name. TODO: It finds virtdomain folders for non-virtdomain searches. """ - deleted_folder_search = "%(deleted_prefix)s%(separator)s%(mailfolder)s%(separator)s*" % { - # TODO: The prefix used is configurable - 'deleted_prefix': "DELETED", - 'mailfolder': self.separator.join(mbox['path_parts']), - 'separator': self.separator, - } + deleted_folder_search = "%(deleted_prefix)s%(separator)s" + \ + "%(mailfolder)s%(separator)s*" % { + # TODO: The prefix used is configurable + 'deleted_prefix': "DELETED", + 'mailfolder': self.separator.join(mbox['path_parts']), + 'separator': self.separator, + } if not mbox['domain'] == None: - deleted_folder_search = "%s@%s" % (deleted_folder_search,mbox['domain']) + deleted_folder_search = "%s@%s" % ( + deleted_folder_search, + mbox['domain'] + ) folders = self.lm(deleted_folder_search) |