summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pykolab/cli/cmd_sync.py76
1 files changed, 69 insertions, 7 deletions
diff --git a/pykolab/cli/cmd_sync.py b/pykolab/cli/cmd_sync.py
index e4f4bdf..3e2191a 100644
--- a/pykolab/cli/cmd_sync.py
+++ b/pykolab/cli/cmd_sync.py
@@ -19,27 +19,50 @@
import commands
+from distutils import version
+import multiprocessing
+
+import sys
import time
+
import pykolab
+from pykolab import utils
from pykolab.auth import Auth
+from pykolab.imap import IMAP
from pykolab.translate import _
log = pykolab.getLogger('pykolab.cli')
conf = pykolab.getConf()
+imap = None
+pool = None
+
def __init__():
commands.register('sync', execute, description="Synchronize Kolab Users with IMAP.")
def cli_options():
my_option_group = conf.add_cli_parser_option_group(_("CLI Options"))
- my_option_group.add_option( '--resync',
- dest = "resync",
- action = "store_true",
- default = False,
- help = _("Resync from the beginning"))
+ my_option_group.add_option(
+ '--threads',
+ dest = "threads",
+ action = "store",
+ default = 20,
+ type = int,
+ help = _("Synchronize LDAP and IMAP")
+ )
+
+ my_option_group.add_option(
+ '--resync',
+ dest = "resync",
+ action = "store_true",
+ default = False,
+ help = _("Resync from the beginning")
+ )
def execute(*args, **kw):
+ global imap, pool
+
auth = Auth()
log.debug(_("Listing domains..."), level=5)
start_time = time.time()
@@ -53,17 +76,56 @@ def execute(*args, **kw):
level=8
)
- all_folders = []
+ if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
+ pool = multiprocessing.Pool(conf.threads, worker_process, (), 1)
+ else:
+ pool = multiprocessing.Pool(conf.threads, worker_process, ())
for primary_domain in list(set(domains.values())):
log.debug(_("Running for domain %s") % (primary_domain), level=8)
auth = Auth(primary_domain)
auth.connect(primary_domain)
start_time = time.time()
- auth.synchronize(mode='_paged_search')
+ auth.synchronize(mode='_paged_search', callback=queue_add)
end_time = time.time()
log.info(_("Synchronizing users for %s took %d seconds")
% (primary_domain, (end_time-start_time))
)
+ while not pool._taskqueue.empty():
+ time.sleep(1)
+
+def queue_add(*args, **kw):
+ global pool
+ for dn, entry in kw['entry']:
+ entry['dn'] = dn
+ pool.apply_async(_synchronize, (), dict(**entry))
+
+def worker_process(*args, **kw):
+ pass
+
+def _synchronize(*args, **kw):
+ log.info(_("Worker process %s handling %s") % (multiprocessing.current_process().name, kw['dn']))
+
+ entry = utils.normalize(entry)
+
+ if not entry.has_key('mail'):
+ return
+
+ if not 'kolabinetorgperson' in entry['objectclass']:
+ return
+
+ imap = IMAP()
+ imap.connect()
+
+ if not imap.user_mailbox_exists(entry['mail']):
+ if entry.has_key('mailhost'):
+ server = entry['mailhost']
+ else:
+ server = None
+
+ imap.user_mailbox_create(entry['mail'], server=server)
+
+ imap.disconnect()
+