diff options
Diffstat (limited to 'pykolab/imap/__init__.py')
-rw-r--r-- | pykolab/imap/__init__.py | 187 |
1 files changed, 93 insertions, 94 deletions
diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py index 205deaa..3fc72d3 100644 --- a/pykolab/imap/__init__.py +++ b/pykolab/imap/__init__.py @@ -54,20 +54,16 @@ class IMAP(object): if len(aci_subject.split('@')) > 1: lm_suffix = "@%s" % (aci_subject.split('@')[1]) - shared_folders = self.imap.lm( - "shared/*%s" % (lm_suffix) - ) + shared_folders = self.imap.lm("shared/*%s" % (lm_suffix)) - user_folders = self.imap.lm( - "user/*%s" % (lm_suffix) - ) + user_folders = self.imap.lm("user/*%s" % (lm_suffix)) log.debug( - _("Cleaning up ACL entries referring to identifier %s") % ( - aci_subject - ), - level=5 - ) + _("Cleaning up ACL entries referring to identifier %s") % ( + aci_subject + ), + level=5 + ) # For all folders (shared and user), ... folders = user_folders + shared_folders @@ -128,12 +124,12 @@ class IMAP(object): # deployment. backend = conf.get('kolab', 'imap_backend') - if not domain == None: + if domain is not None: self.domain = domain if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): backend = conf.get(domain, 'imap_backend') - if uri == None: + if uri is None: if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): uri = conf.get(domain, 'imap_uri') else: @@ -143,7 +139,7 @@ class IMAP(object): hostname = None port = None - if uri == None: + if uri is None: uri = conf.get(backend, 'uri') result = urlparse(uri) @@ -162,13 +158,13 @@ class IMAP(object): scheme = uri.split(':')[0] (hostname, port) = uri.split('/')[2].split(':') - if not server == None: + if server is not None: hostname = server - if scheme == None or scheme == "": + if scheme is None or scheme == "": scheme = 'imaps' - if port == None: + if port is None: if scheme == "imaps": port = 993 elif scheme == "imap": @@ -182,10 +178,10 @@ class IMAP(object): admin_login = conf.get(backend, 'admin_login') admin_password = conf.get(backend, 'admin_password') - if admin_password == None or admin_password == '': + if admin_password is None or admin_password == '': log.error(_("No administrator password is available.")) - if not self._imap.has_key(hostname): + if hostname not in self._imap: if backend == 'cyrus-imap': import cyrus self._imap[hostname] = cyrus.Cyrus(uri) @@ -216,19 +212,25 @@ class IMAP(object): else: if not login: self.disconnect(hostname) - self.connect(uri=uri,login=False) - elif login and not hasattr(self._imap[hostname],'logged_in'): + self.connect(uri=uri, login=False) + elif login and not hasattr(self._imap[hostname], 'logged_in'): self.disconnect(hostname) self.connect(uri=uri) else: try: if hasattr(self._imap[hostname], 'm'): self._imap[hostname].m.noop() - elif hasattr(self._imap[hostname], 'noop') and callable(self._imap[hostname].noop): + elif hasattr(self._imap[hostname], 'noop') \ + and callable(self._imap[hostname].noop): + self._imap[hostname].noop() - log.debug(_("Reusing existing IMAP server connection to %s") % (hostname), level=8) - except: + log.debug( + _("Reusing existing IMAP server connection to %s") % (hostname), + level=8 + ) + + except Exception: log.debug(_("Reconnecting to IMAP server %s") % (hostname), level=8) self.disconnect(hostname) self.connect() @@ -243,7 +245,7 @@ class IMAP(object): self._set_socket_keepalive(self.imap.sock) def disconnect(self, server=None): - if server == None: + if server is None: # No server specified, but make sure self.imap is None anyways if hasattr(self, 'imap'): del self.imap @@ -253,33 +255,30 @@ class IMAP(object): del self._imap[key] else: - if self._imap.has_key(server): + if server in self._imap: del self._imap[server] else: - log.warning(_("Called imap.disconnect() on a server that we had no connection to.")) + log.warning( + _("Called imap.disconnect() on a server that we had no connection to.") + ) def create_folder(self, folder_path, server=None, partition=None): folder_path = self.folder_utf7(folder_path) - if not server == None: + if server is not None: self.connect(server=server) try: self._imap[server].cm(folder_path, partition=partition) return True - except: - log.error( - _("Could not create folder %r on server %r") % ( - folder_path, - server - ) - ) + except Exception: + log.error(_("Could not create folder %r on server %r") % (folder_path, server)) else: try: self.imap.cm(folder_path, partition=partition) return True - except: + except Exception: log.error(_("Could not create folder %r") % (folder_path)) return False @@ -290,9 +289,9 @@ class IMAP(object): if hasattr(self.imap.m, name): return getattr(self.imap.m, name) else: - raise AttributeError, _("%r has no attribute %s") % (self,name) + raise AttributeError(_("%r has no attribute %s") % (self, name)) else: - raise AttributeError, _("%r has no attribute %s") % (self,name) + raise AttributeError(_("%r has no attribute %s") % (self, name)) def folder_utf7(self, folder): from pykolab import imap_utf7 @@ -313,13 +312,13 @@ class IMAP(object): _metadata = self.imap.getannotation(self.folder_utf7(folder), '*') - for (k,v) in _metadata.items(): + for (k, v) in _metadata.items(): metadata[self.folder_utf8(k)] = v return metadata def get_separator(self): - if not hasattr(self, 'imap') or self.imap == None: + if not hasattr(self, 'imap') or self.imap is None: self.connect() if hasattr(self.imap, 'separator'): @@ -357,13 +356,13 @@ class IMAP(object): if len(_namespaces) >= 3: _shared = [] - _shared.append(' '.join(_namespaces[2].replace('((','').replace('))','').split()[:-1]).replace('"', '')) + _shared.append(' '.join(_namespaces[2].replace('((', '').replace('))', '').split()[:-1]).replace('"', '')) if len(_namespaces) >= 2: - _other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '') + _other_users = ' '.join(_namespaces[1].replace('((', '').replace('))', '').split()[:-1]).replace('"', '') if len(_namespaces) >= 1: - _personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '') + _personal = _namespaces[0].replace('((', '').replace('))', '').split()[0].replace('"', '') return (_personal, _other_users, _shared) @@ -385,7 +384,7 @@ class IMAP(object): 'write': 'lrswite', } - if short_rights.has_key(acl): + if acl in short_rights: acl = short_rights[acl] else: for char in acl: @@ -428,7 +427,7 @@ class IMAP(object): try: self.imap.sam(self.folder_utf7(folder), identifier, acl) - except Exception, errmsg: + except Exception as errmsg: log.error( _("Could not set ACL for %s on folder %s: %r") % ( identifier, @@ -544,10 +543,10 @@ class IMAP(object): if not hasattr(self, 'domain'): self.domain = None - if self.domain == None and len(mailbox_base_name.split('@')) > 1: + if self.domain is None and len(mailbox_base_name.split('@')) > 1: self.domain = mailbox_base_name.split('@')[1] - if not self.domain == None: + if not self.domain is None: if conf.has_option(self.domain, "autocreate_folders"): _additional_folders = conf.get_raw( self.domain, @@ -564,7 +563,7 @@ class IMAP(object): auth.disconnect() if len(domains.keys()) > 0: - if domains.has_key(self.domain): + if self.domain in domains: primary = domains[self.domain] if conf.has_option(primary, "autocreate_folders"): @@ -573,7 +572,7 @@ class IMAP(object): "autocreate_folders" ) - if _additional_folders == None: + if _additional_folders is None: if conf.has_option('kolab', "autocreate_folders"): _additional_folders = conf.get_raw( 'kolab', @@ -588,13 +587,13 @@ class IMAP(object): } ) - if not additional_folders == None: + if additional_folders is not None: self.user_mailbox_create_additional_folders( mailbox_base_name, additional_folders ) - if not self.domain == None: + if not self.domain is None: if conf.has_option(self.domain, "sieve_mgmt"): sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt') if utils.true_or_false(sieve_mgmt_enabled): @@ -634,7 +633,7 @@ class IMAP(object): self.login_plain(admin_login, admin_password, user) (personal, other, shared) = self.namespaces() success = True - except Exception, errmsg: + except Exception as errmsg: if time.time() - last_log > 5 and self.imap_murder(): log.debug(_("Waiting for the Cyrus murder to settle... %r") % (errmsg)) last_log = time.time() @@ -661,7 +660,7 @@ class IMAP(object): log.warning(_("Failed to create folder: %s") % (folder_name)) continue - if additional_folders[additional_folder].has_key("annotations"): + if "annotations" in additional_folders[additional_folder]: for annotation in additional_folders[additional_folder]["annotations"].keys(): self.set_metadata( folder_name, @@ -669,7 +668,7 @@ class IMAP(object): "%s" % (additional_folders[additional_folder]["annotations"][annotation]) ) - if additional_folders[additional_folder].has_key("acls"): + if "acls" in additional_folders[additional_folder]: for acl in additional_folders[additional_folder]["acls"].keys(): self.set_acl( folder_name, @@ -686,7 +685,7 @@ class IMAP(object): domain = None domain_suffix = "" - if not domain == None: + if domain is not None: if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): backend = conf.get(domain, 'imap_backend') @@ -702,10 +701,10 @@ class IMAP(object): # Subscribe only to personal folders (personal, other, shared) = self.namespaces() - if not other == None: + if other is not None: _tests.append(other) - if not shared == None: + if shared is not None: for _shared in shared: _tests.append(_shared) @@ -729,7 +728,7 @@ class IMAP(object): log.debug(_("Subscribing %s to folder %s") % (user, _folder), level=8) try: self.subscribe(_folder) - except Exception, errmsg: + except Exception as errmsg: log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg)) self.logout() @@ -749,16 +748,16 @@ class IMAP(object): domain_suffix ) - if additional_folders[additional_folder].has_key("quota"): + if "quota" in additional_folders[additional_folder]: try: self.imap.sq( folder_name, additional_folders[additional_folder]['quota'] ) - except Exception, errmsg: + except Exception as errmsg: log.error(_("Could not set quota on %s") % (additional_folder)) - if additional_folders[additional_folder].has_key("partition"): + if "partition" in additional_folders[additional_folder]: partition = additional_folders[additional_folder]["partition"] try: self.imap._rename(folder_name, folder_name, partition) @@ -815,7 +814,7 @@ class IMAP(object): """ self.connect() - folder = "user%s%s" %(self.get_separator(),mailbox_base_name) + folder = "user%s%s" %(self.get_separator(), mailbox_base_name) self.delete_mailfolder(folder) self.cleanup_acls(mailbox_base_name) @@ -837,23 +836,23 @@ class IMAP(object): def user_mailbox_rename(self, old_name, new_name, partition=None): self.connect() - old_name = "user%s%s" % (self.get_separator(),old_name) - new_name = "user%s%s" % (self.get_separator(),new_name) + old_name = "user%s%s" % (self.get_separator(), old_name) + new_name = "user%s%s" % (self.get_separator(), new_name) - if old_name == new_name and partition == None: + if old_name == new_name and partition is None: return if not self.has_folder(old_name): log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) - if not self.has_folder(new_name) or not partition == None: - log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name)) + if not self.has_folder(new_name) or not partition is None: + log.info(_("Renaming INBOX from %s to %s") % (old_name, new_name)) try: - self.imap.rename(old_name,new_name,partition) + self.imap.rename(old_name, new_name, partition) except: - log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name)) + log.error(_("Could not rename INBOX folder %s to %s") % (old_name, new_name)) else: - log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name)) + log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name, new_name)) def user_mailbox_server(self, mailbox): server = self.imap.find_mailfolder_server(mailbox.lower()).lower() @@ -865,7 +864,7 @@ class IMAP(object): Check if the environment has a folder named folder. """ folders = self.imap.lm(self.folder_utf7(folder)) - log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8) + log.debug(_("Looking for folder '%s', we found folders: %r") % (folder, [self.folder_utf8(x) for x in folders]), level=8) # Greater then one, this folder may have subfolders. if len(folders) > 0: return True @@ -924,7 +923,7 @@ class IMAP(object): if epoch > (int)(time.time()): log.debug( _("Setting ACL rights %s for subject %s on folder " + \ - "%s") % (rights,subject,folder), level=8) + "%s") % (rights, subject, folder), level=8) self.set_acl( folder, @@ -935,7 +934,7 @@ class IMAP(object): else: log.debug( _("Removing ACL rights %s for subject %s on folder " + \ - "%s") % (rights,subject,folder), level=8) + "%s") % (rights, subject, folder), level=8) self.set_acl( folder, @@ -959,7 +958,7 @@ class IMAP(object): def move_user_folders(self, users=[], domain=None): for user in users: if type(user) == dict: - if user.has_key('old_mail'): + if 'old_mail' in user: inbox = "user/%s" % (user['mail']) old_inbox = "user/%s" % (user['old_mail']) @@ -967,11 +966,11 @@ class IMAP(object): log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) if not self.has_folder(inbox): - log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) - self.imap.rename(old_inbox,inbox) + log.info(_("Renaming INBOX from %s to %s") % (old_inbox, inbox)) + self.imap.rename(old_inbox, inbox) self.inbox_folders.append(inbox) else: - log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) + log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox, inbox)) else: log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) else: @@ -1004,7 +1003,7 @@ class IMAP(object): default_quota = auth.domain_default_quota(primary_domain) - if default_quota == "" or default_quota == None: + if default_quota == "" or default_quota is None: default_quota = 0 if len(users) == 0: @@ -1014,19 +1013,19 @@ class IMAP(object): quota = None if type(user) == dict: - if user.has_key(_quota_attr): + if _quota_attr in user: if type(user[_quota_attr]) == list: quota = user[_quota_attr].pop(0) elif type(user[_quota_attr]) == str: quota = user[_quota_attr] else: _quota = auth.get_user_attribute(primary_domain, user, _quota_attr) - if _quota == None: + if _quota is None: quota = 0 else: quota = _quota - if not user.has_key(_inbox_folder_attr): + if _inbox_folder_attr not in user: continue else: if type(user[_inbox_folder_attr]) == list: @@ -1040,7 +1039,7 @@ class IMAP(object): folder = folder.lower() try: - (used,current_quota) = self.imap.lq(folder) + (used, current_quota) = self.imap.lq(folder) except: # TODO: Go in fact correct the quota. log.warning(_("Cannot get current IMAP quota for folder %s") % (folder)) @@ -1058,11 +1057,11 @@ class IMAP(object): log.debug(_("Quota for %s currently is %s") % (folder, current_quota), level=7) - if new_quota == None: + if new_quota is None: continue if not int(new_quota) == int(quota): - log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota))) + log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder, int(new_quota))) quota = int(new_quota) auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota) @@ -1086,7 +1085,7 @@ class IMAP(object): mailhost = None if type(user) == dict: - if user.has_key(_mailserver_attr): + if _mailserver_attr in user: if type(user[_mailserver_attr]) == list: _mailserver = user[_mailserver_attr].pop(0) elif type(user[_mailserver_attr]) == str: @@ -1094,7 +1093,7 @@ class IMAP(object): else: _mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr) - if not user.has_key(_inbox_folder_attr): + if _inbox_folder_attr not in user: continue else: if type(user[_inbox_folder_attr]) == list: @@ -1110,7 +1109,7 @@ class IMAP(object): _current_mailserver = self.imap.find_mailfolder_server(folder) - if not _mailserver == None: + if _mailserver is not None: # TODO: if not _current_mailserver == _mailserver: self.imap._xfer(folder, _current_mailserver, _mailserver) @@ -1138,7 +1137,7 @@ class IMAP(object): primary_domain, secondary_domains """ - if inbox_folders == None: + if inbox_folders is None: inbox_folders = [] folders = self.list_user_folders() @@ -1165,7 +1164,7 @@ class IMAP(object): mbox_parts = self.parse_mailfolder(mailfolder_path) - if mbox_parts == None: + if mbox_parts is None: # We got user identifier only log.error(_("Please don't give us just a user identifier")) return @@ -1204,7 +1203,7 @@ class IMAP(object): acceptable_domain_name_res = [] - if not primary_domain == None: + if primary_domain is not None: for domain in [ primary_domain ] + secondary_domains: acceptable_domain_name_res.append(domain_re % (domain)) @@ -1221,16 +1220,16 @@ class IMAP(object): #print "Acceptable indeed" #acceptable = True #if not acceptable: - #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1],folder,domain_name_re) + #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1], folder, domain_name_re) #if acceptable: - #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0],folder.split('@')[1]) + #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0], folder.split('@')[1]) - folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0],folder.split('@')[1]) + folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0], folder.split('@')[1]) else: folder_name = "%s" % (folder.split(self.get_separator())[1]) - if not folder_name == None: + if folder_name is not None: if not folder_name in folders: folders.append(folder_name) |