diff options
author | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2012-04-19 17:00:20 +0100 |
---|---|---|
committer | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2012-04-19 17:00:20 +0100 |
commit | 77d3804884bd6fc13182ec26f8d2d447cda3be61 (patch) | |
tree | ce79d070bf9629d2067d253372342d893c9b7935 /pykolab/imap/__init__.py | |
parent | 1a2d4b809307316a59961811d3f252e0b50477fe (diff) | |
download | pykolab-77d3804884bd6fc13182ec26f8d2d447cda3be61.tar.gz |
Rebase on specification
Diffstat (limited to 'pykolab/imap/__init__.py')
-rw-r--r-- | pykolab/imap/__init__.py | 367 |
1 files changed, 217 insertions, 150 deletions
diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py index ee7e722..f544beb 100644 --- a/pykolab/imap/__init__.py +++ b/pykolab/imap/__init__.py @@ -31,8 +31,6 @@ from pykolab.translate import _ log = pykolab.getLogger('pykolab.imap') conf = pykolab.getConf() -auth = pykolab.auth - class IMAP(object): def __init__(self): # Pool of named IMAP connections, by hostname @@ -41,10 +39,50 @@ class IMAP(object): # Place holder for the current IMAP connection self.imap = None - self.users = [] - self.inbox_folders = [] + def cleanup_acls(self, aci_subject): + lm_suffix = "" + + log.info(_("Cleaning up ACL entries for %s across all folders") % (aci_subject)) + + if len(aci_subject.split('@')) > 1: + lm_suffix = "@%s" % (aci_subject.split('@')[1]) + + + shared_folders = self.imap.lm( + "shared/*%s" % (lm_suffix) + ) - def connect(self, uri=None, domain=None, login=True): + user_folders = self.imap.lm( + "user/*%s" % (lm_suffix) + ) + + log.debug( + _("Cleaning up ACL entries referring to identifier %s") % ( + aci_subject + ), + level=5 + ) + + # For all folders (shared and user), ... + folders = user_folders + shared_folders + + log.debug(_("Iterating over %d folders") % (len(folders)), level=5) + + # ... loop through them and ... + for folder in folders: + # ... list the ACL entries + acls = self.imap.lam(folder) + + # For each ACL entry, see if we think it is a current, valid entry + for acl_entry in acls.keys(): + # If the key 'acl_entry' does not exist in the dictionary of valid + # ACL entries, this ACL entry has got to go. + if acl_entry == aci_subject: + # Set the ACL to '' (effectively deleting the ACL entry) + log.debug(_("Removing acl %r for subject %r from folder %r") % (acls[acl_entry],acl_entry,folder), level=8) + self.imap.sam(folder, acl_entry, '') + + def connect(self, uri=None, server=None, domain=None, login=True): """ Connect to the appropriate IMAP backend. @@ -64,6 +102,7 @@ class IMAP(object): backend = conf.get('kolab', 'imap_backend') if not domain == None: + self.domain = domain if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): backend = conf.get(domain, 'imap_backend') @@ -85,6 +124,9 @@ class IMAP(object): scheme = uri.split(':')[0] (hostname, port) = uri.split('/')[2].split(':') + if not server == None: + hostname = server + if port == None: port = 993 @@ -155,117 +197,91 @@ class IMAP(object): log.warning(_("Called imap.disconnect() on a server that " + \ "we had no connection to")) + def create_folder(self, folder_path, server=None): + if not server == None: + if not self._imap.has_key(server): + self.connect(server=server) + + self._imap[server].cm(folder_path) + else: + self.imap.cm(folder_path) + def __getattr__(self, name): if hasattr(self.imap, name): return getattr(self.imap, name) else: raise AttributeError, _("%r has no attribute %s") % (self,name) - def has_folder(self, folder): - folders = self.imap.lm(folder) - log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,folders), level=8) - # Greater then one, this folder may have subfolders. - if len(folders) > 0: - return True - else: - return False - - def move_user_folders(self, users=[], domain=None): - self.connect(domain=domain) - - for user in users: - if type(user) == dict: - if user.has_key('old_mail'): - inbox = "user/%s" % (user['mail']) - old_inbox = "user/%s" % (user['old_mail']) - - if self.has_folder(old_inbox): - log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) - - if not self.has_folder(inbox): - log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) - self.imap.rename(old_inbox,inbox) - self.inbox_folders.append(inbox) - else: - log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) - else: - log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) - else: - log.debug(_("Value for user is not a dictionary"), level=8) - - def create_user_folders(self, users, primary_domain, secondary_domains): - self.connect(domain=primary_domain) + def shared_folder_create(self, folder_path, server=None): + """ + Create a shared folder. + """ - inbox_folders = [] + folder_name = "shared%s%s" % (self.imap.separator, folder_path) + log.info(_("Creating new shared folder %s") %(folder_path)) + self.create_folder(folder_name, server) - domain_section = auth.domain_section(primary_domain) + def shared_folder_exists(self, folder_path): + """ + Check if a shared mailbox exists. + """ + return self.has_folder('shared%s%s' % (self.imap.separator, folder_path)) - folders = self.list_user_folders(primary_domain, secondary_domains) + def shared_folder_set_type(self, folder_path, folder_type): + self.imap._setannotation('shared%s%s' % (self.imap.separator, folder_path), '/vendor/kolab/folder-type', folder_type) - # See if the folder belongs to any of the users - _match_attr = conf.get('cyrus-sasl', 'result_attribute') + def shared_mailbox_create(self, mailbox_base_name, server=None): + """ + Create a shared folder. + """ - if not users: - users = auth.list_users(primary_domain) + folder_name = "shared%s%s" % (self.imap.separator, mailbox_base_name) + log.info(_("Creating new shared folder %s") %(mailbox_base_name)) + self.create_folder(folder_name, server) - for user in users: - if type(user) == dict: - if user.has_key(_match_attr): - inbox_folders.append(user[_match_attr].lower()) - else: - # If the user passed on to this function does not have - # a key for _match_attr, then we have to bail out and - # continue - continue + def shared_mailbox_exists(self, mailbox_base_name): + """ + Check if a shared mailbox exists. + """ + return self.has_folder('shared%s%s' %(self.imap.separator, mailbox_base_name)) - elif type(user) == str: - inbox_folders.append(user.lower()) - - for folder in inbox_folders: - additional_folders = None - if not self.has_folder("user%s%s" % (self.imap.separator, folder)): - # TODO: Perhaps this block is moot - log.info(_("Creating new INBOX for user (%d): %s") - % (1,folder)) - try: - self.imap.cm("user%s%s" % (self.imap.separator, folder)) - except: - log.warning( - _("Mailbox already exists: user%s%s") % ( - self.imap.separator,folder - ) - ) + def user_mailbox_create(self, mailbox_base_name, server=None): + """ + Create a user mailbox. - continue + Returns the full path to the new mailbox folder. + """ + folder_name = "user%s%s" % (self.imap.separator, mailbox_base_name) + log.info(_("Creating new mailbox for user %s") %(mailbox_base_name)) - if conf.get('kolab', 'imap_backend') == 'cyrus-imap': - self.imap._setquota( - "user%s%s" % (self.imap.separator, folder), - 0 - ) + self.create_folder(folder_name, server) - if conf.has_option(domain_section, "autocreate_folders"): + if not self.domain == None: + if conf.has_option(self.domain, "autocreate_folders"): _additional_folders = conf.get_raw( - domain_section, + self.domain, "autocreate_folders" ) additional_folders = conf.plugins.exec_hook( "create_user_folders", kw={ - 'folder': folder, + 'folder': folder_name, 'additional_folders': _additional_folders } ) - if not additional_folders == None: - self.create_user_additional_folders(folder, additional_folders) + if not additional_folders == None: + self.user_mailbox_create_additional_folders(mailbox_base_name, additional_folders) - return inbox_folders + return folder_name + + def user_mailbox_create_additional_folders(self, folder, additional_folders): + log.debug(_("Creating additional folders for user %s") % (folder), level=8) - def create_user_additional_folders(self, folder, additional_folders): for additional_folder in additional_folders.keys(): _add_folder = {} + if len(folder.split('@')) > 1: folder_name = "user%(separator)s%(username)s%(separator)s%(additional_folder_name)s@%(domainname)s" _add_folder['username'] = folder.split('@')[0] @@ -283,7 +299,7 @@ class IMAP(object): try: self.imap.cm(folder_name) except: - log.warning(_("Mailbox already exists: user/%s") % (folder)) + log.warning(_("Mailbox already exists: %s") % (folder_name)) if additional_folders[additional_folder].has_key("annotations"): for annotation in additional_folders[additional_folder]["annotations"].keys(): @@ -312,8 +328,10 @@ class IMAP(object): if len(folder.split('@')) > 1: domain = folder.split('@')[1] + domain_suffix = "@%s" % (domain) else: domain = None + domain_suffix = "" if not domain == None: if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): @@ -324,6 +342,7 @@ class IMAP(object): else: uri = None + log.debug(_("Subscribing user to the additional folders"), level=8) # Get the credentials admin_login = conf.get(backend, 'admin_login') admin_password = conf.get(backend, 'admin_password') @@ -331,11 +350,122 @@ class IMAP(object): self.connect(login=False) self.login_plain(admin_login, admin_password, folder) - for _folder in self.lm(): + for _folder in self.lm("%s/*%s" % (folder_name.split('@')[0],domain_suffix)): self.subscribe(_folder) self.logout() + def user_mailbox_delete(self, mailbox_base_name): + """ + Delete a user mailbox. + """ + folder = "user%s%s" %(self.imap.separator,mailbox_base_name) + self.delete_mailfolder(folder) + self.cleanup_acls(mailbox_base_name) + + def user_mailbox_exists(self, mailbox_base_name): + """ + Check if a user mailbox exists. + """ + return self.has_folder('user%s%s' %(self.imap.separator, mailbox_base_name)) + + def user_mailbox_rename(self, old_name, new_name): + old_name = "user%s%s" % (self.imap.separator,old_name) + new_name = "user%s%s" % (self.imap.separator,new_name) + + if old_name == new_name: + return + + if not self.has_folder(new_name): + log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name)) + try: + self.imap.rename(old_name,new_name) + except: + log.error(_("Could not rename INBOX folder %s to %s") % (oldname,new_name)) + else: + log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name)) + + def user_mailbox_server(self, mailbox): + self.connect(domain=self.domain) + return self.imap.find_mailfolder_server(mailbox) + + def has_folder(self, folder): + """ + Check if the environment has a folder named folder. + """ + self.connect(domain=self.domain) + + folders = self.imap.lm(folder) + log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,folders), level=8) + # Greater then one, this folder may have subfolders. + if len(folders) > 0: + return True + else: + return False + + def _set_kolab_mailfolder_acls(self, acls): + if isinstance(acls, basestring): + acls = [ acls ] + + for acl in acls: + exec("acl = %s" % (acl)) + folder = acl[0] + subject = acl[1] + rights = acl[2] + if len(acl) == 4: + epoch = acl[3] + else: + epoch = (int)(time.time()) + 3600 + + if epoch > (int)(time.time()): + log.debug( + _("Setting ACL rights %s for subject %s on folder " + \ + "%s") % (rights,subject,folder), level=8) + + self.imap.sam( + folder, + "%s" % (subject), + "%s" % (rights) + ) + + else: + log.debug( + _("Removing ACL rights %s for subject %s on folder " + \ + "%s") % (rights,subject,folder), level=8) + + self.imap.sam( + folder, + "%s" % (subject), + "" + ) + + pass + + """ Blah functions """ + + def move_user_folders(self, users=[], domain=None): + self.connect(domain=domain) + + for user in users: + if type(user) == dict: + if user.has_key('old_mail'): + inbox = "user/%s" % (user['mail']) + old_inbox = "user/%s" % (user['old_mail']) + + if self.has_folder(old_inbox): + log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) + + if not self.has_folder(inbox): + log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) + self.imap.rename(old_inbox,inbox) + self.inbox_folders.append(inbox) + else: + log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) + else: + log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) + else: + log.debug(_("Value for user is not a dictionary"), level=8) + def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): """ @@ -530,69 +660,6 @@ class IMAP(object): self.imap.dm(mailfolder_path) - clean_acls = False - - section = False - - if mbox_parts['domain']: - if conf.has_option(mbox_parts['domain'], 'delete_clean_acls'): - section = mbox_parts['domain'] - elif conf.has_option('kolab', 'delete_clean_acls'): - section = 'kolab' - elif conf.has_option('kolab', 'delete_clean_acls'): - section = 'kolab' - - if not section == False: - clean_acls = conf.get(section, 'delete_clean_acls') - - if not clean_acls == False and not clean_acls == 0: - log.info(_("Cleaning up ACL entries across all folders")) - - if mbox_parts['domain']: - # List the shared and user folders - shared_folders = self.imap.lm( - "shared/*@%s" % (mbox_parts['domain']) - ) - - user_folders = self.imap.lm( - "user/*@%s" % (mbox_parts['domain']) - ) - - aci_identifier = "%s@%s" % ( - mbox_parts['path_parts'][1], - mbox_parts['domain'] - ) - - else: - shared_folders = self.imap.lm("shared/*") - user_folders = self.imap.lm("user/*") - aci_identifier = "%s" % (mbox_parts['path_parts'][1]) - - log.debug( - _("Cleaning up ACL entries referring to identifier %s") % ( - aci_identifier - ), - level=5 - ) - - # For all folders (shared and user), ... - folders = user_folders + shared_folders - - log.debug(_("Iterating over %d folders") % (len(folders)), level=5) - - # ... loop through them and ... - for folder in folders: - # ... list the ACL entries - acls = self.imap.lam(folder) - - # For each ACL entry, see if we think it is a current, valid entry - for acl_entry in acls.keys(): - # If the key 'acl_entry' does not exist in the dictionary of valid - # ACL entries, this ACL entry has got to go. - if acl_entry == aci_identifier: - # Set the ACL to '' (effectively deleting the ACL entry) - self.imap.sam(folder, acl_entry, '') - def list_user_folders(self, primary_domain=None, secondary_domains=[]): """ List the INBOX folders in the IMAP backend. Returns a list of unique |