summaryrefslogtreecommitdiffstats
path: root/pykolab/auth/ldap/__init__.py
diff options
context:
space:
mode:
authorPaul Boddie <paul@boddie.org.uk>2013-12-05 17:11:38 +0100
committerPaul Boddie <paul@boddie.org.uk>2013-12-05 17:11:38 +0100
commit265063a09a2a77470c2cefd95b4667f86567126d (patch)
tree200362d1e44e2719daedaf8090763851c5462521 /pykolab/auth/ldap/__init__.py
parent156f4f98a26e7d54484e3306d941eae74f8ee2ed (diff)
downloadpykolab-265063a09a2a77470c2cefd95b4667f86567126d.tar.gz
Consolidated duplicate code regions and simplified control flow.
Diffstat (limited to 'pykolab/auth/ldap/__init__.py')
-rw-r--r--pykolab/auth/ldap/__init__.py498
1 files changed, 151 insertions, 347 deletions
diff --git a/pykolab/auth/ldap/__init__.py b/pykolab/auth/ldap/__init__.py
index 04e9982..952af86 100644
--- a/pykolab/auth/ldap/__init__.py
+++ b/pykolab/auth/ldap/__init__.py
@@ -16,13 +16,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
-import datetime
-import _ldap
import ldap
import ldap.async
import ldap.controls
import logging
-import time
import pykolab
import pykolab.base
@@ -374,60 +371,135 @@ class LDAP(pykolab.base.Base):
return utils.normalize(_entry_attrs)
- def find_recipient(self, address="*", exclude_entry_id=None):
+ def init_entry_attribute(self, entry, attrname):
+ """
+ Initialise the attribute with the given attrname in the entry if no
+ definition is already present.
"""
- Given an address string or list of addresses, find one or more valid
- recipients.
- Use this function only to detect whether an address is already in
- use by any entry in the tree.
+ if not entry.has_key(attrname):
+ entry[attrname] = self.get_entry_attribute(
+ entry['id'],
+ attrname
+ )
- Specify an additional entry_id to exclude to exclude matches against
- the current entry.
+ def init_folder_acl(self, entry):
+ """
+ Initialise the folder ACL entry using the given entry details.
"""
- self._bind()
+ folder_path = self.get_folder_path(entry)
+ folderacl_entry_attribute = self.config_get('sharedfolder_acl_entry_attribute')
- if exclude_entry_id is not None:
- __filter_prefix = "(&"
- __filter_suffix = "(!(%s=%s)))" % (
- self.config_get('unique_attribute'),
- exclude_entry_id
- )
+ if folderacl_entry_attribute is not None:
+ self.init_entry_attribute(entry, folderacl_entry_attribute)
+
+ if entry[folderacl_entry_attribute] is not None:
+ # Parse it before assigning it
+ entry['kolabmailfolderaclentry'] = []
+ if not isinstance(entry[folderacl_entry_attribute], list):
+ entry[folderacl_entry_attribute] = [ entry[folderacl_entry_attribute] ]
+
+ for acl_entry in entry[folderacl_entry_attribute]:
+ acl_access = acl_entry.split()[-1]
+ aci_subject = ' '.join(acl_entry.split()[:-1])
+
+ log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8)
+
+ access_lookup_dict = {
+ 'read': 'lrs',
+ 'post': 'p',
+ 'append': 'wip',
+ 'write': 'lrswite',
+ 'all': 'lrsedntxakcpiw'
+ }
+
+ if access_lookup_dict.has_key(acl_access):
+ acl_access = access_lookup_dict[acl_access]
+
+ log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8)
+
+ entry['kolabmailfolderaclentry'].append("(%r, %r, %r)" % (folder_path, aci_subject, acl_access))
+
+ self.init_entry_attribute(entry, 'kolabmailfolderaclentry')
+
+ def get_folder_path(self, entry):
+ """
+ Return the folder path indicated by the given entry or from the
+ configuration.
+ """
+ if entry.get('kolabtargetfolder'):
+ return entry['kolabtargetfolder']
else:
- __filter_prefix = ""
- __filter_suffix = ""
+ # TODO: What is *the* way to see if we need to create an @domain
+ # shared mailbox?
+ # TODO^2: self.domain, really? Presumes any mail attribute is
+ # set to the primary domain name space...
+ # TODO^3: Test if the cn is already something@domain
+ result_attribute = conf.get('cyrus-sasl', 'result_attribute')
+ if result_attribute in ['mail']:
+ return "%s@%s" % (entry['cn'], self.domain)
+ else:
+ return entry['cn']
+
+ def get_search_attributes(self):
+ """
+ Return search attributes and result attributes.
+ """
- kolab_filter = self._kolab_filter()
recipient_address_attrs = self.config_get_list("mail_attributes")
- result_attributes = []
+ result_attributes = recipient_address_attrs[:]
+ result_attributes.append(self.config_get('unique_attribute'))
- for recipient_address_attr in recipient_address_attrs:
- result_attributes.append(recipient_address_attr)
+ return recipient_address_attrs, result_attributes
- result_attributes.append(self.config_get('unique_attribute'))
+ def get_filter_for_addresses(self, attrs, prefix, suffix):
+ """
+ Return the filter string for addresses provided by the given
+ attributes, using the specified prefix and suffix.
+ """
_filter = "(|"
- for recipient_address_attr in recipient_address_attrs:
+ for attr in attrs:
if isinstance(address, basestring):
- _filter += "(%s=%s)" % (recipient_address_attr, address)
+ _filter += "(%s=%s)" % (attr, address)
else:
for _address in address:
- _filter += "(%s=%s)" % (recipient_address_attr, _address)
+ _filter += "(%s=%s)" % (attr, _address)
_filter += ")"
- _filter = "%s%s%s" % (__filter_prefix,_filter,__filter_suffix)
+ return "%s%s%s" % (prefix, _filter, suffix)
- log.debug(_("Finding recipient with filter %r") % (_filter), level=8)
+ def get_filter_prefix_and_suffix(self, exclude_entry_id):
+ """
+ Return a tuple containing a filter prefix and suffix according to
+ whether the entry identifier should be excluded from results.
+ """
+
+ if exclude_entry_id is not None:
+ return (
+ "(&",
+ "(!(%s=%s)))" % (
+ self.config_get('unique_attribute'),
+ exclude_entry_id
+ ))
+ else:
+ return "", ""
+
+ def perform_search(self, _filter, result_attributes, base_dn_property):
+ """
+ Using the given filter, result attributes and base dn property,
+ perform a search and return the results.
+ """
if len(_filter) <= 6:
return None
- config_base_dn = self.config_get('base_dn')
+ config_base_dn = self.config_get(base_dn_property)
ldap_base_dn = self._kolab_domain_root_dn(self.domain)
if ldap_base_dn is not None and ldap_base_dn != config_base_dn:
@@ -454,78 +526,54 @@ class LDAP(pykolab.base.Base):
return _entry_dns
- def find_resource(self, address="*", exclude_entry_id=None):
+ def find_recipient(self, address="*", exclude_entry_id=None):
"""
Given an address string or list of addresses, find one or more valid
- resources.
+ recipients.
- Specify an additional entry_id to exclude to exclude matches.
+ Use this function only to detect whether an address is already in
+ use by any entry in the tree.
+
+ Specify an additional entry_id to exclude to exclude matches against
+ the current entry.
"""
self._bind()
- if exclude_entry_id is not None:
- __filter_prefix = "(&"
- __filter_suffix = "(!(%s=%s)))" % (
- self.config_get('unique_attribute'),
- exclude_entry_id
- )
-
- else:
- __filter_prefix = ""
- __filter_suffix = ""
+ __filter_prefix, __filter_suffix = self.get_filter_prefix_and_suffix(exclude_entry_id)
- resource_filter = self.config_get('resource_filter')
- if resource_filter is not None:
- __filter_prefix = "(&%s" % resource_filter
- __filter_suffix = ")"
+ recipient_address_attrs, result_attributes = self.get_search_attributes()
- recipient_address_attrs = self.config_get_list("mail_attributes")
+ _filter = self.get_filter_for_addresses(recipient_address_attrs, __filter_prefix, __filter_suffix)
- result_attributes = recipient_address_attrs
- result_attributes.append(self.config_get('unique_attribute'))
-
- _filter = "(|"
-
- for recipient_address_attr in recipient_address_attrs:
- if isinstance(address, basestring):
- _filter += "(%s=%s)" % (recipient_address_attr, address)
- else:
- for _address in address:
- _filter += "(%s=%s)" % (recipient_address_attr, _address)
+ log.debug(_("Finding recipient with filter %r") % (_filter), level=8)
- _filter += ")"
+ return self.perform_search(_filter, result_attributes, 'base_dn')
- _filter = "%s%s%s" % (__filter_prefix,_filter,__filter_suffix)
+ def find_resource(self, address="*", exclude_entry_id=None):
+ """
+ Given an address string or list of addresses, find one or more valid
+ resources.
- log.debug(_("Finding resource with filter %r") % (_filter), level=8)
+ Specify an additional entry_id to exclude to exclude matches.
+ """
- if len(_filter) <= 6:
- return None
+ self._bind()
- config_base_dn = self.config_get('resource_base_dn')
- ldap_base_dn = self._kolab_domain_root_dn(self.domain)
+ __filter_prefix, __filter_suffix = self.get_filter_prefix_and_suffix(exclude_entry_id)
- if ldap_base_dn is not None and ldap_base_dn != config_base_dn:
- resource_base_dn = ldap_base_dn
- else:
- resource_base_dn = config_base_dn
+ resource_filter = self.config_get('resource_filter')
+ if resource_filter is not None:
+ __filter_prefix = "(&%s" % resource_filter
+ __filter_suffix = ")"
- _results = self.ldap.search_s(
- resource_base_dn,
- scope=ldap.SCOPE_SUBTREE,
- filterstr=_filter,
- attrlist=result_attributes,
- attrsonly=True
- )
+ recipient_address_attrs, result_attributes = self.get_search_attributes()
- _entry_dns = []
+ _filter = self.get_filter_for_addresses(recipient_address_attrs, __filter_prefix, __filter_suffix)
- for _result in _results:
- (_entry_id, _entry_attrs) = _result
- _entry_dns.append(_entry_id)
+ log.debug(_("Finding resource with filter %r") % (_filter), level=8)
- return _entry_dns
+ return self.perform_search(_filter, result_attributes, 'resource_base_dn')
def get_latest_sync_timestamp(self):
timestamp = cache.last_modify_timestamp(self.domain)
@@ -861,7 +909,7 @@ class LDAP(pykolab.base.Base):
log.debug(_("secondary_mail_addresses: %r") % (secondary_mail_addresses), level=8)
log.debug(_("entry[%s]: %r") % (secondary_mail_attribute,entry[secondary_mail_attribute]), level=8)
- if list(set(secondary_mail_addresses)) != list(set(entry[secondary_mail_attribute])):
+ if set(secondary_mail_addresses) != set(entry[secondary_mail_attribute]):
self.set_entry_attribute(
entry,
secondary_mail_attribute,
@@ -1129,116 +1177,43 @@ class LDAP(pykolab.base.Base):
foldertype_attribute = self.config_get('sharedfolder_type_attribute')
if foldertype_attribute is not None:
- if not entry.has_key(foldertype_attribute):
- entry[foldertype_attribute] = self.get_user_attribute(
- entry['id'],
- foldertype_attribute
- )
+ self.init_entry_attribute(entry, foldertype_attribute)
if entry[foldertype_attribute] is not None:
entry['kolabfoldertype'] = entry[foldertype_attribute]
- if not entry.has_key('kolabfoldertype'):
- entry['kolabfoldertype'] = self.get_entry_attribute(
- entry['id'],
- 'kolabfoldertype'
- )
+ self.init_entry_attribute(entry, 'kolabfoldertype')
# A delivery address is postuser+targetfolder
delivery_address_attribute = self.config_get('sharedfolder_delivery_address_attribute')
if delivery_address_attribute is not None:
- if not entry.has_key(delivery_address_attribute):
- entry[delivery_address_attribute] = self.get_entry_attribute(
- entry['id'],
- delivery_address_attribute
- )
+ self.init_entry_attribute(entry, delivery_address_attribute)
if entry[delivery_address_attribute] is not None:
if len(entry[delivery_address_attribute].split('+')) > 1:
entry['kolabtargetfolder'] = entry[delivery_address_attribute].split('+')[1]
- if not entry.has_key('kolabtargetfolder'):
- entry['kolabtargetfolder'] = self.get_entry_attribute(
- entry['id'],
- 'kolabtargetfolder'
- )
+ self.init_entry_attribute(entry, 'kolabtargetfolder')
- if entry.has_key('kolabtargetfolder') and \
- entry['kolabtargetfolder'] is not None:
+ self.init_folder_for_entry(entry)
- folder_path = entry['kolabtargetfolder']
- else:
- # TODO: What is *the* way to see if we need to create an @domain
- # shared mailbox?
- # TODO^2: self.domain, really? Presumes any mail attribute is
- # set to the primary domain name space...
- # TODO^3: Test if the cn is already something@domain
- result_attribute = conf.get('cyrus-sasl', 'result_attribute')
- if result_attribute in ['mail']:
- folder_path = "%s@%s" % (entry['cn'], self.domain)
- else:
- folder_path = entry['cn']
+ folder_path = self.get_folder_path(entry)
- folderacl_entry_attribute = self.config_get('sharedfolder_acl_entry_attribute')
-
- if folderacl_entry_attribute is not None:
- if not entry.has_key(folderacl_entry_attribute):
- entry[folderacl_entry_attribute] = self.get_entry_attribute(
- entry['id'],
- folderacl_entry_attribute
- )
-
- if entry[folderacl_entry_attribute] is not None:
- # Parse it before assigning it
- entry['kolabmailfolderaclentry'] = []
- if not isinstance(entry[folderacl_entry_attribute], list):
- entry[folderacl_entry_attribute] = [ entry[folderacl_entry_attribute] ]
-
- for acl_entry in entry[folderacl_entry_attribute]:
- acl_access = acl_entry.split()[-1]
- aci_subject = ' '.join(acl_entry.split()[:-1])
-
- log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8)
-
- access_lookup_dict = {
- 'read': 'lrs',
- 'post': 'p',
- 'append': 'wip',
- 'write': 'lrswite',
- 'all': 'lrsedntxakcpiw'
- }
-
- if access_lookup_dict.has_key(acl_access):
- acl_access = access_lookup_dict[acl_access]
-
- log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8)
-
- entry['kolabmailfolderaclentry'].append("(%r, %r, %r)" % (folder_path, aci_subject, acl_access))
-
- if not entry.has_key('kolabmailfolderaclentry'):
- entry['kolabmailfolderaclentry'] = self.get_entry_attribute(
- entry['id'],
- 'kolabmailfolderaclentry'
- )
+ self.init_folder_acl(entry)
if not self.imap.shared_folder_exists(folder_path):
self.imap.shared_folder_create(folder_path, server)
- if entry.has_key('kolabfoldertype') and \
- entry['kolabfoldertype'] is not None:
-
+ if entry.get('kolabfoldertype'):
self.imap.shared_folder_set_type(
folder_path,
entry['kolabfoldertype']
)
- if entry.has_key(delivery_address_attribute) and \
- entry[delivery_address_attribute] is not None:
+ if entry.get(delivery_address_attribute):
self.imap.set_acl(folder_path, 'anyone', 'p')
- if entry.has_key('kolabmailfolderaclentry') and \
- entry['kolabmailfolderaclentry'] is not None:
-
+ if entry.get('kolabmailfolderaclentry'):
self.imap._set_kolab_mailfolder_acls(
entry['kolabmailfolderaclentry']
)
@@ -1287,13 +1262,7 @@ class LDAP(pykolab.base.Base):
for key in rcpt_addrs:
entry[key] = rcpt_addrs[key]
- if not entry.has_key(result_attribute):
- return
-
- if entry[result_attribute] is None:
- return
-
- if entry[result_attribute] == '':
+ if not entry.get(result_attribute):
return
cache.get_entry(self.domain, entry)
@@ -1443,13 +1412,7 @@ class LDAP(pykolab.base.Base):
for key in entry_changes.keys():
entry[key] = entry_changes[key]
- if not entry.has_key(result_attribute):
- return
-
- if entry[result_attribute] is None:
- return
-
- if entry[result_attribute] == '':
+ if not entry.get(result_attribute):
return
# Now look at entry_changes and old_canon_attr, and see if they're
@@ -1477,137 +1440,8 @@ class LDAP(pykolab.base.Base):
def _change_modify_role(self, entry, change):
pass
- def _change_modify_sharedfolder(self, entry, change):
- """
- A shared folder was modified.
- """
- self.imap.connect(domain=self.domain)
-
- server = None
-
- # Get some configuration values
- mailserver_attribute = self.config_get('mailserver_attribute')
- if entry.has_key(mailserver_attribute):
- server = entry[mailserver_attribute]
-
- foldertype_attribute = self.config_get('sharedfolder_type_attribute')
- if foldertype_attribute is not None:
- if not entry.has_key(foldertype_attribute):
- entry[foldertype_attribute] = self.get_user_attribute(
- entry['id'],
- foldertype_attribute
- )
-
- if entry[foldertype_attribute] is not None:
- entry['kolabfoldertype'] = entry[foldertype_attribute]
-
- if not entry.has_key('kolabfoldertype'):
- entry['kolabfoldertype'] = self.get_entry_attribute(
- entry['id'],
- 'kolabfoldertype'
- )
-
- # A delivery address is postuser+targetfolder
- delivery_address_attribute = self.config_get('sharedfolder_delivery_address_attribute')
- if delivery_address_attribute is not None:
- if not entry.has_key(delivery_address_attribute):
- entry[delivery_address_attribute] = self.get_entry_attribute(
- entry['id'],
- delivery_address_attribute
- )
-
- if entry[delivery_address_attribute] is not None:
- if len(entry[delivery_address_attribute].split('+')) > 1:
- entry['kolabtargetfolder'] = entry[delivery_address_attribute].split('+')[1]
-
- if not entry.has_key('kolabtargetfolder'):
- entry['kolabtargetfolder'] = self.get_entry_attribute(
- entry['id'],
- 'kolabtargetfolder'
- )
-
- if entry.has_key('kolabtargetfolder') and \
- entry['kolabtargetfolder'] is not None:
-
- folder_path = entry['kolabtargetfolder']
- else:
- # TODO: What is *the* way to see if we need to create an @domain
- # shared mailbox?
- # TODO^2: self.domain, really? Presumes any mail attribute is
- # set to the primary domain name space...
- # TODO^3: Test if the cn is already something@domain
- result_attribute = conf.get('cyrus-sasl', 'result_attribute')
- if result_attribute in ['mail']:
- folder_path = "%s@%s" % (entry['cn'], self.domain)
- else:
- folder_path = entry['cn']
-
- folderacl_entry_attribute = self.config_get('sharedfolder_acl_entry_attribute')
-
- if folderacl_entry_attribute is not None:
- if not entry.has_key(folderacl_entry_attribute):
- entry[folderacl_entry_attribute] = self.get_entry_attribute(
- entry['id'],
- folderacl_entry_attribute
- )
-
- if entry[folderacl_entry_attribute] is not None:
- # Parse it before assigning it
- entry['kolabmailfolderaclentry'] = []
- if not isinstance(entry[folderacl_entry_attribute], list):
- entry[folderacl_entry_attribute] = [ entry[folderacl_entry_attribute] ]
-
- for acl_entry in entry[folderacl_entry_attribute]:
- acl_access = acl_entry.split()[-1]
- aci_subject = ' '.join(acl_entry.split()[:-1])
-
- log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8)
-
- access_lookup_dict = {
- 'read': 'lrs',
- 'post': 'p',
- 'append': 'wip',
- 'write': 'lrswite',
- 'all': 'lrsedntxakcpiw'
- }
-
- if access_lookup_dict.has_key(acl_access):
- acl_access = access_lookup_dict[acl_access]
-
- log.debug(_("Found a subject %r with access %r") % (aci_subject, acl_access), level=8)
-
- entry['kolabmailfolderaclentry'].append("(%r, %r, %r)" % (folder_path, aci_subject, acl_access))
-
- if not entry.has_key('kolabmailfolderaclentry'):
- entry['kolabmailfolderaclentry'] = self.get_entry_attribute(
- entry['id'],
- 'kolabmailfolderaclentry'
- )
-
- if not self.imap.shared_folder_exists(folder_path):
- self.imap.shared_folder_create(folder_path, server)
-
- if entry.has_key('kolabfoldertype') and \
- entry['kolabfoldertype'] is not None:
-
- self.imap.shared_folder_set_type(
- folder_path,
- entry['kolabfoldertype']
- )
-
- if entry.has_key(delivery_address_attribute) and \
- entry[delivery_address_attribute] is not None:
- self.imap.set_acl(folder_path, 'anyone', 'p')
-
- if entry.has_key('kolabmailfolderaclentry') and \
- entry['kolabmailfolderaclentry'] is not None:
-
- self.imap._set_kolab_mailfolder_acls(
- entry['kolabmailfolderaclentry']
- )
-
- #if server is None:
- #self.entry_set_attribute(mailserver_attribute, server)
+ # A shared folder was modified.
+ _change_modify_sharedfolder = _change_add_sharedfolder
def _change_modify_user(self, entry, change):
"""
@@ -1698,54 +1532,24 @@ class LDAP(pykolab.base.Base):
if entry.has_key(mailserver_attribute):
server = entry[mailserver_attribute]
- if not entry.has_key('kolabtargetfolder'):
- entry['kolabtargetfolder'] = self.get_entry_attribute(
- entry['id'],
- 'kolabtargetfolder'
- )
+ self.init_entry_attribute(entry, 'kolabtargetfolder')
- if not entry.has_key('kolabfoldertype'):
- entry['kolabfoldertype'] = self.get_entry_attribute(
- entry['id'],
- 'kolabfoldertype'
- )
+ self.init_entry_attribute(entry, 'kolabfoldertype')
- #if not entry.has_key('kolabmailfolderaclentry'):
- #entry['kolabmailfolderaclentry'] = self.get_entry_attribute(
- #entry['id'],
- #'kolabmailfolderaclentry'
- #)
+ #self.init_entry_attribute(entry, 'kolabmailfolderaclentry')
- if entry.has_key('kolabtargetfolder') and \
- entry['kolabtargetfolder'] is not None:
-
- folder_path = entry['kolabtargetfolder']
- else:
- # TODO: What is *the* way to see if we need to create an @domain
- # shared mailbox?
- # TODO^2: self.domain, really? Presumes any mail attribute is
- # set to the primary domain name space...
- # TODO^3: Test if the cn is already something@domain
- result_attribute = conf.get('cyrus-sasl', 'result_attribute')
- if result_attribute in ['mail']:
- folder_path = "%s@%s" % (entry['cn'], self.domain)
- else:
- folder_path = entry['cn']
+ folder_path = self.get_folder_path(entry)
if not self.imap.shared_folder_exists(folder_path):
self.imap.shared_folder_create(folder_path, server)
- if entry.has_key('kolabfoldertype') and \
- entry['kolabfoldertype'] is not None:
-
+ if entry.get('kolabfoldertype'):
self.imap.shared_folder_set_type(
folder_path,
entry['kolabfoldertype']
)
- #if entry.has_key('kolabmailfolderaclentry') and \
- #entry['kolabmailfolderaclentry'] is not None:
-
+ #if entry.get('kolabmailfolderaclentry'):
#self.imap._set_kolab_mailfolder_acls(
#entry['kolabmailfolderaclentry']
#)