summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pykolab/__init__.py15
-rw-r--r--pykolab/cli/__init__.py24
-rw-r--r--pykolab/conf/__init__.py27
-rw-r--r--pykolab/conf/defaults.py4
-rw-r--r--pykolab/imap/__init__.py192
-rw-r--r--pykolab/imap/cyrus.py156
-rw-r--r--pykolab/tests/__init__.py23
-rw-r--r--pykolab/tests/zpush/__init__.py19
-rw-r--r--pykolab/tests/zpush/test_000_000.py86
-rw-r--r--pykolab/tests/zpush/test_000_001.py13
10 files changed, 291 insertions, 268 deletions
diff --git a/pykolab/__init__.py b/pykolab/__init__.py
index 3f751b9..9a36ef8 100644
--- a/pykolab/__init__.py
+++ b/pykolab/__init__.py
@@ -17,6 +17,12 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
+"""
+
+ pyKolab, the interface to Kolab Groupware Solution management in Python.
+
+"""
+
import logging
import shutil
import sys
@@ -26,6 +32,9 @@ from pykolab.logger import Logger
logging.setLoggerClass(Logger)
def getLogger(name):
+ """
+ Return the correct logger class.
+ """
logging.setLoggerClass(Logger)
log = logging.getLogger(name=name)
return log
@@ -36,3 +45,9 @@ conf = Conf()
def getConf():
return conf
+
+from pykolab.auth import Auth
+auth = Auth()
+
+from pykolab.imap import IMAP
+imap = IMAP()
diff --git a/pykolab/cli/__init__.py b/pykolab/cli/__init__.py
index 5b793f2..3700302 100644
--- a/pykolab/cli/__init__.py
+++ b/pykolab/cli/__init__.py
@@ -32,14 +32,15 @@ import pykolab.plugins
from pykolab import utils
from pykolab import conf
-from pykolab.auth import Auth
-from pykolab.imap import IMAP
from pykolab.constants import *
from pykolab.translate import _
log = pykolab.getLogger('pykolab.cli')
conf = pykolab.getConf()
+auth = pykolab.auth
+imap = pykolab.imap
+
class Cli(object):
def __init__(self):
domain_group = conf.add_cli_parser_option_group(_("CLI Options"))
@@ -82,12 +83,6 @@ class Cli(object):
sys.exit(1)
def action_sync(self):
- imap = IMAP()
- if hasattr(imap,'auth'):
- auth = imap.auth
- else:
- auth = Auth()
-
log.debug(_("Listing domains..."), level=5)
start = time.time()
domains = auth.list_domains()
@@ -115,8 +110,7 @@ class Cli(object):
"""
List deleted mailboxes
"""
- imap = IMAP()
- imap._connect()
+ imap.connect()
folders = imap.lm("DELETED/*")
print "Deleted folders:"
for folder in folders:
@@ -130,8 +124,7 @@ class Cli(object):
undelete_folder = conf.cli_args.pop(0)
- imap = IMAP()
- imap._connect()
+ imap.connect()
folders = imap.lm("DELETED/*")
for folder in folders:
if undelete_folder == folder:
@@ -146,7 +139,6 @@ class Cli(object):
def action_list_domains(self):
# Create the authentication object.
# TODO: Binds with superuser credentials!
- auth = Auth()
domains = auth.list_domains()
# TODO: Take a hint in --quiet, and otherwise print out a nice table
@@ -155,7 +147,7 @@ class Cli(object):
print _("Primary domain: %s - Secondary domain(s): %s") %(domain, ', '.join(domain_aliases))
def action_del_domain(self):
- domainname = conf.args.pop(0)
+ domainname = conf.cli_args.pop(0)
log.info(_("Deleting domain %s") %(domainname))
@@ -177,7 +169,7 @@ class Cli(object):
def action_add_domain(self):
log.info(_("TODO: Figure out where the domain should actually be added."))
- domainname = conf.args.pop(0)
+ domainname = conf.cli_args.pop(0)
log.info(_("Adding domain %s") %(domainname))
@@ -203,7 +195,7 @@ class Cli(object):
go_ahead = True
- if conf.cli_options.review:
+ if conf.cli_keywords.review:
ldif_writer = ldif.LDIFWriter(sys.stdout)
ldif_writer.unparse(dn,attrs)
if not utils.ask_confirmation(_("Please ACK or NACK the above LDIF:"), default="y", all_inclusive_no=True):
diff --git a/pykolab/conf/__init__.py b/pykolab/conf/__init__.py
index a57821a..bd1bff8 100644
--- a/pykolab/conf/__init__.py
+++ b/pykolab/conf/__init__.py
@@ -91,6 +91,7 @@ class Conf(object):
# Also set the cli options
for option in self.cli_keywords.__dict__.keys():
+ retval = False
if hasattr(self, "check_setting_%s" %(option)):
exec("retval = self.check_setting_%s(%r)" % (option, self.cli_keywords.__dict__[option]))
@@ -114,15 +115,12 @@ class Conf(object):
if section == 'testing':
continue
- #print "section: %s" %(section)
if not config.has_section(section):
- #print "no section for section %s, continuing" %(section)
continue
for key in self.defaults.__dict__[section].keys():
- #print "key: %s" %(key)
+ retval = False
if not config.has_option(section, key):
- #print "no key for option %s in section %s, continuing" %(key,section)
continue
if isinstance(self.defaults.__dict__[section][key], int):
@@ -144,11 +142,11 @@ class Conf(object):
continue
if not self.defaults.__dict__[section][key] == value:
- setattr(self,"%s_%s" %(section,key),value)
if key.count('password') >= 1:
log.debug(_("Setting %s_%s to '****' (from configuration file)") %(section,key), level=8)
else:
log.debug(_("Setting %s_%s to %r (from configuration file)") %(section,key,value), level=8)
+ setattr(self,"%s_%s" %(section,key),value)
def options_set_from_config(self):
"""
@@ -179,6 +177,7 @@ class Conf(object):
return
for key in config.options('testing'):
+ retval = False
if isinstance(self.defaults.__dict__['testing'][key], int):
value = config.getint('testing',key)
@@ -192,7 +191,7 @@ class Conf(object):
value = eval(config.get('testing',key))
if hasattr(self,"check_setting_%s_%s" %('testing',key)):
- exec("retval = self.check_setting_%s(%r)" % ('testing',key,value))
+ exec("retval = self.check_setting_%s_%s(%r)" % ('testing',key,value))
if not retval:
# We just don't set it, check_setting_%s should have
# taken care of the error messages
@@ -457,6 +456,7 @@ class Conf(object):
TODO: Include getting the value from plugins through a hook.
"""
+ retval = False
if not self.cfg_parser:
self.read_config()
@@ -533,9 +533,9 @@ class Conf(object):
if value:
try:
import imaplib
- setattr(self,"use_imap",value)
+ self.use_imap = value
return True
- except ImportError, e:
+ except ImportError:
log.error(_("No imaplib library found."))
return False
@@ -543,9 +543,9 @@ class Conf(object):
if value:
try:
from smtplib import LMTP
- setattr(self,"use_lmtp",value)
+ self.use_lmtp = value
return True
- except ImportError, e:
+ except ImportError:
log.error(_("No LMTP class found in the smtplib library."))
return False
@@ -553,9 +553,9 @@ class Conf(object):
if value:
try:
from smtplib import SMTP
- setattr(self,"use_mail",value)
+ self.use_mail = value
return True
- except ImportError, e:
+ except ImportError:
log.error(_("No SMTP class found in the smtplib library."))
return False
@@ -563,6 +563,9 @@ class Conf(object):
# Attempt to load the suite,
# Get the suite's options,
# Set them here.
+ if not hasattr(self,'test_suites'):
+ self.test_suites = []
+
if "zpush" in value:
selectively = False
for item in [ 'calendar', 'contacts', 'mail' ]:
diff --git a/pykolab/conf/defaults.py b/pykolab/conf/defaults.py
index 9e3a8ec..437e3f6 100644
--- a/pykolab/conf/defaults.py
+++ b/pykolab/conf/defaults.py
@@ -22,3 +22,7 @@ import logging
class Defaults(object):
def __init__(self, plugins=None):
self.loglevel = logging.CRITICAL
+
+ # An integer or float to indicate the interval at which the Cyrus IMAP
+ # library should try to retrieve annotations
+ self.cyrus_annotations_retry_interval = 1 \ No newline at end of file
diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py
index 441fb8a..a3533f3 100644
--- a/pykolab/imap/__init__.py
+++ b/pykolab/imap/__init__.py
@@ -17,52 +17,85 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
+
import logging
import re
import time
+from urlparse import urlparse
+
import pykolab
-from pykolab.auth import Auth
from pykolab.translate import _
-conf = pykolab.getConf()
log = pykolab.getLogger('pykolab.imap')
+conf = pykolab.getConf()
+
+auth = pykolab.auth
class IMAP(object):
def __init__(self):
- self.auth = Auth()
+ # Pool of named IMAP connections, by hostname
+ self._imap = {}
+ # Place holder for the current IMAP connection
self.imap = None
- def _connect(self):
- if not self.imap:
- if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
- import cyruslib
- try:
- self.imap = cyruslib.CYRUS(conf.get('cyrus-imap', 'uri'))
- # TODO: Actually handle the error
- except cyruslib.CYRUSError, e:
- (code, error, message) = e
- raise cyruslib.CYRUSError, e
-
- if conf.debuglevel >= 9:
- self.imap.VERBOSE = 1
- try:
- admin_login = conf.get('cyrus-imap', 'admin_login')
- admin_password = conf.get('cyrus-imap', 'admin_password')
- self.imap.login(admin_login, admin_password)
- self.seperator = self.imap.m.getsep()
- except cyruslib.CYRUSError, e:
- (code, error, message) = e
- if error == 'LOGIN':
- log.error(_("Invalid login credentials for IMAP Administrator"))
+ def connect(self, uri=None):
+ backend = conf.get('kolab', 'imap_backend')
+
+ if uri == None:
+ uri = conf.get(backend, 'uri')
+
+ result = urlparse(uri)
+
+ # Get the credentials
+ admin_login = conf.get(backend, 'admin_login')
+ admin_password = conf.get(backend, 'admin_password')
+
+ if not self._imap.has_key(result.hostname):
+ if backend == 'cyrus-imap':
+ import cyrus
+ self._imap[result.hostname] = cyrus.Cyrus(uri)
+ # Actually connect
+ log.debug(_("Logging on to Cyrus IMAP server %s") %(result.hostname), level=8)
+ self._imap[result.hostname].login(admin_login, admin_password)
+
+ elif backend == 'dovecot':
+ import dovecot
+ self._imap[result.hostname] = dovecot.Dovecot(uri)
+ # Actually connect
+ log.debug(_("Logging on to Dovecot IMAP server %s") %(result.hostname), level=8)
+ self._imap[result.hostname].login(admin_login, admin_password)
+
else:
- import dovecotlib
- self.imap = dovecotlib.IMAP4()
+ import imap
+ self._imap[result.hostname] = imap.IMAP(uri)
+ # Actually connect
+ log.debug(_("Logging on to generic IMAP server %s") %(result.hostname), level=8)
+ self._imap[result.hostname].login(admin_login, admin_password)
+ else:
+ log.debug(_("Reusing existing IMAP server connection to %s") %(result.hostname), level=8)
- def _disconnect(self):
- del self.imap
- self.imap = None
+ # Set the newly created technology specific IMAP library as the current
+ # IMAP connection to be used.
+ self.imap = self._imap[result.hostname]
+
+ def disconnect(self, server=None):
+ if server == None:
+ # No server specified, but make sure self.imap is None anyways
+ self.imap = None
+ else:
+ if self._imap.has_key(server):
+ del self._imap[server]
+ else:
+ log.warning(_("Called imap.disconnect() on a server that " + \
+ "we had no connection to"))
+
+ def __getattr__(self, name):
+ if hasattr(self.imap, name):
+ return getattr(self.imap, name)
+ else:
+ raise AttributeError, _("%r has no attribute %s") %(self,name)
def has_folder(self, folder):
folders = self.imap.lm(folder)
@@ -84,10 +117,7 @@ class IMAP(object):
log.debug(_("Found old INBOX folder %s") %(old_inbox), level=8)
if not self.has_folder(inbox):
- if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
- from pykolab.imap.cyrus import Cyrus
- _imap = Cyrus(self.imap)
- _imap.rename(old_inbox,inbox)
+ self.imap.rename(old_inbox,inbox)
else:
log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") %(old_inbox,inbox))
else:
@@ -98,7 +128,7 @@ class IMAP(object):
def create_user_folders(self, users, primary_domain, secondary_domains):
inbox_folders = []
- domain_section = self.auth.domain_section(primary_domain)
+ domain_section = auth.domain_section(primary_domain)
folders = self.list_user_folders(primary_domain, secondary_domains)
@@ -108,7 +138,7 @@ class IMAP(object):
#print domain
if not users:
- users = self.auth.list_users(primary_domain)
+ users = auth.list_users(primary_domain)
for user in users:
if type(user) == dict:
@@ -124,21 +154,26 @@ class IMAP(object):
continue
else:
# TODO: Perhaps this block is moot
- log.info(_("Creating new INBOX for user (%d): %s") %(1,folder))
- self.imap.cm("user/%s" %(folder))
+ log.info(_("Creating new INBOX for user (%d): %s")
+ %(1,folder))
+ try:
+ self.imap.cm("user/%s" %(folder))
+ except:
+ log.warning(_("Mailbox already exists: user/%s")
+ %(folder))
+ continue
if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
- from pykolab.imap.cyrus import Cyrus
- _imap = Cyrus(self.imap)
- _imap.setquota("user/%s" %(folder),0)
+ self.imap._setquota("user/%s" %(folder),0)
except:
# TODO: Perhaps this block is moot
log.info(_("Creating new INBOX for user (%d): %s") %(2,folder))
- self.imap.cm("user/%s" %(folder))
- if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
- from pykolab.imap.cyrus import Cyrus
- _imap = Cyrus(self.imap)
- _imap.setquota("user/%s" %(folder),0)
+ try:
+ self.imap.cm("user/%s" %(folder))
+ except:
+ log.warning(_("Mailbox already exists: user/%s") %(folder))
+ continue
+ self.imap._setquota("user/%s" %(folder),0)
if conf.has_option(domain_section, "autocreate_folders"):
_additional_folders = conf.get_raw(domain_section, "autocreate_folders")
@@ -162,23 +197,24 @@ class IMAP(object):
_add_folder['username'] = folder.split('@')[0]
_add_folder['domainname'] = folder.split('@')[1]
_add_folder['additional_folder_name'] = additional_folder
- _add_folder['seperator'] = self.seperator
+ _add_folder['seperator'] = self.imap.seperator
folder_name = folder_name % _add_folder
else:
folder_name = "user%(seperator)s%(username)s%(seperator)s%(additional_folder_name)s" % {
"username": folder,
- "seperator": self.seperator,
+ "seperator": self.imap.seperator,
"additional_folder_name": additional_folder
}
- self.imap.cm(folder_name)
+ try:
+ self.imap.cm(folder_name)
+ except:
+ log.warning(_("Mailbox already exists: user/%s") %(folder))
if additional_folders[additional_folder].has_key("annotations"):
for annotation in additional_folders[additional_folder]["annotations"].keys():
if conf.get('kolab', 'imap_backend') == 'cyrus-imap':
- from pykolab.imap.cyrus import Cyrus
- _imap = Cyrus(self.imap)
- _imap.setannotation(
+ self.imap._setannotation(
folder_name,
"%s" %(annotation),
"%s" %(additional_folders[additional_folder]["annotations"][annotation])
@@ -193,7 +229,7 @@ class IMAP(object):
)
def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]):
- self._connect()
+ self.connect()
if conf.has_option(primary_domain, 'quota_attribute'):
_quota_attr = conf.get(primary_domain, 'quota_attribute')
@@ -203,13 +239,13 @@ class IMAP(object):
_inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute')
- default_quota = self.auth.domain_default_quota(primary_domain)
+ default_quota = auth.domain_default_quota(primary_domain)
if default_quota == "":
default_quota = 0
if len(users) == 0:
- users = self.auth.list_users(primary_domain)
+ users = auth.list_users(primary_domain)
for user in users:
quota = None
@@ -222,7 +258,7 @@ class IMAP(object):
elif type(user[_quota_attr]) == str:
quota = user[_quota_attr]
else:
- _quota = self.auth.get_user_attribute(primary_domain, user, _quota_attr)
+ _quota = auth.get_user_attribute(primary_domain, user, _quota_attr)
if _quota == None:
quota = 0
else:
@@ -236,7 +272,7 @@ class IMAP(object):
elif type(user[_inbox_folder_attr]) == str:
folder = "user/%s" % user[_inbox_folder_attr]
elif type(user) == str:
- quota = self.auth.get_user_attribute(user, 'quota')
+ quota = auth.get_user_attribute(user, 'quota')
folder = "user/%s" %(user)
try:
@@ -262,24 +298,11 @@ class IMAP(object):
if not int(new_quota) == int(quota):
log.info(_("Adjusting authentication database quota for folder %s to %d") %(folder,int(new_quota)))
quota = int(new_quota)
- self.auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota)
+ auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota)
if not int(current_quota) == int(quota):
#log.info(_("Correcting quota for %s to %s (currently %s)") %(folder, quota, current_quota))
- log.debug(_("Checking actual backend server for folder %s through annotations") %(folder), level=8)
- annotations = self.imap.getannotation(folder, "/vendor/cmu/cyrus-imapd/server")
- server = annotations[folder]['/vendor/cmu/cyrus-imapd/server']
- log.debug(_("Server for INBOX folder %s is %s") %(folder,server))
- import cyruslib
- _imap = cyruslib.IMAP4(server, 143)
- admin_login = conf.get('cyrus-imap', 'admin_login')
- admin_password = conf.get('cyrus-imap', 'admin_password')
- _imap.login(admin_login, admin_password)
-
- log.info(_("Correcting quota for %s to %s (currently %s)") %(folder, quota, current_quota))
- _imap.setquota(folder, quota)
-
- del _imap
+ self.imap._setquota(folder, quota)
def expunge_user_folders(self, inbox_folders=None):
"""
@@ -299,7 +322,7 @@ class IMAP(object):
primary_domain, secondary_domains
"""
- self._connect()
+ self.connect()
if inbox_folders == None:
inbox_folders = []
@@ -316,14 +339,17 @@ class IMAP(object):
self.imap.dm("user/%s" %(folder))
except:
log.info(_("Folder has no corresponding user (2): %s") %(folder))
- self.imap.dm("user/%s" %(folder))
+ try:
+ self.imap.dm("user/%s" %(folder))
+ except:
+ pass
def list_user_folders(self, primary_domain=None, secondary_domains=[]):
"""
List the INBOX folders in the IMAP backend. Returns a list of unique
base folder names.
"""
- self._connect()
+ self.connect()
_folders = self.imap.lm("user/%")
# TODO: Replace the .* below with a regex representing acceptable DNS
@@ -353,9 +379,9 @@ class IMAP(object):
#if acceptable:
#folder_name = "%s@%s" %(folder.split(self.seperator)[1].split('@')[0],folder.split('@')[1])
- folder_name = "%s@%s" %(folder.split(self.seperator)[1].split('@')[0],folder.split('@')[1])
+ folder_name = "%s@%s" %(folder.split(self.imap.seperator)[1].split('@')[0],folder.split('@')[1])
else:
- folder_name = "%s" %(folder.split(self.seperator)[1])
+ folder_name = "%s" %(folder.split(self.imap.seperator)[1])
if not folder_name == None:
if not folder_name in folders:
@@ -366,7 +392,7 @@ class IMAP(object):
return folders
def synchronize(self, users=[], primary_domain=None, secondary_domains=[]):
- self._connect()
+ self.connect()
self.users = users
self.move_user_folders(users)
@@ -377,16 +403,8 @@ class IMAP(object):
return folders
- def getannotation(self, *args, **kw):
- self._connect()
- return self.imap.getannotation(*args, **kw)
-
def lm(self, *args, **kw):
return self.imap.lm(*args, **kw)
def undelete(self, *args, **kw):
- from pykolab.imap.cyrus import Cyrus
- imap = Cyrus()
- result = imap.undelete(*args, **kw)
- del imap
- return result
+ self.imap.undelete(*args, **kw)
diff --git a/pykolab/imap/cyrus.py b/pykolab/imap/cyrus.py
index bd56ab4..90ceeb0 100644
--- a/pykolab/imap/cyrus.py
+++ b/pykolab/imap/cyrus.py
@@ -20,16 +20,18 @@
import cyruslib
import time
-import pykolab
+from urlparse import urlparse
-from pykolab.imap import IMAP
+import pykolab
from pykolab.translate import _
log = pykolab.getLogger('pykolab.imap.cyrus')
conf = pykolab.getConf()
-class Cyrus(object):
+imap = pykolab.imap
+
+class Cyrus(cyruslib.CYRUS):
"""
Abstraction class for some common actions to do exclusively in Cyrus.
@@ -39,69 +41,109 @@ class Cyrus(object):
- Setting quota
- Renaming the top-level mailbox
- Setting annotations
+
"""
- def __init__(self, imap=None):
- self.imap = imap
- if self.imap == None:
- self.imap = IMAP()
+ setquota = cyruslib.CYRUS.sq
- def setquota(self, mailbox, quota):
+ def __init__(self, uri):
"""
- Login to the actual backend server.
+ Initialize this class, but do not connect yet.
+ """
+ result = urlparse(uri)
+
+ # Complete the uri with the result to avoid cyruslib from bailing out.
+ if not hasattr(result,'port'):
+ if result.scheme == 'imap':
+ port = 143
+ else:
+ port = 993
+ else:
+ port = result.port
+
+ self.uri = "%s://%s:%s" %(result.scheme,result.hostname,port)
+
+ cyruslib.CYRUS.__init__(self, self.uri)
+
+ # Initialize our variables
+ self.seperator = self.SEP
+
+ # Placeholder for known mailboxes on known servers
+ self.mbox = {}
+
+ def __del__(self):
+ cyruslib.CYRUS.__del__(self)
+
+ def login(self, *args, **kw):
"""
+ Login to the Cyrus IMAP server through cyruslib.CYRUS, but set our
+ hierarchy seperator.
+ """
+ cyruslib.CYRUS.login(self, *args, **kw)
+ self.seperator = self.SEP
+
+ def find_mailbox_server(self, mailbox):
+ annotations = {}
log.debug(_("Checking actual backend server for folder %s through annotations") %(mailbox), level=8)
- annotations = self.imap.getannotation(mailbox, "/vendor/cmu/cyrus-imapd/server")
+ if self.mbox.has_key(mailbox):
+ return self.mbox[mailbox]
+
+ max_tries = 20
+ num_try = 0
+ while 1:
+ num_try += 1
+ annotations = self.getannotation(mailbox, "/vendor/cmu/cyrus-imapd/server")
+
+ if annotations.has_key(mailbox):
+ break
+
+ if max_tries <= num_try:
+ log.error(_("Could not get the annotations after %s tries.") %(num_try))
+ break
+
+ log.warning(_("No annotations for %s: %r") %(mailbox,annotations))
+
+ time.sleep(1)
+
server = annotations[mailbox]['/vendor/cmu/cyrus-imapd/server']
+ self.mbox[mailbox] = server
+
log.debug(_("Server for INBOX folder %s is %s") %(mailbox,server), level=8)
- _imap = cyruslib.IMAP4(server, 143)
- admin_login = conf.get('cyrus-imap', 'admin_login')
- admin_password = conf.get('cyrus-imap', 'admin_password')
- _imap.login(admin_login, admin_password)
+ return server
- log.debug(_("Setting quota for INBOX folder %s to %s") %(mailbox,quota), level=8)
- _imap.setquota(mailbox, quota)
+ def _setquota(self, mailbox, quota):
+ """
+ Login to the actual backend server.
+ """
+ server = self.find_mailbox_server(mailbox)
+ imap.connect('imap://%s:143' %(server))
- del _imap
+ log.debug(_("Setting quota for INBOX folder %s to %s") %(mailbox,quota), level=8)
+ try:
+ imap.setquota(mailbox, quota)
+ except:
+ log.error(_("Could not set quota for mailbox %s") %(mailbox))
- def rename(self, from_mailbox, to_mailbox, partition=None):
+ def _rename(self, from_mailbox, to_mailbox, partition=None):
"""
Login to the actual backend server, then rename.
"""
- log.debug(_("Checking actual backend server for folder %s through annotations") %(from_mailbox), level=8)
- annotations = self.imap.getannotation(from_mailbox, "/vendor/cmu/cyrus-imapd/server")
- server = annotations[from_mailbox]['/vendor/cmu/cyrus-imapd/server']
- log.debug(_("Server for INBOX folder %s is %s") %(from_mailbox,server), level=8)
-
- _imap = cyruslib.IMAP4(server, 143)
- admin_login = conf.get('cyrus-imap', 'admin_login')
- admin_password = conf.get('cyrus-imap', 'admin_password')
- _imap.login(admin_login, admin_password)
+ server = self.find_mailbox_server(from_mailbox)
+ imap.connect('imap://%s:143' %(server))
- log.debug(_("Moving INBOX folder %s to %s") %(from_mailbox,from_mailbox), level=8)
- _imap.rename(from_mailbox, from_mailbox, partition)
+ log.debug(_("Moving INBOX folder %s to %s") %(from_mailbox,to_mailbox), level=8)
+ imap.rename(from_mailbox, to_mailbox, partition)
- del _imap
-
- def setannotation(self, mailbox, annotation, value):
+ def _setannotation(self, mailbox, annotation, value):
"""
Login to the actual backend server, then set annotation.
"""
- log.debug(_("Checking actual backend server for folder %s through annotations") %(mailbox), level=8)
- annotations = self.imap.getannotation(mailbox, "/vendor/cmu/cyrus-imapd/server")
- server = annotations[mailbox]['/vendor/cmu/cyrus-imapd/server']
- log.debug(_("Server for INBOX folder %s is %s") %(mailbox,server), level=8)
-
- _imap = cyruslib.IMAP4(server, 143)
- admin_login = conf.get('cyrus-imap', 'admin_login')
- admin_password = conf.get('cyrus-imap', 'admin_password')
- _imap.login(admin_login, admin_password)
+ server = self.find_mailbox_server(mailbox)
+ imap.connect('imap://%s:143' %(server))
log.debug(_("Setting annotation %s on folder %s") %(annotation,mailbox), level=8)
- _imap.setannotation(mailbox, annotation, value)
-
- del _imap
+ imap.setannotation(mailbox, annotation, value)
def undelete(self, mailbox, to_mailbox=None, recursive=True):
"""
@@ -123,19 +165,9 @@ class Cyrus(object):
'to_mailbox' may be the target folder to "undelete" the deleted
folder to. If not specified, the original folder name is used.
"""
+ server = self.find_mailbox_server(mailbox)
- log.debug(_("Checking actual backend server for folder %s through annotations") %(mailbox), level=8)
- annotations = self.imap.getannotation(mailbox, "/vendor/cmu/cyrus-imapd/server")
- server = annotations[mailbox]['/vendor/cmu/cyrus-imapd/server']
- log.debug(_("Server for deleted folder %s is %s") %(mailbox,server), level=8)
-
- _imap = cyruslib.IMAP4(server, 143)
- admin_login = conf.get('cyrus-imap', 'admin_login')
- admin_password = conf.get('cyrus-imap', 'admin_password')
- _imap.login(admin_login, admin_password)
-
- # Get the seperator used
- self.seperator = _imap.getsep()
+ imap.connect('imap://%s:143' %(server))
# Placeholder for folders we have recovered already.
target_folders = []
@@ -174,7 +206,7 @@ class Cyrus(object):
target_folder = "%s@%s" %(target_folder,target_mbox['domain'])
log.info(_("Undeleting %s to %s") %(undelete_folder,target_folder))
- _imap.rename(undelete_folder,target_folder)
+ self.rename(undelete_folder,target_folder)
def parse_mailbox(self, mailbox):
"""
@@ -250,6 +282,7 @@ class Cyrus(object):
TODO: It finds virtdomain folders for non-virtdomain searches.
"""
deleted_folder_search = "%(deleted_prefix)s%(seperator)s%(mailbox)s%(seperator)s*" % {
+ # TODO: The prefix used is configurable
'deleted_prefix': "DELETED",
'mailbox': self.seperator.join(mbox['path_parts']),
'seperator': self.seperator,
@@ -260,16 +293,19 @@ class Cyrus(object):
folders = self.imap.lm(deleted_folder_search)
- #print "the deleted folders that i could find are:", folders
+ # The folders we have found at this stage include virtdomain folders.
+ #
+ # For example, having searched for user/userid, it will also find
+ # user/userid@example.org
+ #
+ # Here, we explicitely remove any virtdomain folders.
if mbox['domain'] == None:
- #print "removing the folders that are virtdomain folders"
_folders = []
for folder in folders:
if len(folder.split('@')) < 2:
_folders.append(folder)
- #print "remaining folders:", _folders
folders = _folders
return folders
diff --git a/pykolab/tests/__init__.py b/pykolab/tests/__init__.py
index 554b214..fba8133 100644
--- a/pykolab/tests/__init__.py
+++ b/pykolab/tests/__init__.py
@@ -23,16 +23,19 @@ import os
import random
import time
-from pykolab.conf import Conf
+import pykolab
+
from pykolab.constants import *
from pykolab.tests.constants import *
from pykolab.translate import _
+log = pykolab.getLogger('pykolab.tests')
+conf = pykolab.getConf()
+
class Tests(object):
def __init__(self):
- self.conf = Conf()
- test_group = self.conf.parser.add_option_group(_("Test Options"))
+ test_group = conf.add_cli_parser_option_group(_("Test Options"))
for item in TEST_ITEMS:
test_group.add_option( "--%s" %(item['name']),
@@ -48,7 +51,7 @@ class Tests(object):
help = _("Run tests in suite SUITE. Implies a certain set of items being tested."),
metavar = "SUITE")
- delivery_group = self.conf.parser.add_option_group(_("Content Delivery Options"))
+ delivery_group = conf.add_cli_parser_option_group(_("Content Delivery Options"))
delivery_group.add_option( "--use-mail",
dest = "use_mail",
@@ -60,21 +63,21 @@ class Tests(object):
dest = "use_imap",
action = "store_true",
default = False,
- help = _("Inject messages containing the items through IMAP (requires imaplib)"))
+ help = _("Inject messages containing the items through IMAP"))
delivery_group.add_option( "--use-lmtp",
dest = "use_lmtp",
action = "store_true",
default = False,
- help = _("Deliver messages containing the items through LMTP (requires imaplib)"))
+ help = _("Deliver messages containing the items through LMTP"))
- self.conf.finalize_conf()
+ conf.finalize_conf()
def run(self):
# Execute the suites first.
- for suite in self.conf.test_suites:
+ for suite in conf.test_suites:
try:
exec("from pykolab.tests.%s import %sTest" %(suite,suite.capitalize()))
- exec("%stest = %sTest(self.conf)" %(suite,suite.capitalize()))
+ exec("%stest = %sTest()" %(suite,suite.capitalize()))
except ImportError, e:
- self.conf.log.error(_("Tests for suite %s failed to load. Aborting.") %(suite.capitalize()), recoverable=False)
+ conf.log.error(_("Tests for suite %s failed to load. Aborting.") %(suite.capitalize()), recoverable=False)
diff --git a/pykolab/tests/zpush/__init__.py b/pykolab/tests/zpush/__init__.py
index 6794725..62783f5 100644
--- a/pykolab/tests/zpush/__init__.py
+++ b/pykolab/tests/zpush/__init__.py
@@ -21,25 +21,22 @@ import imp
import os
import sys
-import pykolab.logger
+import pykolab
-from pykolab.conf import Conf
from pykolab.constants import *
from pykolab.tests.constants import *
+from pykolab.translate import _
-class ZpushTest(object):
- def __init__(self, conf=None):
- if conf:
- self.conf = conf
- else:
- self.conf = Conf()
- self.conf.finalize_conf()
+log = pykolab.getLogger('pykolab.tests.zpush')
+conf = pykolab.getConf()
+class ZpushTest(object):
+ def __init__(self):
self.tests = []
# Make sure we parse the [testing] section of the configuration file, if
# available.
- self.conf.set_options_from_testing_section()
+ conf.set_options_from_testing_section()
# Attempt to create a list of modules
for x in range(0,8):
@@ -52,7 +49,7 @@ class ZpushTest(object):
pass
for test in self.tests:
- exec("result = %s(self.conf)" %(test))
+ exec("result = %s()" %(test))
#name = "from pykolab.tests.zpush.test_%s import Test_%s" %(test_num,test_num)
#file, pathname, description = imp.find_module(name, sys.path)
diff --git a/pykolab/tests/zpush/test_000_000.py b/pykolab/tests/zpush/test_000_000.py
index e0969c2..415de0b 100644
--- a/pykolab/tests/zpush/test_000_000.py
+++ b/pykolab/tests/zpush/test_000_000.py
@@ -17,92 +17,48 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
-import cyruslib
+import pykolab
-from pykolab.conf import Conf
+from pykolab import utils
from pykolab.constants import *
from pykolab.tests.constants import *
from pykolab.translate import _
-TEST_FOLDERS = {
- 'Calendar': {
- 'annotations': {
- "/vendor/kolab/folder-test": "true",
- "/vendor/kolab/folder-type": "event.default"
- },
- 'acls': [
- ],
- },
-
- 'Contacts': {
- 'annotations': {
- "/vendor/kolab/folder-type": "contact.default"
- },
- 'acls': [
- ],
- },
-
- 'Journal': {
- 'annotations': {
- "/vendor/kolab/folder-test": "true",
- "/vendor/kolab/folder-type": "journal.default"
- },
- 'acls': [
- ],
- },
- 'Notes': {
- 'annotations': {
- "/vendor/kolab/folder-type": "note.default"
- },
- 'acls': [
- ],
- },
- 'Tasks': {
- 'annotations': {
- "/vendor/kolab/folder-type": "task.default"
- },
- 'acls': [
- ],
- },
- }
+log = pykolab.getLogger('pykolab.tests.zpush')
+conf = pykolab.getConf()
class Test_000_000(object):
"""
- Preparations for the Test 000 series
+ Preparations for the Test 000 series.
"""
def __init__(self, conf=None):
self.suite_num = "000"
self.suite_test_num = "000"
- self.log.info("About to execute preperation task #000 in Test Suite #000");
- self.log.info("We will assume the start situation has been configured");
- self.log.info("such as is described in the documentation");
+ log.info("About to execute preperation task #000 in Test Suite #000");
+ log.info("We will assume the start situation has been configured");
+ log.info("such as is described in the documentation.");
- if not conf:
- self.conf = Conf()
- self.conf.finalize_conf()
- else:
- self.conf = conf
-
- # Remove all mailboxes
- # FIXME: Should come from configuration file and/or prompts
- imap = cyruslib.CYRUS("imap://%s:143" %(self.conf.testing_server))
- imap.login(self.conf.testing_admin_login,self.conf.testing_admin_password)
+ utils.ask_confirmation("Continue?")
# Delete all mailboxes
- for user in self.conf.testing_users:
+ #imap.connect()
+ #for folder in imap.lm("user/%"):
+ #imap.dm(folder)
+
+ for user in auth.list_users(domain):
for mailbox in imap.lm("user%s%s" %(imap.SEP,"%(givenname)s@%(domain)s" %(user))):
- self.conf.log.debug(_("Deleting mailbox: %s") %(mailbox), level=3)
+ log.debug(_("Deleting mailbox: %s") %(mailbox), level=3)
try:
imap.dm(mailbox)
except cyruslib.CYRUSError, e:
pass
# Recreate the user top-level mailboxes
- for user in self.conf.testing_users:
+ for user in conf.testing_users:
mailbox = "user%s%s" %(imap.SEP,"%(givenname)s@%(domain)s" %(user))
- self.conf.log.debug(_("Creating mailbox: %s") %(mailbox), level=3)
+ log.debug(_("Creating mailbox: %s") %(mailbox), level=3)
imap.cm(mailbox)
imap.logout()
@@ -113,15 +69,15 @@ class Test_000_000(object):
# - create the standard folders
# - set the standard annotations
# - subscribe
- for user in self.conf.testing_users:
- imap = cyruslib.CYRUS("imap://%s:143" %(self.conf.testing_server))
+ for user in conf.testing_users:
+ imap = cyruslib.CYRUS("imap://%s:143" %(conf.testing_server))
try:
imap.login("%(givenname)s@%(domain)s" %(user), user['password'])
except:
- self.conf.log.error(_("Authentication failure for %s") %("%(givenname)s@%(domain)s" %(user)), recoverable=True)
+ log.error(_("Authentication failure for %s") %("%(givenname)s@%(domain)s" %(user)), recoverable=True)
continue
- if self.conf.debuglevel > 3:
+ if conf.debuglevel > 3:
imap.VERBOSE = True
imap.subscribe("INBOX")
diff --git a/pykolab/tests/zpush/test_000_001.py b/pykolab/tests/zpush/test_000_001.py
index c7bfbb6..204aba3 100644
--- a/pykolab/tests/zpush/test_000_001.py
+++ b/pykolab/tests/zpush/test_000_001.py
@@ -17,11 +17,16 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
-from pykolab.conf import Conf
+import pykolab
+
+from pykolab import utils
from pykolab.constants import *
from pykolab.tests.constants import *
from pykolab.translate import _
+log = pykolab.getLogger('pykolab.tests.zpush')
+conf = pykolab.getConf()
+
class Test_000_001(object):
"""
First, basic test.
@@ -34,12 +39,6 @@ class Test_000_001(object):
self.suite_num = "000"
self.suite_test_num = "001"
- if not conf:
- self.conf = Conf()
- self.conf.finalize_conf()
- else:
- self.conf = conf
-
# Create some test calendar items
for item in TEST_ITEMS:
try: