summaryrefslogtreecommitdiffstats
path: root/pykolab/imap
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2012-04-19 17:00:20 +0100
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2012-04-19 17:00:20 +0100
commit77d3804884bd6fc13182ec26f8d2d447cda3be61 (patch)
treece79d070bf9629d2067d253372342d893c9b7935 /pykolab/imap
parent1a2d4b809307316a59961811d3f252e0b50477fe (diff)
downloadpykolab-77d3804884bd6fc13182ec26f8d2d447cda3be61.tar.gz
Rebase on specification
Diffstat (limited to 'pykolab/imap')
-rw-r--r--pykolab/imap/__init__.py367
-rw-r--r--pykolab/imap/cyrus.py7
2 files changed, 218 insertions, 156 deletions
diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py
index ee7e722..f544beb 100644
--- a/pykolab/imap/__init__.py
+++ b/pykolab/imap/__init__.py
@@ -31,8 +31,6 @@ from pykolab.translate import _
log = pykolab.getLogger('pykolab.imap')
conf = pykolab.getConf()
-auth = pykolab.auth
-
class IMAP(object):
def __init__(self):
# Pool of named IMAP connections, by hostname
@@ -41,10 +39,50 @@ class IMAP(object):
# Place holder for the current IMAP connection
self.imap = None
- self.users = []
- self.inbox_folders = []
+ def cleanup_acls(self, aci_subject):
+ lm_suffix = ""
+
+ log.info(_("Cleaning up ACL entries for %s across all folders") % (aci_subject))
+
+ if len(aci_subject.split('@')) > 1:
+ lm_suffix = "@%s" % (aci_subject.split('@')[1])
+
+
+ shared_folders = self.imap.lm(
+ "shared/*%s" % (lm_suffix)
+ )
- def connect(self, uri=None, domain=None, login=True):
+ user_folders = self.imap.lm(
+ "user/*%s" % (lm_suffix)
+ )
+
+ log.debug(
+ _("Cleaning up ACL entries referring to identifier %s") % (
+ aci_subject
+ ),
+ level=5
+ )
+
+ # For all folders (shared and user), ...
+ folders = user_folders + shared_folders
+
+ log.debug(_("Iterating over %d folders") % (len(folders)), level=5)
+
+ # ... loop through them and ...
+ for folder in folders:
+ # ... list the ACL entries
+ acls = self.imap.lam(folder)
+
+ # For each ACL entry, see if we think it is a current, valid entry
+ for acl_entry in acls.keys():
+ # If the key 'acl_entry' does not exist in the dictionary of valid
+ # ACL entries, this ACL entry has got to go.
+ if acl_entry == aci_subject:
+ # Set the ACL to '' (effectively deleting the ACL entry)
+ log.debug(_("Removing acl %r for subject %r from folder %r") % (acls[acl_entry],acl_entry,folder), level=8)
+ self.imap.sam(folder, acl_entry, '')
+
+ def connect(self, uri=None, server=None, domain=None, login=True):
"""
Connect to the appropriate IMAP backend.
@@ -64,6 +102,7 @@ class IMAP(object):
backend = conf.get('kolab', 'imap_backend')
if not domain == None:
+ self.domain = domain
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
backend = conf.get(domain, 'imap_backend')
@@ -85,6 +124,9 @@ class IMAP(object):
scheme = uri.split(':')[0]
(hostname, port) = uri.split('/')[2].split(':')
+ if not server == None:
+ hostname = server
+
if port == None:
port = 993
@@ -155,117 +197,91 @@ class IMAP(object):
log.warning(_("Called imap.disconnect() on a server that " + \
"we had no connection to"))
+ def create_folder(self, folder_path, server=None):
+ if not server == None:
+ if not self._imap.has_key(server):
+ self.connect(server=server)
+
+ self._imap[server].cm(folder_path)
+ else:
+ self.imap.cm(folder_path)
+
def __getattr__(self, name):
if hasattr(self.imap, name):
return getattr(self.imap, name)
else:
raise AttributeError, _("%r has no attribute %s") % (self,name)
- def has_folder(self, folder):
- folders = self.imap.lm(folder)
- log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,folders), level=8)
- # Greater then one, this folder may have subfolders.
- if len(folders) > 0:
- return True
- else:
- return False
-
- def move_user_folders(self, users=[], domain=None):
- self.connect(domain=domain)
-
- for user in users:
- if type(user) == dict:
- if user.has_key('old_mail'):
- inbox = "user/%s" % (user['mail'])
- old_inbox = "user/%s" % (user['old_mail'])
-
- if self.has_folder(old_inbox):
- log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8)
-
- if not self.has_folder(inbox):
- log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox))
- self.imap.rename(old_inbox,inbox)
- self.inbox_folders.append(inbox)
- else:
- log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox))
- else:
- log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8)
- else:
- log.debug(_("Value for user is not a dictionary"), level=8)
-
- def create_user_folders(self, users, primary_domain, secondary_domains):
- self.connect(domain=primary_domain)
+ def shared_folder_create(self, folder_path, server=None):
+ """
+ Create a shared folder.
+ """
- inbox_folders = []
+ folder_name = "shared%s%s" % (self.imap.separator, folder_path)
+ log.info(_("Creating new shared folder %s") %(folder_path))
+ self.create_folder(folder_name, server)
- domain_section = auth.domain_section(primary_domain)
+ def shared_folder_exists(self, folder_path):
+ """
+ Check if a shared mailbox exists.
+ """
+ return self.has_folder('shared%s%s' % (self.imap.separator, folder_path))
- folders = self.list_user_folders(primary_domain, secondary_domains)
+ def shared_folder_set_type(self, folder_path, folder_type):
+ self.imap._setannotation('shared%s%s' % (self.imap.separator, folder_path), '/vendor/kolab/folder-type', folder_type)
- # See if the folder belongs to any of the users
- _match_attr = conf.get('cyrus-sasl', 'result_attribute')
+ def shared_mailbox_create(self, mailbox_base_name, server=None):
+ """
+ Create a shared folder.
+ """
- if not users:
- users = auth.list_users(primary_domain)
+ folder_name = "shared%s%s" % (self.imap.separator, mailbox_base_name)
+ log.info(_("Creating new shared folder %s") %(mailbox_base_name))
+ self.create_folder(folder_name, server)
- for user in users:
- if type(user) == dict:
- if user.has_key(_match_attr):
- inbox_folders.append(user[_match_attr].lower())
- else:
- # If the user passed on to this function does not have
- # a key for _match_attr, then we have to bail out and
- # continue
- continue
+ def shared_mailbox_exists(self, mailbox_base_name):
+ """
+ Check if a shared mailbox exists.
+ """
+ return self.has_folder('shared%s%s' %(self.imap.separator, mailbox_base_name))
- elif type(user) == str:
- inbox_folders.append(user.lower())
-
- for folder in inbox_folders:
- additional_folders = None
- if not self.has_folder("user%s%s" % (self.imap.separator, folder)):
- # TODO: Perhaps this block is moot
- log.info(_("Creating new INBOX for user (%d): %s")
- % (1,folder))
- try:
- self.imap.cm("user%s%s" % (self.imap.separator, folder))
- except:
- log.warning(
- _("Mailbox already exists: user%s%s") % (
- self.imap.separator,folder
- )
- )
+ def user_mailbox_create(self, mailbox_base_name, server=None):
+ """
+ Create a user mailbox.
- continue
+ Returns the full path to the new mailbox folder.
+ """
+ folder_name = "user%s%s" % (self.imap.separator, mailbox_base_name)
+ log.info(_("Creating new mailbox for user %s") %(mailbox_base_name))
- if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
- self.imap._setquota(
- "user%s%s" % (self.imap.separator, folder),
- 0
- )
+ self.create_folder(folder_name, server)
- if conf.has_option(domain_section, "autocreate_folders"):
+ if not self.domain == None:
+ if conf.has_option(self.domain, "autocreate_folders"):
_additional_folders = conf.get_raw(
- domain_section,
+ self.domain,
"autocreate_folders"
)
additional_folders = conf.plugins.exec_hook(
"create_user_folders",
kw={
- 'folder': folder,
+ 'folder': folder_name,
'additional_folders': _additional_folders
}
)
- if not additional_folders == None:
- self.create_user_additional_folders(folder, additional_folders)
+ if not additional_folders == None:
+ self.user_mailbox_create_additional_folders(mailbox_base_name, additional_folders)
- return inbox_folders
+ return folder_name
+
+ def user_mailbox_create_additional_folders(self, folder, additional_folders):
+ log.debug(_("Creating additional folders for user %s") % (folder), level=8)
- def create_user_additional_folders(self, folder, additional_folders):
for additional_folder in additional_folders.keys():
_add_folder = {}
+
if len(folder.split('@')) > 1:
folder_name = "user%(separator)s%(username)s%(separator)s%(additional_folder_name)s@%(domainname)s"
_add_folder['username'] = folder.split('@')[0]
@@ -283,7 +299,7 @@ class IMAP(object):
try:
self.imap.cm(folder_name)
except:
- log.warning(_("Mailbox already exists: user/%s") % (folder))
+ log.warning(_("Mailbox already exists: %s") % (folder_name))
if additional_folders[additional_folder].has_key("annotations"):
for annotation in additional_folders[additional_folder]["annotations"].keys():
@@ -312,8 +328,10 @@ class IMAP(object):
if len(folder.split('@')) > 1:
domain = folder.split('@')[1]
+ domain_suffix = "@%s" % (domain)
else:
domain = None
+ domain_suffix = ""
if not domain == None:
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
@@ -324,6 +342,7 @@ class IMAP(object):
else:
uri = None
+ log.debug(_("Subscribing user to the additional folders"), level=8)
# Get the credentials
admin_login = conf.get(backend, 'admin_login')
admin_password = conf.get(backend, 'admin_password')
@@ -331,11 +350,122 @@ class IMAP(object):
self.connect(login=False)
self.login_plain(admin_login, admin_password, folder)
- for _folder in self.lm():
+ for _folder in self.lm("%s/*%s" % (folder_name.split('@')[0],domain_suffix)):
self.subscribe(_folder)
self.logout()
+ def user_mailbox_delete(self, mailbox_base_name):
+ """
+ Delete a user mailbox.
+ """
+ folder = "user%s%s" %(self.imap.separator,mailbox_base_name)
+ self.delete_mailfolder(folder)
+ self.cleanup_acls(mailbox_base_name)
+
+ def user_mailbox_exists(self, mailbox_base_name):
+ """
+ Check if a user mailbox exists.
+ """
+ return self.has_folder('user%s%s' %(self.imap.separator, mailbox_base_name))
+
+ def user_mailbox_rename(self, old_name, new_name):
+ old_name = "user%s%s" % (self.imap.separator,old_name)
+ new_name = "user%s%s" % (self.imap.separator,new_name)
+
+ if old_name == new_name:
+ return
+
+ if not self.has_folder(new_name):
+ log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name))
+ try:
+ self.imap.rename(old_name,new_name)
+ except:
+ log.error(_("Could not rename INBOX folder %s to %s") % (oldname,new_name))
+ else:
+ log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name))
+
+ def user_mailbox_server(self, mailbox):
+ self.connect(domain=self.domain)
+ return self.imap.find_mailfolder_server(mailbox)
+
+ def has_folder(self, folder):
+ """
+ Check if the environment has a folder named folder.
+ """
+ self.connect(domain=self.domain)
+
+ folders = self.imap.lm(folder)
+ log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,folders), level=8)
+ # Greater then one, this folder may have subfolders.
+ if len(folders) > 0:
+ return True
+ else:
+ return False
+
+ def _set_kolab_mailfolder_acls(self, acls):
+ if isinstance(acls, basestring):
+ acls = [ acls ]
+
+ for acl in acls:
+ exec("acl = %s" % (acl))
+ folder = acl[0]
+ subject = acl[1]
+ rights = acl[2]
+ if len(acl) == 4:
+ epoch = acl[3]
+ else:
+ epoch = (int)(time.time()) + 3600
+
+ if epoch > (int)(time.time()):
+ log.debug(
+ _("Setting ACL rights %s for subject %s on folder " + \
+ "%s") % (rights,subject,folder), level=8)
+
+ self.imap.sam(
+ folder,
+ "%s" % (subject),
+ "%s" % (rights)
+ )
+
+ else:
+ log.debug(
+ _("Removing ACL rights %s for subject %s on folder " + \
+ "%s") % (rights,subject,folder), level=8)
+
+ self.imap.sam(
+ folder,
+ "%s" % (subject),
+ ""
+ )
+
+ pass
+
+ """ Blah functions """
+
+ def move_user_folders(self, users=[], domain=None):
+ self.connect(domain=domain)
+
+ for user in users:
+ if type(user) == dict:
+ if user.has_key('old_mail'):
+ inbox = "user/%s" % (user['mail'])
+ old_inbox = "user/%s" % (user['old_mail'])
+
+ if self.has_folder(old_inbox):
+ log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8)
+
+ if not self.has_folder(inbox):
+ log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox))
+ self.imap.rename(old_inbox,inbox)
+ self.inbox_folders.append(inbox)
+ else:
+ log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox))
+ else:
+ log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8)
+ else:
+ log.debug(_("Value for user is not a dictionary"), level=8)
+
def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]):
"""
@@ -530,69 +660,6 @@ class IMAP(object):
self.imap.dm(mailfolder_path)
- clean_acls = False
-
- section = False
-
- if mbox_parts['domain']:
- if conf.has_option(mbox_parts['domain'], 'delete_clean_acls'):
- section = mbox_parts['domain']
- elif conf.has_option('kolab', 'delete_clean_acls'):
- section = 'kolab'
- elif conf.has_option('kolab', 'delete_clean_acls'):
- section = 'kolab'
-
- if not section == False:
- clean_acls = conf.get(section, 'delete_clean_acls')
-
- if not clean_acls == False and not clean_acls == 0:
- log.info(_("Cleaning up ACL entries across all folders"))
-
- if mbox_parts['domain']:
- # List the shared and user folders
- shared_folders = self.imap.lm(
- "shared/*@%s" % (mbox_parts['domain'])
- )
-
- user_folders = self.imap.lm(
- "user/*@%s" % (mbox_parts['domain'])
- )
-
- aci_identifier = "%s@%s" % (
- mbox_parts['path_parts'][1],
- mbox_parts['domain']
- )
-
- else:
- shared_folders = self.imap.lm("shared/*")
- user_folders = self.imap.lm("user/*")
- aci_identifier = "%s" % (mbox_parts['path_parts'][1])
-
- log.debug(
- _("Cleaning up ACL entries referring to identifier %s") % (
- aci_identifier
- ),
- level=5
- )
-
- # For all folders (shared and user), ...
- folders = user_folders + shared_folders
-
- log.debug(_("Iterating over %d folders") % (len(folders)), level=5)
-
- # ... loop through them and ...
- for folder in folders:
- # ... list the ACL entries
- acls = self.imap.lam(folder)
-
- # For each ACL entry, see if we think it is a current, valid entry
- for acl_entry in acls.keys():
- # If the key 'acl_entry' does not exist in the dictionary of valid
- # ACL entries, this ACL entry has got to go.
- if acl_entry == aci_identifier:
- # Set the ACL to '' (effectively deleting the ACL entry)
- self.imap.sam(folder, acl_entry, '')
-
def list_user_folders(self, primary_domain=None, secondary_domains=[]):
"""
List the INBOX folders in the IMAP backend. Returns a list of unique
diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py
index 1d5b9b4..1eae6d7 100644
--- a/pykolab/imap/cyrus.py
+++ b/pykolab/imap/cyrus.py
@@ -29,8 +29,6 @@ from pykolab.translate import _
log = pykolab.getLogger('pykolab.imap')
conf = pykolab.getConf()
-imap = pykolab.imap
-
class Cyrus(cyruslib.CYRUS):
"""
Abstraction class for some common actions to do exclusively in Cyrus.
@@ -100,6 +98,7 @@ class Cyrus(cyruslib.CYRUS):
cyruslib.CYRUS.login(self, *args, **kw)
self.separator = self.SEP
+ log.debug(_("Continuing with separator: %r") % (self.separator), level=8)
self.murder = False
for capability in self.m.capabilities:
@@ -113,10 +112,7 @@ class Cyrus(cyruslib.CYRUS):
def find_mailfolder_server(self, mailfolder):
annotations = {}
- #print "mailfolder:", mailfolder
-
_mailfolder = self.parse_mailfolder(mailfolder)
- #print "_mailfolder:", _mailfolder
prefix = _mailfolder['path_parts'].pop(0)
mbox = _mailfolder['path_parts'].pop(0)
@@ -198,7 +194,6 @@ class Cyrus(cyruslib.CYRUS):
Login to the actual backend server, then set annotation.
"""
server = self.find_mailfolder_server(mailfolder)
- imap.connect(self.uri.replace(self.server,server))
log.debug(_("Setting annotation %s on folder %s") % (annotation,mailfolder), level=8)