summaryrefslogtreecommitdiffstats
path: root/bin
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2011-07-29 13:00:51 -0400
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2011-07-29 13:00:51 -0400
commite4c7a4bd74b60876286b74bc1834425a4b583bf5 (patch)
treee920163fdc6a323bd59e97a97f4107b06db494fb /bin
parentebd1d9e9042a6a90e1ded684870b2ba9e667dc14 (diff)
downloadpykolab-e4c7a4bd74b60876286b74bc1834425a4b583bf5.tar.gz
Enhance kolab_smtp_access_policy by providing it with sqlalchemy-based SQL caching
Diffstat (limited to 'bin')
-rwxr-xr-x[-rw-r--r--]bin/kolab_smtp_access_policy.py292
1 files changed, 180 insertions, 112 deletions
diff --git a/bin/kolab_smtp_access_policy.py b/bin/kolab_smtp_access_policy.py
index c814e59..23e3324 100644..100755
--- a/bin/kolab_smtp_access_policy.py
+++ b/bin/kolab_smtp_access_policy.py
@@ -25,6 +25,27 @@ import time
from optparse import OptionParser
from ConfigParser import SafeConfigParser
+cache = None
+
+try:
+ import sqlalchemy
+ from sqlalchemy import Boolean
+ from sqlalchemy import Column
+ from sqlalchemy import DateTime
+ from sqlalchemy import Integer
+ from sqlalchemy import MetaData
+ from sqlalchemy import String
+ from sqlalchemy import Table
+
+ from sqlalchemy import create_engine
+ from sqlalchemy.orm import mapper
+ from sqlalchemy.orm import sessionmaker
+ from sqlalchemy.schema import Index
+ from sqlalchemy.schema import UniqueConstraint
+ cache = True
+except:
+ cache = False
+
sys.path.append('..')
sys.path.append('../..')
@@ -46,88 +67,115 @@ conf = pykolab.getConf()
auth = Auth()
#
-# Caching routines using buzhug.
+# Caching routines using MySQL-python.
#
# If creating the cache fails, we continue without any caching, significantly
# increasing the load on LDAP.
#
-
-# TODO: This should be a configuration item.
cache_expire = 3600
-
try:
- from buzhug import TS_Base
-
- if os.access(
- os.path.join(KOLAB_LIB_PATH, 'kolab_smtp_access_policy'),
- os.W_OK
- ):
- cache_path = os.path.join(
- KOLAB_LIB_PATH,
- 'kolab_smtp_access_policy',
- 'cache'
- )
+ metadata = MetaData()
+except:
+ cache = False
- elif os.access('/tmp/', os.W_OK):
- cache_path = os.path.join(
- '/tmp/',
- 'kolab_smtp_access_policy'
+if cache:
+ session = None
+ policy_result_table = Table(
+ 'policy_result', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('key', String(16), nullable=False),
+ Column('value', Boolean, nullable=False),
+ Column('sender', String(64), nullable=False),
+ Column('recipient', String(64), nullable=False),
+ Column('sasl_username', String(64)),
+ Column('sasl_sender', String(64)),
+ Column('created', Integer, nullable=False),
+ #Index('key', 'sender', 'recipient',
+ #'sasl_username', 'sasl_sender', unique=True)
+ #UniqueConstraint('key','sender','recipient','sasl_username','sasl_sender', name='fsrss')
)
- else:
- raise OSError, _("No writeable path for cache found, bailing out")
- if os.path.exists(cache_path):
- mode = "open"
+ Index('fsrss', policy_result_table.c.key, policy_result_table.c.sender, policy_result_table.c.recipient,
+ policy_result_table.c.sasl_username, policy_result_table.c.sasl_sender, unique=True)
+
+class PolicyResult(object):
+ def __init__(self, key=None, value=None, sender=None, recipient=None, sasl_username=None, sasl_sender=None):
+ self.key = key
+ self.value = value
+ self.sender = sender
+ self.sasl_username = sasl_username
+ self.sasl_sender = sasl_sender
+ self.recipient = recipient
+ self.created = (int)(time.time())
+
+if cache:
+ mapper(PolicyResult, policy_result_table)
+
+def cache_cleanup():
+ if not cache == True:
+ return
+
+ log.debug(_("Cleaning up the cache"), level=8)
+ session.query(
+ PolicyResult
+ ).filter(
+ PolicyResult.created < ((int)(time.time()) - cache_expire)
+ ).delete()
+
+def cache_init():
+ global cache, cache_expire, session
+
+ if not cache == True:
+ return
+
+ if conf.has_section('kolab_smtp_access_policy'):
+ if conf.has_option('kolab_smtp_access_policy', 'uri'):
+ cache_uri = conf.get('kolab_smtp_access_policy', 'uri')
+ cache = True
+ if conf.has_option('kolab_smtp_access_policy', 'retention'):
+ cache_expire = (int)(conf.get('kolab_smtp_access_policy', 'retention'))
+ else:
+ return False
else:
- mode = "override"
-
- cache = TS_Base(cache_path)
- try:
- log.debug(_("Attempting to use cache in %s") %(cache_path), level=8)
- cache.create(
- ('sender', str),
- ('recipient', str),
- ('sasl_username', str),
- ('sasl_sender', str),
- ('function', str),
- ('result', int),
- ('expire', float),
- mode=mode
- )
-
- except:
- log.debug(_("Using cache in %s failed") %(cache_path), level=8)
- try:
- log.debug(
- _("Attempting to create cache in %s") %(cache_path),
- level=8
- )
-
- cache.create(
- ('sender', str),
- ('recipient', str),
- ('sasl_username', str),
- ('sasl_sender', str),
- ('function', str),
- ('result', int),
- ('expire', float),
- mode="override"
- )
+ return False
- except:
- log.error(_("Kolab SMTP Access Policy Cache not writeable!"))
- cache = False
+ if cache:
+ engine = create_engine(cache_uri, echo=True)
- log.info(_("Kolab SMTP Access Policy Cache not enabled"))
- cache = False
-
-except ImportError:
- log.warning(_("Could not import caching library, caching disabled"))
- cache = False
-
-except OSError, e:
- log.warning(_("%s, caching disabled") %(e))
- cache = False
+ try:
+ metadata.create_all(engine)
+ except sqlalchemy.exc.OperationalError, e:
+ log.error(_("Operational Error in caching: %s" %(e)))
+ return False
+
+ Session = sessionmaker(bind=engine)
+ session = Session()
+
+ return cache
+
+def cache_select(sender, recipient, function, sasl_username='',sasl_sender=''):
+ if not cache == True:
+ return None
+
+ return session.query(
+ PolicyResult
+ ).filter_by(
+ key=function,
+ sender=sender,
+ recipient=recipient,
+ sasl_username=sasl_username,
+ sasl_sender=sasl_sender
+ ).filter(
+ PolicyResult.created >= ((int)(time.time()) - cache_expire)
+ ).first()
+
+def cache_insert(sender, recipient, function, result, sasl_username='',sasl_sender=''):
+ if not cache == True:
+ return []
+
+ log.debug(_("Caching the policy result with timestamp %d") %((int)(time.time())), level=8)
+ session.add(PolicyResult(key=function,value=result,sender=sender,recipient=recipient,sasl_username=sasl_username,sasl_sender=sasl_sender))
+ session.commit()
def defer_if_permit(message, policy_request=None):
log.info(_("Returning action DEFER_IF_PERMIT: %s") %(message))
@@ -154,7 +202,7 @@ def parse_address(email_address):
Parse an address; Strip off anything after a recipient delimiter.
"""
- # TODO: Recipient delimiter is configurable!
+ # TODO: Recipient delimiter is configurable.
if len(email_address.split("+")) > 1:
# Take the first part split by recipient delimiter and the last part
# split by '@'.
@@ -168,6 +216,12 @@ def parse_address(email_address):
return email_address
def parse_policy(sender, recipient, policy):
+
+ # TODO: A future feature is to allow special values to be expanded
+ #special_rule_values = {
+ # '$mydomains': 'expand_mydomains'
+ # ]
+
rules = { 'allow': [], 'deny': [] }
for rule in policy:
@@ -180,6 +234,18 @@ def parse_policy(sender, recipient, policy):
allowed = False
for rule in rules['allow']:
+ # TODO: Example implementation of getting the special values to expand.
+ # Note that the append works to extend the for loop.
+ #if rule in special_rule_values:
+ ## TODO: Expand the special rule value and do something
+ ## intelligent.
+ #if rule == '$mydomains':
+ #mydomains = auth.list_domains()
+ #for mydomain in mydomains:
+ #rules['allow'].append(mydomain)
+
+ #continue
+
deny_override = False
if recipient.endswith(rule):
#print "Matched allow rule:", rule
@@ -226,7 +292,7 @@ def read_request_input():
end_of_request = True
else:
policy_request[request_line.split('=')[0]] = \
- '='.join(request_line.split('=')[1:])
+ '='.join(request_line.split('=')[1:]).lower()
return policy_request
@@ -287,7 +353,7 @@ def verify_delegate(policy_request, sender_domain, sender_user):
# Got a final answer here, do the cachking thing.
if not cache == False:
- result_set = cache.select(
+ result = cache_select(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
sasl_username=policy_request['sasl_username'],
@@ -295,15 +361,14 @@ def verify_delegate(policy_request, sender_domain, sender_user):
function='verify_sender'
)
- if len(result_set) < 1:
- record_id = cache.insert(
+ if result == None:
+ record_id = cache_insert(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
+ result=False,
sasl_username=policy_request['sasl_username'],
sasl_sender=policy_request['sasl_sender'],
- function='verify_sender',
- result=0,
- expire=time.time() + cache_expire
+ function='verify_sender'
)
sender_is_delegate = False
@@ -379,7 +444,7 @@ def verify_recipient(policy_request):
recipient_verified = False
if not cache == False:
- records = cache(
+ record = cache_select(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
sasl_username=policy_request['sasl_username'],
@@ -387,19 +452,14 @@ def verify_recipient(policy_request):
function='verify_recipient'
)
- for record in records:
- if record.expire < time.time():
- # Purge record
- cache.delete(record)
- cache.cleanup()
- else:
- log.info(_("Reproducing verify_recipient(%r) from cache, " + \
- "saving you queries, time and thus money.") %(
- policy_request
- )
- )
+ if not record == None:
+ log.info(_("Reproducing verify_recipient(%r) from cache, " + \
+ "saving you queries, time and thus money.") %(
+ policy_request
+ )
+ )
- return record.result
+ return record.value
# TODO: Under some conditions, the recipient may not be fully qualified.
# We'll cross that bridge when we get there, though.
@@ -422,10 +482,18 @@ def verify_recipient(policy_request):
attr_search,
parse_address(policy_request['recipient']),
domain=domain,
+ # TODO: Get the filter from the configuration.
additional_filter="(&(objectclass=kolabinetorgperson)%(search_filter)s)"
)
}
+ # We have gotten an invalid recipient. We need to catch this case, because
+ # testing can input invalid recipients, and so can faulty applications, or
+ # misconfigured servers.
+ if not user['dn']:
+ reject(_("Invalid recipient"))
+ return False
+
recipient_policy = auth.get_user_attribute(
domain,
user,
@@ -445,7 +513,7 @@ def verify_recipient(policy_request):
)
if not cache == False:
- result_set = cache.select(
+ result_set = cache_select(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
sasl_username=policy_request['sasl_username'],
@@ -454,7 +522,7 @@ def verify_recipient(policy_request):
)
if len(result_set) < 1:
- record_id = cache.insert(
+ record_id = cache_insert(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
sasl_username=policy_request['sasl_username'],
@@ -499,7 +567,7 @@ def verify_sender(policy_request):
sasl_user = False
if not cache == False:
- records = cache(
+ record = cache_select(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
sasl_username=policy_request['sasl_username'],
@@ -507,19 +575,14 @@ def verify_sender(policy_request):
function='verify_sender'
)
- for record in records:
- if record.expire < time.time():
- # Purge record
- cache.delete(record)
- cache.cleanup()
- else:
- log.info(_("Reproducing verify_sender(%r) from cache, " + \
- "saving you queries, time and thus money.") %(
- policy_request
- )
- )
+ if not record == None:
+ log.info(_("Reproducing verify_sender(%r) from cache, " + \
+ "saving you queries, time and thus money.") %(
+ policy_request
+ )
+ )
- return record.result
+ return record
sender_domain = policy_request['sender'].split('@')[1]
@@ -623,7 +686,7 @@ def verify_sender(policy_request):
)
if not cache == False:
- result_set = cache.select(
+ result = cache_select(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
sasl_username=policy_request['sasl_username'],
@@ -631,15 +694,14 @@ def verify_sender(policy_request):
function='verify_sender'
)
- if len(result_set) < 1:
- record_id = cache.insert(
+ if result == None:
+ record_id = cache_insert(
sender=policy_request['sender'],
recipient=policy_request['recipient'],
sasl_username=policy_request['sasl_username'],
sasl_sender=policy_request['sasl_sender'],
function='verify_sender',
- result=(int)(sender_verified),
- expire=time.time() + cache_expire
+ result=sender_verified
)
return sender_verified
@@ -669,10 +731,13 @@ if __name__ == "__main__":
conf.finalize_conf()
+ cache = cache_init()
+
# Start the work
while True:
policy_request = read_request_input()
break
+
# Set the overall default policy in case nothing attracts any particular
# type of action.
#
@@ -791,4 +856,7 @@ if __name__ == "__main__":
reject(_("Recipient access denied"), policy_request)
else:
- permit(_("No objections")) \ No newline at end of file
+ permit(_("No objections"))
+
+ if cache:
+ cache_cleanup() \ No newline at end of file