summaryrefslogtreecommitdiffstats
path: root/pykolab/imap/__init__.py
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2019-10-25 13:16:55 +0200
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2019-10-25 13:16:55 +0200
commit28d9e703ea31a070a05589a2bd82b79bc7e6bed4 (patch)
treee7967f619425b94109c6c046075025a6f98aea0f /pykolab/imap/__init__.py
parent334366d52a91de1309df5fc0ce89a57d9b23d5b1 (diff)
downloadpykolab-28d9e703ea31a070a05589a2bd82b79bc7e6bed4.tar.gz
More linting and syntax issues resolved
Diffstat (limited to 'pykolab/imap/__init__.py')
-rw-r--r--pykolab/imap/__init__.py187
1 files changed, 93 insertions, 94 deletions
diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py
index 205deaa..3fc72d3 100644
--- a/pykolab/imap/__init__.py
+++ b/pykolab/imap/__init__.py
@@ -54,20 +54,16 @@ class IMAP(object):
if len(aci_subject.split('@')) > 1:
lm_suffix = "@%s" % (aci_subject.split('@')[1])
- shared_folders = self.imap.lm(
- "shared/*%s" % (lm_suffix)
- )
+ shared_folders = self.imap.lm("shared/*%s" % (lm_suffix))
- user_folders = self.imap.lm(
- "user/*%s" % (lm_suffix)
- )
+ user_folders = self.imap.lm("user/*%s" % (lm_suffix))
log.debug(
- _("Cleaning up ACL entries referring to identifier %s") % (
- aci_subject
- ),
- level=5
- )
+ _("Cleaning up ACL entries referring to identifier %s") % (
+ aci_subject
+ ),
+ level=5
+ )
# For all folders (shared and user), ...
folders = user_folders + shared_folders
@@ -128,12 +124,12 @@ class IMAP(object):
# deployment.
backend = conf.get('kolab', 'imap_backend')
- if not domain == None:
+ if domain is not None:
self.domain = domain
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
backend = conf.get(domain, 'imap_backend')
- if uri == None:
+ if uri is None:
if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'):
uri = conf.get(domain, 'imap_uri')
else:
@@ -143,7 +139,7 @@ class IMAP(object):
hostname = None
port = None
- if uri == None:
+ if uri is None:
uri = conf.get(backend, 'uri')
result = urlparse(uri)
@@ -162,13 +158,13 @@ class IMAP(object):
scheme = uri.split(':')[0]
(hostname, port) = uri.split('/')[2].split(':')
- if not server == None:
+ if server is not None:
hostname = server
- if scheme == None or scheme == "":
+ if scheme is None or scheme == "":
scheme = 'imaps'
- if port == None:
+ if port is None:
if scheme == "imaps":
port = 993
elif scheme == "imap":
@@ -182,10 +178,10 @@ class IMAP(object):
admin_login = conf.get(backend, 'admin_login')
admin_password = conf.get(backend, 'admin_password')
- if admin_password == None or admin_password == '':
+ if admin_password is None or admin_password == '':
log.error(_("No administrator password is available."))
- if not self._imap.has_key(hostname):
+ if hostname not in self._imap:
if backend == 'cyrus-imap':
import cyrus
self._imap[hostname] = cyrus.Cyrus(uri)
@@ -216,19 +212,25 @@ class IMAP(object):
else:
if not login:
self.disconnect(hostname)
- self.connect(uri=uri,login=False)
- elif login and not hasattr(self._imap[hostname],'logged_in'):
+ self.connect(uri=uri, login=False)
+ elif login and not hasattr(self._imap[hostname], 'logged_in'):
self.disconnect(hostname)
self.connect(uri=uri)
else:
try:
if hasattr(self._imap[hostname], 'm'):
self._imap[hostname].m.noop()
- elif hasattr(self._imap[hostname], 'noop') and callable(self._imap[hostname].noop):
+ elif hasattr(self._imap[hostname], 'noop') \
+ and callable(self._imap[hostname].noop):
+
self._imap[hostname].noop()
- log.debug(_("Reusing existing IMAP server connection to %s") % (hostname), level=8)
- except:
+ log.debug(
+ _("Reusing existing IMAP server connection to %s") % (hostname),
+ level=8
+ )
+
+ except Exception:
log.debug(_("Reconnecting to IMAP server %s") % (hostname), level=8)
self.disconnect(hostname)
self.connect()
@@ -243,7 +245,7 @@ class IMAP(object):
self._set_socket_keepalive(self.imap.sock)
def disconnect(self, server=None):
- if server == None:
+ if server is None:
# No server specified, but make sure self.imap is None anyways
if hasattr(self, 'imap'):
del self.imap
@@ -253,33 +255,30 @@ class IMAP(object):
del self._imap[key]
else:
- if self._imap.has_key(server):
+ if server in self._imap:
del self._imap[server]
else:
- log.warning(_("Called imap.disconnect() on a server that we had no connection to."))
+ log.warning(
+ _("Called imap.disconnect() on a server that we had no connection to.")
+ )
def create_folder(self, folder_path, server=None, partition=None):
folder_path = self.folder_utf7(folder_path)
- if not server == None:
+ if server is not None:
self.connect(server=server)
try:
self._imap[server].cm(folder_path, partition=partition)
return True
- except:
- log.error(
- _("Could not create folder %r on server %r") % (
- folder_path,
- server
- )
- )
+ except Exception:
+ log.error(_("Could not create folder %r on server %r") % (folder_path, server))
else:
try:
self.imap.cm(folder_path, partition=partition)
return True
- except:
+ except Exception:
log.error(_("Could not create folder %r") % (folder_path))
return False
@@ -290,9 +289,9 @@ class IMAP(object):
if hasattr(self.imap.m, name):
return getattr(self.imap.m, name)
else:
- raise AttributeError, _("%r has no attribute %s") % (self,name)
+ raise AttributeError(_("%r has no attribute %s") % (self, name))
else:
- raise AttributeError, _("%r has no attribute %s") % (self,name)
+ raise AttributeError(_("%r has no attribute %s") % (self, name))
def folder_utf7(self, folder):
from pykolab import imap_utf7
@@ -313,13 +312,13 @@ class IMAP(object):
_metadata = self.imap.getannotation(self.folder_utf7(folder), '*')
- for (k,v) in _metadata.items():
+ for (k, v) in _metadata.items():
metadata[self.folder_utf8(k)] = v
return metadata
def get_separator(self):
- if not hasattr(self, 'imap') or self.imap == None:
+ if not hasattr(self, 'imap') or self.imap is None:
self.connect()
if hasattr(self.imap, 'separator'):
@@ -357,13 +356,13 @@ class IMAP(object):
if len(_namespaces) >= 3:
_shared = []
- _shared.append(' '.join(_namespaces[2].replace('((','').replace('))','').split()[:-1]).replace('"', ''))
+ _shared.append(' '.join(_namespaces[2].replace('((', '').replace('))', '').split()[:-1]).replace('"', ''))
if len(_namespaces) >= 2:
- _other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '')
+ _other_users = ' '.join(_namespaces[1].replace('((', '').replace('))', '').split()[:-1]).replace('"', '')
if len(_namespaces) >= 1:
- _personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '')
+ _personal = _namespaces[0].replace('((', '').replace('))', '').split()[0].replace('"', '')
return (_personal, _other_users, _shared)
@@ -385,7 +384,7 @@ class IMAP(object):
'write': 'lrswite',
}
- if short_rights.has_key(acl):
+ if acl in short_rights:
acl = short_rights[acl]
else:
for char in acl:
@@ -428,7 +427,7 @@ class IMAP(object):
try:
self.imap.sam(self.folder_utf7(folder), identifier, acl)
- except Exception, errmsg:
+ except Exception as errmsg:
log.error(
_("Could not set ACL for %s on folder %s: %r") % (
identifier,
@@ -544,10 +543,10 @@ class IMAP(object):
if not hasattr(self, 'domain'):
self.domain = None
- if self.domain == None and len(mailbox_base_name.split('@')) > 1:
+ if self.domain is None and len(mailbox_base_name.split('@')) > 1:
self.domain = mailbox_base_name.split('@')[1]
- if not self.domain == None:
+ if not self.domain is None:
if conf.has_option(self.domain, "autocreate_folders"):
_additional_folders = conf.get_raw(
self.domain,
@@ -564,7 +563,7 @@ class IMAP(object):
auth.disconnect()
if len(domains.keys()) > 0:
- if domains.has_key(self.domain):
+ if self.domain in domains:
primary = domains[self.domain]
if conf.has_option(primary, "autocreate_folders"):
@@ -573,7 +572,7 @@ class IMAP(object):
"autocreate_folders"
)
- if _additional_folders == None:
+ if _additional_folders is None:
if conf.has_option('kolab', "autocreate_folders"):
_additional_folders = conf.get_raw(
'kolab',
@@ -588,13 +587,13 @@ class IMAP(object):
}
)
- if not additional_folders == None:
+ if additional_folders is not None:
self.user_mailbox_create_additional_folders(
mailbox_base_name,
additional_folders
)
- if not self.domain == None:
+ if not self.domain is None:
if conf.has_option(self.domain, "sieve_mgmt"):
sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt')
if utils.true_or_false(sieve_mgmt_enabled):
@@ -634,7 +633,7 @@ class IMAP(object):
self.login_plain(admin_login, admin_password, user)
(personal, other, shared) = self.namespaces()
success = True
- except Exception, errmsg:
+ except Exception as errmsg:
if time.time() - last_log > 5 and self.imap_murder():
log.debug(_("Waiting for the Cyrus murder to settle... %r") % (errmsg))
last_log = time.time()
@@ -661,7 +660,7 @@ class IMAP(object):
log.warning(_("Failed to create folder: %s") % (folder_name))
continue
- if additional_folders[additional_folder].has_key("annotations"):
+ if "annotations" in additional_folders[additional_folder]:
for annotation in additional_folders[additional_folder]["annotations"].keys():
self.set_metadata(
folder_name,
@@ -669,7 +668,7 @@ class IMAP(object):
"%s" % (additional_folders[additional_folder]["annotations"][annotation])
)
- if additional_folders[additional_folder].has_key("acls"):
+ if "acls" in additional_folders[additional_folder]:
for acl in additional_folders[additional_folder]["acls"].keys():
self.set_acl(
folder_name,
@@ -686,7 +685,7 @@ class IMAP(object):
domain = None
domain_suffix = ""
- if not domain == None:
+ if domain is not None:
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
backend = conf.get(domain, 'imap_backend')
@@ -702,10 +701,10 @@ class IMAP(object):
# Subscribe only to personal folders
(personal, other, shared) = self.namespaces()
- if not other == None:
+ if other is not None:
_tests.append(other)
- if not shared == None:
+ if shared is not None:
for _shared in shared:
_tests.append(_shared)
@@ -729,7 +728,7 @@ class IMAP(object):
log.debug(_("Subscribing %s to folder %s") % (user, _folder), level=8)
try:
self.subscribe(_folder)
- except Exception, errmsg:
+ except Exception as errmsg:
log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg))
self.logout()
@@ -749,16 +748,16 @@ class IMAP(object):
domain_suffix
)
- if additional_folders[additional_folder].has_key("quota"):
+ if "quota" in additional_folders[additional_folder]:
try:
self.imap.sq(
folder_name,
additional_folders[additional_folder]['quota']
)
- except Exception, errmsg:
+ except Exception as errmsg:
log.error(_("Could not set quota on %s") % (additional_folder))
- if additional_folders[additional_folder].has_key("partition"):
+ if "partition" in additional_folders[additional_folder]:
partition = additional_folders[additional_folder]["partition"]
try:
self.imap._rename(folder_name, folder_name, partition)
@@ -815,7 +814,7 @@ class IMAP(object):
"""
self.connect()
- folder = "user%s%s" %(self.get_separator(),mailbox_base_name)
+ folder = "user%s%s" %(self.get_separator(), mailbox_base_name)
self.delete_mailfolder(folder)
self.cleanup_acls(mailbox_base_name)
@@ -837,23 +836,23 @@ class IMAP(object):
def user_mailbox_rename(self, old_name, new_name, partition=None):
self.connect()
- old_name = "user%s%s" % (self.get_separator(),old_name)
- new_name = "user%s%s" % (self.get_separator(),new_name)
+ old_name = "user%s%s" % (self.get_separator(), old_name)
+ new_name = "user%s%s" % (self.get_separator(), new_name)
- if old_name == new_name and partition == None:
+ if old_name == new_name and partition is None:
return
if not self.has_folder(old_name):
log.error(_("INBOX folder to rename (%s) does not exist") % (old_name))
- if not self.has_folder(new_name) or not partition == None:
- log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name))
+ if not self.has_folder(new_name) or not partition is None:
+ log.info(_("Renaming INBOX from %s to %s") % (old_name, new_name))
try:
- self.imap.rename(old_name,new_name,partition)
+ self.imap.rename(old_name, new_name, partition)
except:
- log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name))
+ log.error(_("Could not rename INBOX folder %s to %s") % (old_name, new_name))
else:
- log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name))
+ log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name, new_name))
def user_mailbox_server(self, mailbox):
server = self.imap.find_mailfolder_server(mailbox.lower()).lower()
@@ -865,7 +864,7 @@ class IMAP(object):
Check if the environment has a folder named folder.
"""
folders = self.imap.lm(self.folder_utf7(folder))
- log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8)
+ log.debug(_("Looking for folder '%s', we found folders: %r") % (folder, [self.folder_utf8(x) for x in folders]), level=8)
# Greater then one, this folder may have subfolders.
if len(folders) > 0:
return True
@@ -924,7 +923,7 @@ class IMAP(object):
if epoch > (int)(time.time()):
log.debug(
_("Setting ACL rights %s for subject %s on folder " + \
- "%s") % (rights,subject,folder), level=8)
+ "%s") % (rights, subject, folder), level=8)
self.set_acl(
folder,
@@ -935,7 +934,7 @@ class IMAP(object):
else:
log.debug(
_("Removing ACL rights %s for subject %s on folder " + \
- "%s") % (rights,subject,folder), level=8)
+ "%s") % (rights, subject, folder), level=8)
self.set_acl(
folder,
@@ -959,7 +958,7 @@ class IMAP(object):
def move_user_folders(self, users=[], domain=None):
for user in users:
if type(user) == dict:
- if user.has_key('old_mail'):
+ if 'old_mail' in user:
inbox = "user/%s" % (user['mail'])
old_inbox = "user/%s" % (user['old_mail'])
@@ -967,11 +966,11 @@ class IMAP(object):
log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8)
if not self.has_folder(inbox):
- log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox))
- self.imap.rename(old_inbox,inbox)
+ log.info(_("Renaming INBOX from %s to %s") % (old_inbox, inbox))
+ self.imap.rename(old_inbox, inbox)
self.inbox_folders.append(inbox)
else:
- log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox))
+ log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox, inbox))
else:
log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8)
else:
@@ -1004,7 +1003,7 @@ class IMAP(object):
default_quota = auth.domain_default_quota(primary_domain)
- if default_quota == "" or default_quota == None:
+ if default_quota == "" or default_quota is None:
default_quota = 0
if len(users) == 0:
@@ -1014,19 +1013,19 @@ class IMAP(object):
quota = None
if type(user) == dict:
- if user.has_key(_quota_attr):
+ if _quota_attr in user:
if type(user[_quota_attr]) == list:
quota = user[_quota_attr].pop(0)
elif type(user[_quota_attr]) == str:
quota = user[_quota_attr]
else:
_quota = auth.get_user_attribute(primary_domain, user, _quota_attr)
- if _quota == None:
+ if _quota is None:
quota = 0
else:
quota = _quota
- if not user.has_key(_inbox_folder_attr):
+ if _inbox_folder_attr not in user:
continue
else:
if type(user[_inbox_folder_attr]) == list:
@@ -1040,7 +1039,7 @@ class IMAP(object):
folder = folder.lower()
try:
- (used,current_quota) = self.imap.lq(folder)
+ (used, current_quota) = self.imap.lq(folder)
except:
# TODO: Go in fact correct the quota.
log.warning(_("Cannot get current IMAP quota for folder %s") % (folder))
@@ -1058,11 +1057,11 @@ class IMAP(object):
log.debug(_("Quota for %s currently is %s") % (folder, current_quota), level=7)
- if new_quota == None:
+ if new_quota is None:
continue
if not int(new_quota) == int(quota):
- log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota)))
+ log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder, int(new_quota)))
quota = int(new_quota)
auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota)
@@ -1086,7 +1085,7 @@ class IMAP(object):
mailhost = None
if type(user) == dict:
- if user.has_key(_mailserver_attr):
+ if _mailserver_attr in user:
if type(user[_mailserver_attr]) == list:
_mailserver = user[_mailserver_attr].pop(0)
elif type(user[_mailserver_attr]) == str:
@@ -1094,7 +1093,7 @@ class IMAP(object):
else:
_mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr)
- if not user.has_key(_inbox_folder_attr):
+ if _inbox_folder_attr not in user:
continue
else:
if type(user[_inbox_folder_attr]) == list:
@@ -1110,7 +1109,7 @@ class IMAP(object):
_current_mailserver = self.imap.find_mailfolder_server(folder)
- if not _mailserver == None:
+ if _mailserver is not None:
# TODO:
if not _current_mailserver == _mailserver:
self.imap._xfer(folder, _current_mailserver, _mailserver)
@@ -1138,7 +1137,7 @@ class IMAP(object):
primary_domain, secondary_domains
"""
- if inbox_folders == None:
+ if inbox_folders is None:
inbox_folders = []
folders = self.list_user_folders()
@@ -1165,7 +1164,7 @@ class IMAP(object):
mbox_parts = self.parse_mailfolder(mailfolder_path)
- if mbox_parts == None:
+ if mbox_parts is None:
# We got user identifier only
log.error(_("Please don't give us just a user identifier"))
return
@@ -1204,7 +1203,7 @@ class IMAP(object):
acceptable_domain_name_res = []
- if not primary_domain == None:
+ if primary_domain is not None:
for domain in [ primary_domain ] + secondary_domains:
acceptable_domain_name_res.append(domain_re % (domain))
@@ -1221,16 +1220,16 @@ class IMAP(object):
#print "Acceptable indeed"
#acceptable = True
#if not acceptable:
- #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1],folder,domain_name_re)
+ #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1], folder, domain_name_re)
#if acceptable:
- #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0],folder.split('@')[1])
+ #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0], folder.split('@')[1])
- folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0],folder.split('@')[1])
+ folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0], folder.split('@')[1])
else:
folder_name = "%s" % (folder.split(self.get_separator())[1])
- if not folder_name == None:
+ if folder_name is not None:
if not folder_name in folders:
folders.append(folder_name)