diff options
author | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2011-07-29 13:00:51 -0400 |
---|---|---|
committer | Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com> | 2011-07-29 13:00:51 -0400 |
commit | e4c7a4bd74b60876286b74bc1834425a4b583bf5 (patch) | |
tree | e920163fdc6a323bd59e97a97f4107b06db494fb /bin | |
parent | ebd1d9e9042a6a90e1ded684870b2ba9e667dc14 (diff) | |
download | pykolab-e4c7a4bd74b60876286b74bc1834425a4b583bf5.tar.gz |
Enhance kolab_smtp_access_policy by providing it with sqlalchemy-based SQL caching
Diffstat (limited to 'bin')
-rwxr-xr-x[-rw-r--r--] | bin/kolab_smtp_access_policy.py | 292 |
1 files changed, 180 insertions, 112 deletions
diff --git a/bin/kolab_smtp_access_policy.py b/bin/kolab_smtp_access_policy.py index c814e59..23e3324 100644..100755 --- a/bin/kolab_smtp_access_policy.py +++ b/bin/kolab_smtp_access_policy.py @@ -25,6 +25,27 @@ import time from optparse import OptionParser from ConfigParser import SafeConfigParser +cache = None + +try: + import sqlalchemy + from sqlalchemy import Boolean + from sqlalchemy import Column + from sqlalchemy import DateTime + from sqlalchemy import Integer + from sqlalchemy import MetaData + from sqlalchemy import String + from sqlalchemy import Table + + from sqlalchemy import create_engine + from sqlalchemy.orm import mapper + from sqlalchemy.orm import sessionmaker + from sqlalchemy.schema import Index + from sqlalchemy.schema import UniqueConstraint + cache = True +except: + cache = False + sys.path.append('..') sys.path.append('../..') @@ -46,88 +67,115 @@ conf = pykolab.getConf() auth = Auth() # -# Caching routines using buzhug. +# Caching routines using MySQL-python. # # If creating the cache fails, we continue without any caching, significantly # increasing the load on LDAP. # - -# TODO: This should be a configuration item. cache_expire = 3600 - try: - from buzhug import TS_Base - - if os.access( - os.path.join(KOLAB_LIB_PATH, 'kolab_smtp_access_policy'), - os.W_OK - ): - cache_path = os.path.join( - KOLAB_LIB_PATH, - 'kolab_smtp_access_policy', - 'cache' - ) + metadata = MetaData() +except: + cache = False - elif os.access('/tmp/', os.W_OK): - cache_path = os.path.join( - '/tmp/', - 'kolab_smtp_access_policy' +if cache: + session = None + policy_result_table = Table( + 'policy_result', metadata, + Column('id', Integer, primary_key=True), + Column('key', String(16), nullable=False), + Column('value', Boolean, nullable=False), + Column('sender', String(64), nullable=False), + Column('recipient', String(64), nullable=False), + Column('sasl_username', String(64)), + Column('sasl_sender', String(64)), + Column('created', Integer, nullable=False), + #Index('key', 'sender', 'recipient', + #'sasl_username', 'sasl_sender', unique=True) + #UniqueConstraint('key','sender','recipient','sasl_username','sasl_sender', name='fsrss') ) - else: - raise OSError, _("No writeable path for cache found, bailing out") - if os.path.exists(cache_path): - mode = "open" + Index('fsrss', policy_result_table.c.key, policy_result_table.c.sender, policy_result_table.c.recipient, + policy_result_table.c.sasl_username, policy_result_table.c.sasl_sender, unique=True) + +class PolicyResult(object): + def __init__(self, key=None, value=None, sender=None, recipient=None, sasl_username=None, sasl_sender=None): + self.key = key + self.value = value + self.sender = sender + self.sasl_username = sasl_username + self.sasl_sender = sasl_sender + self.recipient = recipient + self.created = (int)(time.time()) + +if cache: + mapper(PolicyResult, policy_result_table) + +def cache_cleanup(): + if not cache == True: + return + + log.debug(_("Cleaning up the cache"), level=8) + session.query( + PolicyResult + ).filter( + PolicyResult.created < ((int)(time.time()) - cache_expire) + ).delete() + +def cache_init(): + global cache, cache_expire, session + + if not cache == True: + return + + if conf.has_section('kolab_smtp_access_policy'): + if conf.has_option('kolab_smtp_access_policy', 'uri'): + cache_uri = conf.get('kolab_smtp_access_policy', 'uri') + cache = True + if conf.has_option('kolab_smtp_access_policy', 'retention'): + cache_expire = (int)(conf.get('kolab_smtp_access_policy', 'retention')) + else: + return False else: - mode = "override" - - cache = TS_Base(cache_path) - try: - log.debug(_("Attempting to use cache in %s") %(cache_path), level=8) - cache.create( - ('sender', str), - ('recipient', str), - ('sasl_username', str), - ('sasl_sender', str), - ('function', str), - ('result', int), - ('expire', float), - mode=mode - ) - - except: - log.debug(_("Using cache in %s failed") %(cache_path), level=8) - try: - log.debug( - _("Attempting to create cache in %s") %(cache_path), - level=8 - ) - - cache.create( - ('sender', str), - ('recipient', str), - ('sasl_username', str), - ('sasl_sender', str), - ('function', str), - ('result', int), - ('expire', float), - mode="override" - ) + return False - except: - log.error(_("Kolab SMTP Access Policy Cache not writeable!")) - cache = False + if cache: + engine = create_engine(cache_uri, echo=True) - log.info(_("Kolab SMTP Access Policy Cache not enabled")) - cache = False - -except ImportError: - log.warning(_("Could not import caching library, caching disabled")) - cache = False - -except OSError, e: - log.warning(_("%s, caching disabled") %(e)) - cache = False + try: + metadata.create_all(engine) + except sqlalchemy.exc.OperationalError, e: + log.error(_("Operational Error in caching: %s" %(e))) + return False + + Session = sessionmaker(bind=engine) + session = Session() + + return cache + +def cache_select(sender, recipient, function, sasl_username='',sasl_sender=''): + if not cache == True: + return None + + return session.query( + PolicyResult + ).filter_by( + key=function, + sender=sender, + recipient=recipient, + sasl_username=sasl_username, + sasl_sender=sasl_sender + ).filter( + PolicyResult.created >= ((int)(time.time()) - cache_expire) + ).first() + +def cache_insert(sender, recipient, function, result, sasl_username='',sasl_sender=''): + if not cache == True: + return [] + + log.debug(_("Caching the policy result with timestamp %d") %((int)(time.time())), level=8) + session.add(PolicyResult(key=function,value=result,sender=sender,recipient=recipient,sasl_username=sasl_username,sasl_sender=sasl_sender)) + session.commit() def defer_if_permit(message, policy_request=None): log.info(_("Returning action DEFER_IF_PERMIT: %s") %(message)) @@ -154,7 +202,7 @@ def parse_address(email_address): Parse an address; Strip off anything after a recipient delimiter. """ - # TODO: Recipient delimiter is configurable! + # TODO: Recipient delimiter is configurable. if len(email_address.split("+")) > 1: # Take the first part split by recipient delimiter and the last part # split by '@'. @@ -168,6 +216,12 @@ def parse_address(email_address): return email_address def parse_policy(sender, recipient, policy): + + # TODO: A future feature is to allow special values to be expanded + #special_rule_values = { + # '$mydomains': 'expand_mydomains' + # ] + rules = { 'allow': [], 'deny': [] } for rule in policy: @@ -180,6 +234,18 @@ def parse_policy(sender, recipient, policy): allowed = False for rule in rules['allow']: + # TODO: Example implementation of getting the special values to expand. + # Note that the append works to extend the for loop. + #if rule in special_rule_values: + ## TODO: Expand the special rule value and do something + ## intelligent. + #if rule == '$mydomains': + #mydomains = auth.list_domains() + #for mydomain in mydomains: + #rules['allow'].append(mydomain) + + #continue + deny_override = False if recipient.endswith(rule): #print "Matched allow rule:", rule @@ -226,7 +292,7 @@ def read_request_input(): end_of_request = True else: policy_request[request_line.split('=')[0]] = \ - '='.join(request_line.split('=')[1:]) + '='.join(request_line.split('=')[1:]).lower() return policy_request @@ -287,7 +353,7 @@ def verify_delegate(policy_request, sender_domain, sender_user): # Got a final answer here, do the cachking thing. if not cache == False: - result_set = cache.select( + result = cache_select( sender=policy_request['sender'], recipient=policy_request['recipient'], sasl_username=policy_request['sasl_username'], @@ -295,15 +361,14 @@ def verify_delegate(policy_request, sender_domain, sender_user): function='verify_sender' ) - if len(result_set) < 1: - record_id = cache.insert( + if result == None: + record_id = cache_insert( sender=policy_request['sender'], recipient=policy_request['recipient'], + result=False, sasl_username=policy_request['sasl_username'], sasl_sender=policy_request['sasl_sender'], - function='verify_sender', - result=0, - expire=time.time() + cache_expire + function='verify_sender' ) sender_is_delegate = False @@ -379,7 +444,7 @@ def verify_recipient(policy_request): recipient_verified = False if not cache == False: - records = cache( + record = cache_select( sender=policy_request['sender'], recipient=policy_request['recipient'], sasl_username=policy_request['sasl_username'], @@ -387,19 +452,14 @@ def verify_recipient(policy_request): function='verify_recipient' ) - for record in records: - if record.expire < time.time(): - # Purge record - cache.delete(record) - cache.cleanup() - else: - log.info(_("Reproducing verify_recipient(%r) from cache, " + \ - "saving you queries, time and thus money.") %( - policy_request - ) - ) + if not record == None: + log.info(_("Reproducing verify_recipient(%r) from cache, " + \ + "saving you queries, time and thus money.") %( + policy_request + ) + ) - return record.result + return record.value # TODO: Under some conditions, the recipient may not be fully qualified. # We'll cross that bridge when we get there, though. @@ -422,10 +482,18 @@ def verify_recipient(policy_request): attr_search, parse_address(policy_request['recipient']), domain=domain, + # TODO: Get the filter from the configuration. additional_filter="(&(objectclass=kolabinetorgperson)%(search_filter)s)" ) } + # We have gotten an invalid recipient. We need to catch this case, because + # testing can input invalid recipients, and so can faulty applications, or + # misconfigured servers. + if not user['dn']: + reject(_("Invalid recipient")) + return False + recipient_policy = auth.get_user_attribute( domain, user, @@ -445,7 +513,7 @@ def verify_recipient(policy_request): ) if not cache == False: - result_set = cache.select( + result_set = cache_select( sender=policy_request['sender'], recipient=policy_request['recipient'], sasl_username=policy_request['sasl_username'], @@ -454,7 +522,7 @@ def verify_recipient(policy_request): ) if len(result_set) < 1: - record_id = cache.insert( + record_id = cache_insert( sender=policy_request['sender'], recipient=policy_request['recipient'], sasl_username=policy_request['sasl_username'], @@ -499,7 +567,7 @@ def verify_sender(policy_request): sasl_user = False if not cache == False: - records = cache( + record = cache_select( sender=policy_request['sender'], recipient=policy_request['recipient'], sasl_username=policy_request['sasl_username'], @@ -507,19 +575,14 @@ def verify_sender(policy_request): function='verify_sender' ) - for record in records: - if record.expire < time.time(): - # Purge record - cache.delete(record) - cache.cleanup() - else: - log.info(_("Reproducing verify_sender(%r) from cache, " + \ - "saving you queries, time and thus money.") %( - policy_request - ) - ) + if not record == None: + log.info(_("Reproducing verify_sender(%r) from cache, " + \ + "saving you queries, time and thus money.") %( + policy_request + ) + ) - return record.result + return record sender_domain = policy_request['sender'].split('@')[1] @@ -623,7 +686,7 @@ def verify_sender(policy_request): ) if not cache == False: - result_set = cache.select( + result = cache_select( sender=policy_request['sender'], recipient=policy_request['recipient'], sasl_username=policy_request['sasl_username'], @@ -631,15 +694,14 @@ def verify_sender(policy_request): function='verify_sender' ) - if len(result_set) < 1: - record_id = cache.insert( + if result == None: + record_id = cache_insert( sender=policy_request['sender'], recipient=policy_request['recipient'], sasl_username=policy_request['sasl_username'], sasl_sender=policy_request['sasl_sender'], function='verify_sender', - result=(int)(sender_verified), - expire=time.time() + cache_expire + result=sender_verified ) return sender_verified @@ -669,10 +731,13 @@ if __name__ == "__main__": conf.finalize_conf() + cache = cache_init() + # Start the work while True: policy_request = read_request_input() break + # Set the overall default policy in case nothing attracts any particular # type of action. # @@ -791,4 +856,7 @@ if __name__ == "__main__": reject(_("Recipient access denied"), policy_request) else: - permit(_("No objections"))
\ No newline at end of file + permit(_("No objections")) + + if cache: + cache_cleanup()
\ No newline at end of file |