summaryrefslogtreecommitdiffstats
path: root/cyruslib.py
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2011-01-05 12:38:37 +0100
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2011-01-05 12:38:37 +0100
commit6a35e5efa2ffd88ebcc69b1bcd3c4d4639082f30 (patch)
treeeafd6480db629f40f0dd28ecb21280a9ad14d7a7 /cyruslib.py
parent2179a1ff034888b263ec7d2c65550b55f1d2bf78 (diff)
downloadpykolab-6a35e5efa2ffd88ebcc69b1bcd3c4d4639082f30.tar.gz
Add cyruslib
Diffstat (limited to 'cyruslib.py')
-rw-r--r--cyruslib.py676
1 files changed, 676 insertions, 0 deletions
diff --git a/cyruslib.py b/cyruslib.py
new file mode 100644
index 0000000..8eaaff4
--- /dev/null
+++ b/cyruslib.py
@@ -0,0 +1,676 @@
+# -*- coding: utf-8 -*-
+#
+# Cyruslib v0.8.5-20090401
+# Copyright (C) 2007-2009 Reinaldo de Carvalho <reinaldoc@gmail.com>
+# Copyright (C) 2003-2006 Gianluigi Tiesi <sherpya@netfarm.it>
+# Copyright (C) 2003-2006 NetFarm S.r.l. [http://www.netfarm.it]
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+#
+# Requires python >= 2.3
+#
+
+__version__ = '0.8.5'
+__all__ = [ 'CYRUS' ]
+__doc__ = """Cyrus admin wrapper
+Adds cyrus-specific commands to imaplib IMAP4 Class
+and defines new CYRUS class for cyrus imapd commands
+
+"""
+
+from sys import exit, stdout
+
+try:
+ import imaplib
+ import re
+ from binascii import b2a_base64
+except ImportError, e:
+ print e
+ exit(1)
+
+Commands = {
+ 'RECONSTRUCT' : ('AUTH',),
+ 'DUMP' : ('AUTH',), # To check admin status
+ 'ID' : ('AUTH',), # Only one ID allowed in non auth mode
+ 'GETANNOTATION': ('AUTH',),
+ 'SETANNOTATION': ('AUTH',)
+ }
+
+imaplib.Commands.update(Commands)
+
+DEFAULT_SEP = '.'
+QUOTE = '"'
+DQUOTE = '""'
+
+re_ns = re.compile(r'.*\(\(\".*(\.|/)\"\)\).*')
+re_q0 = re.compile(r'(.*)\s\(\)')
+re_q = re.compile(r'(.*)\s\(STORAGE (\d+) (\d+)\)')
+re_mb = re.compile(r'\((.*)\)\s\".\"\s(.*)')
+re_url = re.compile(r'^(imaps?)://(.+?):?(\d{0,5})$')
+
+def ok(res):
+ return res.upper().startswith('OK')
+
+def quote(text, qchar=QUOTE):
+ return text.join([qchar, qchar])
+
+def unquote(text, qchar=QUOTE):
+ return ''.join(text.split(qchar))
+
+def getflags(test):
+ flags = []
+ for flag in test.split('\\'):
+ flag = flag.strip()
+ if len(flag): flags.append(flag)
+ return flags
+
+### A smart function to return an array of splitted strings
+### and honours quoted strings
+def splitquote(text):
+ data = text.split(QUOTE)
+ if len(data) == 1: # no quotes
+ res = data[0].split()
+ else:
+ res = []
+ for match in data:
+ if len(match.strip()) == 0: continue
+ if match[0] == ' ':
+ res = res + match.strip().split()
+ else:
+ res.append(match)
+ return res
+
+### return a dictionary from a cyrus info response
+def res2dict(data):
+ data = splitquote(data)
+ datalen = len(data)
+ if datalen % 2: # Unmatched pair
+ return False, {}
+ res = {}
+ for i in range(0, datalen, 2):
+ res[data[i]] = data[i+1]
+ return True, res
+
+
+class CYRUSError(Exception): pass
+
+class IMAP4(imaplib.IMAP4):
+ def getsep(self):
+ """Get mailbox separator"""
+ ### yes, ugly but cyradm does it in the same way
+ ### also more realable then calling NAMESPACE
+ ### and it should be also compatibile with other servers
+ try:
+ return unquote(self.list(DQUOTE, DQUOTE)[1][0]).split()[1]
+ except:
+ return DEFAULT_SEP
+
+ def isadmin(self):
+ ### A trick to check if the user is admin or not
+ ### normal users cannot use dump command
+ try:
+ res, msg = self._simple_command('DUMP', 'NIL')
+ if msg[0].lower().find('denied') == -1:
+ return True
+ except:
+ pass
+ return False
+
+ def id(self):
+ try:
+ typ, dat = self._simple_command('ID', 'NIL')
+ res, dat = self._untagged_response(typ, dat, 'ID')
+ except:
+ return False, dat[0]
+ return ok(res), dat[0]
+
+ def getannotation(self, mailbox, pattern='*'):
+ typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('value.shared'))
+ return self._untagged_response(typ, dat, 'ANNOTATION')
+
+ def setannotation(self, mailbox, desc, value):
+ if value:
+ value = quote(value)
+ else:
+ value = "NIL"
+ typ, dat = self._simple_command('SETANNOTATION', mailbox, quote(desc), "(%s %s)" % (quote('value.shared'), value) )
+ return self._untagged_response(typ, dat, 'ANNOTATION')
+
+ def setquota(self, mailbox, limit):
+ """Set quota of a mailbox"""
+ if limit == 0:
+ quota = '()'
+ else:
+ quota = '(STORAGE %s)' % limit
+ return self._simple_command('SETQUOTA', mailbox, quota)
+
+ ### Overridden to support partition
+ ### Pychecker will complain about non matching signature
+ def create(self, mailbox, partition=None):
+ """Create a mailbox, partition is optional"""
+ if partition is not None:
+ return self._simple_command('CREATE', mailbox, partition)
+ else:
+ return self._simple_command('CREATE', mailbox)
+
+ ### Overridden to support partition
+ ### Pychecker: same here
+ def rename(self, from_mailbox, to_mailbox, partition=None):
+ """Rename a from_mailbox to to_mailbox, partition is optional"""
+ if partition is not None:
+ return self._simple_command('RENAME', from_mailbox, to_mailbox, partition)
+ else:
+ return self._simple_command('RENAME', from_mailbox, to_mailbox)
+
+ def reconstruct(self, mailbox):
+ return self._simple_command('RECONSTRUCT', mailbox)
+
+class IMAP4_SSL(imaplib.IMAP4_SSL):
+ def getsep(self):
+ """Get mailbox separator"""
+ ### yes, ugly but cyradm does it in the same way
+ ### also more realable then calling NAMESPACE
+ ### and it should be also compatibile with other servers
+ try:
+ return unquote(self.list(DQUOTE, DQUOTE)[1][0]).split()[1]
+ except:
+ return DEFAULT_SEP
+
+ def isadmin(self):
+ ### A trick to check if the user is admin or not
+ ### normal users cannot use dump command
+ try:
+ res, msg = self._simple_command('DUMP', 'NIL')
+ if msg[0].lower().find('denied') == -1:
+ return True
+ except:
+ pass
+ return False
+
+ def id(self):
+ try:
+ typ, dat = self._simple_command('ID', 'NIL')
+ res, dat = self._untagged_response(typ, dat, 'ID')
+ except:
+ return False, dat[0]
+ return ok(res), dat[0]
+
+ def getannotation(self, mailbox, pattern='*'):
+ typ, dat = self._simple_command('GETANNOTATION', mailbox, quote(pattern), quote('value.shared'))
+ return self._untagged_response(typ, dat, 'ANNOTATION')
+
+ def setannotation(self, mailbox, desc, value):
+ if value:
+ value = quote(value)
+ else:
+ value = "NIL"
+ typ, dat = self._simple_command('SETANNOTATION', mailbox, quote(desc), "(%s %s)" % (quote('value.shared'), value) )
+ return self._untagged_response(typ, dat, 'ANNOTATION')
+
+ def setquota(self, mailbox, limit):
+ """Set quota of a mailbox"""
+ if limit == 0:
+ quota = '()'
+ else:
+ quota = '(STORAGE %s)' % limit
+ return self._simple_command('SETQUOTA', mailbox, quota)
+
+ ### Overridden to support partition
+ ### Pychecker will complain about non matching signature
+ def create(self, mailbox, partition=None):
+ """Create a mailbox, partition is optional"""
+ if partition is not None:
+ return self._simple_command('CREATE', mailbox, partition)
+ else:
+ return self._simple_command('CREATE', mailbox)
+
+ ### Overridden to support partition
+ ### Pychecker: same here
+ def rename(self, from_mailbox, to_mailbox, partition=None):
+ """Rename a from_mailbox to to_mailbox, partition is optional"""
+ if partition is not None:
+ return self._simple_command('RENAME', from_mailbox, to_mailbox, partition)
+ else:
+ return self._simple_command('RENAME', from_mailbox, to_mailbox)
+
+ def reconstruct(self, mailbox):
+ return self._simple_command('RECONSTRUCT', mailbox)
+
+ def login_plain(self, admin, password, asUser):
+ if asUser:
+ encoded = b2a_base64("%s\0%s\0%s" % (asUser, admin, password)).strip()
+ else:
+ encoded = b2a_base64("%s\0%s\0%s" % (admin, admin, password)).strip()
+
+ res, data = self._simple_command('AUTHENTICATE', 'PLAIN', encoded)
+ if ok(res):
+ self.state = 'AUTH'
+ return res, data
+
+
+class CYRUS:
+ ERROR = {}
+ ERROR["CONNECT"] = [0, "Connection error"]
+ ERROR["INVALID_URL"] = [1, "Invalid URL"]
+ ERROR["ENCODING"] = [3, "Invalid encondig"]
+ ERROR["MBXNULL"] = [5, "Mailbox is Null"]
+ ERROR["NOAUTH"] = [7, "Connection is not authenticated"]
+ ERROR["LOGIN"] = [10, "User or password is wrong"]
+ ERROR["ADMIN"] = [11, "User is not cyrus administrator"]
+ ERROR["AUTH"] = [12, "Connection already authenticated"]
+ ERROR["LOGINPLAIN"] = [15, "Encryption needed to use mechanism"]
+ ERROR["LOGIN_PLAIN"] = [16, "User or password is wrong"]
+ ERROR["CREATE"] = [20, "Unable create mailbox"]
+ ERROR["DELETE"] = [25, "Unable delete mailbox"]
+ ERROR["GETACL"] = [30, "Unable parse GETACL result"]
+ ERROR["SETQUOTA"] = [40, "Invalid integer argument"]
+ ERROR["GETQUOTA"] = [45, "Quota root does not exist"]
+ ERROR["RENAME"] = [50, "Unable rename mailbox"]
+ ERROR["RECONSTRUCT"] = [60, "Unable reconstruct mailbox"]
+ ERROR["SUBSCRIBE"] = [70, "User is cyrus administrator, normal user required"]
+ ERROR["UNSUBSCRIBE"] = [75, "User is cyrus administrator, normal user required"]
+ ERROR["LSUB"] = [77, "User is cyrus administrator, normal user required"]
+ ERROR["UNKCMD"] = [98, "Command not implemented"]
+ ERROR["IMAPLIB"] = [99, "Generic imaplib error"]
+
+ ENCODING_LIST = ['imap', 'utf-8', 'iso-8859-1']
+
+ def __init__(self, url = 'imap://localhost:143'):
+ self.VERBOSE = False
+ self.AUTH = False
+ self.ADMIN = None
+ self.AUSER = None
+ self.ADMINACL = 'c'
+ self.SEP = DEFAULT_SEP
+ self.ENCODING = 'imap'
+ self.LOGFD = stdout
+ match = re_url.match(url)
+ if match:
+ host = match.group(2)
+ if match.group(3):
+ port = int(match.group(3))
+ else:
+ port = 143
+ else:
+ self.__doraise("INVALID_URL")
+ try:
+ if match.group(1) == 'imap':
+ self.ssl = False
+ self.m = IMAP4(host, port)
+ else:
+ self.ssl = True
+ self.m = IMAP4_SSL(host, port)
+ except:
+ self.__doraise("CONNECT")
+
+ def __del__(self):
+ if self.AUTH:
+ self.logout()
+
+ def __verbose(self, msg):
+ if self.VERBOSE:
+ print >> self.LOGFD, msg
+
+ def __doexception(self, function, msg=None, *args):
+ if msg is None:
+ try:
+ msg = self.ERROR.get(function.upper())[1]
+ except:
+ msg = self.ERROR.get("IMAPLIB")[1]
+ value = ""
+ for arg in args:
+ if arg is not None:
+ value = "%s %s" % (value, arg)
+
+ self.__verbose( '[%s%s] %s: %s' % (function.upper(), value, "BAD", msg) )
+ self.__doraise( function.upper(), msg )
+
+ def __doraise(self, mode, msg=None):
+ idError = self.ERROR.get(mode)
+ if idError:
+ if msg is None:
+ msg = idError[1]
+ else:
+ idError = [self.ERROR.get("IMAPLIB")[0]]
+ raise CYRUSError( idError[0], mode, msg )
+
+
+ def __prepare(self, command, mailbox=True):
+ if not self.AUTH:
+ self.__doexception(command, self.ERROR.get("NOAUTH")[1])
+ elif not mailbox:
+ self.__doexception(command, self.ERROR.get("MBXNULL")[1])
+
+ def __docommand(self, function, *args):
+ wrapped = getattr(self.m, function, None)
+ if wrapped is None:
+ raise self.__doraise("UNKCMD")
+ try:
+ res, msg = wrapped(*args)
+ if ok(res):
+ return res, msg
+ except Exception, info:
+ error = info.args[0].split(':').pop().strip()
+ if error.upper().startswith('BAD'):
+ error = error.split('BAD', 1).pop().strip()
+ error = unquote(error[1:-1], '\'')
+ self.__doexception(function, error, *args)
+ self.__doexception(function, msg[0], *args)
+
+ def id(self):
+ self.__prepare('id')
+ res, data = self.m.id()
+ data = data.strip()
+ if not res or (len(data) < 3): return False, {}
+ data = data[1:-1] # Strip ()
+ res, rdata = res2dict(data)
+ if not res:
+ self.__verbose( '[ID] Umatched pairs in result' )
+ return res, rdata
+
+ def login(self, username, password):
+ if self.AUTH:
+ self.__doexception("LOGIN", self.ERROR.get("AUTH")[1])
+ try:
+ res, msg = self.m.login(username, password)
+ admin = self.m.isadmin()
+ except Exception, info:
+ error = info.args[0].split(':').pop().strip()
+ self.__doexception("LOGIN", error)
+
+ if admin:
+ self.ADMIN = username
+
+ self.SEP = self.m.getsep()
+ self.AUTH = True
+ self.__verbose( '[LOGIN %s] %s: %s' % (username, res, msg[0]) )
+
+ def login_plain(self, username, password, asUser = None):
+ if self.AUTH:
+ self.__doexception("LOGINPLAIN", self.ERROR.get("AUTH")[1])
+ if not self.ssl:
+ self.__doexception("LOGINPLAIN", self.ERROR.get("LOGINPLAIN")[1])
+ res, msg = self.__docommand("login_plain", username, password, asUser)
+ self.__verbose( '[AUTHENTICATE PLAIN %s] %s: %s' % (username, res, msg[0]) )
+
+ if ok(res):
+ if asUser is None:
+ if self.m.isadmin():
+ self.ADMIN = admin
+ else:
+ self.ADMIN = asUser
+ self.AUSER = asUser
+ self.SEP = self.m.getsep()
+ self.AUTH = True
+
+ def logout(self):
+ try:
+ res, msg = self.m.logout()
+ except Exception, info:
+ error = info.args[0].split(':').pop().strip()
+ self.__doexception("LOGOUT", error)
+ self.AUTH = False
+ self.ADMIN = None
+ self.AUSER = None
+ self.__verbose( '[LOGOUT] %s: %s' % (res, msg[0]) )
+
+ def getEncoding(self):
+ """Get current input/ouput codification"""
+ return self.ENCODING
+
+ def setEncoding(self, enc = None):
+ """Set current input/ouput codification"""
+ if enc is None:
+ self.ENCODING = 'imap'
+ elif enc in self.ENCODING_LIST:
+ self.ENCODING = enc
+ else:
+ raise self.__doraise("ENCODING")
+
+ def __encode(self, text):
+ if re.search("&", text):
+ text = re.sub("/", "+AC8-", text)
+ text = re.sub("&", "+", text)
+ text = unicode(text, 'utf-7').encode(self.ENCODING)
+ return text
+
+ def encode(self, text):
+ if self.ENCODING == 'imap':
+ return text
+ elif self.ENCODING in self.ENCODING_LIST:
+ return self.__encode(text)
+
+ def __decode(self, text):
+ text = re.sub("/", "-&", text)
+ text = re.sub(" ", "-@", text)
+ text = unicode(text, self.ENCODING).encode('utf-7')
+ text = re.sub("-@", " ", text)
+ text = re.sub("-&", "/", text)
+ text = re.sub("\+", "&", text)
+ return text
+
+ def decode(self, text):
+ if self.ENCODING == 'imap':
+ return text
+ elif self.ENCODING in self.ENCODING_LIST:
+ return self.__decode(text)
+
+ def lm(self, pattern="*"):
+ """
+ List mailboxes, returns dict with list of mailboxes
+
+ To list all mailboxes lm()
+ To list users top mailboxes lm("user/%")
+ To list all users mailboxes lm("user/*")
+ To list users mailboxes startwith a word lm("user/word*")
+ To list global top folders lm("%")
+ To list global startwith a word unsupported by server
+ suggestion lm("word*")
+
+ """
+ self.__prepare('LIST')
+ if pattern == '': pattern = "*"
+ if pattern == '%':
+ res, ml = self.__docommand('list', '', '%')
+ else:
+ res, ml = self.__docommand('list', '*', self.decode(pattern))
+
+ if not ok(res):
+ self.__verbose( '[LIST] %s: %s' % (res, ml) )
+ return []
+
+ if (len(ml) == 1) and ml[0] is None:
+ self.__verbose( '[LIST] No results' )
+ return []
+
+ mb = []
+ for mailbox in ml:
+ res = re_mb.match(mailbox)
+ if res is None: continue
+ mbe = unquote(res.group(2))
+ if 'Noselect' in getflags(res.group(1)): continue
+ mb.append(self.encode(mbe))
+ return mb
+
+ def cm(self, mailbox, partition=None):
+ """Create mailbox"""
+ self.__prepare('CREATE', mailbox)
+ res, msg = self.__docommand('create', self.decode(mailbox), partition)
+ self.__verbose( '[CREATE %s partition=%s] %s: %s' % (mailbox, partition, res, msg[0]) )
+
+ def __dm(self, mailbox):
+ if not mailbox:
+ return True
+ self.__docommand("setacl", self.decode(mailbox), self.ADMIN, self.ADMINACL)
+ res, msg = self.__docommand("delete", self.decode(mailbox))
+ self.__verbose( '[DELETE %s] %s: %s' % (mailbox, res, msg[0]) )
+
+ def dm(self, mailbox, recursive=True):
+ """Delete mailbox"""
+ self.__prepare('DELETE', mailbox)
+ mbxTmp = mailbox.split(self.SEP)
+ # Cyrus is not recursive for user subfolders and global folders
+ if (recursive and mbxTmp[0] != "user") or (len(mbxTmp) > 2):
+ mbxList = self.lm("%s%s*" % (mailbox, self.SEP))
+ mbxList.reverse()
+ for mbox in mbxList:
+ self.__dm(mbox)
+ self.__dm(mailbox)
+
+ def rename(self, fromMbx, toMbx, partition=None):
+ """Rename or change partition"""
+ self.__prepare('RENAME', fromMbx)
+ # Rename is recursive! Amen!
+ res, msg = self.__docommand("rename", self.decode(fromMbx), self.decode(toMbx), partition)
+ self.__verbose( '[RENAME %s %s] %s: %s' % (fromMbx, toMbx, res, msg[0]) )
+
+ def lam(self, mailbox):
+ """List ACLs"""
+ self.__prepare('GETACL', mailbox)
+ res, acl = self.__docommand("getacl", self.decode(mailbox))
+ acls = {}
+ aclList = splitquote(acl.pop().strip())
+ del aclList[0] # mailbox
+ for i in range(0, len(aclList), 2):
+ try:
+ userid = self.encode(aclList[i])
+ rights = aclList[i + 1]
+ except Exception, info:
+ self.__verbose( '[GETACL %s] BAD: %s' % (mailbox, info.args[0]) )
+ raise self.__doraise("GETACL")
+ self.__verbose( '[GETACL %s] %s %s' % (mailbox, userid, rights) )
+ acls[userid] = rights
+ return acls
+
+ def sam(self, mailbox, userid, rights):
+ """Set ACL"""
+ self.__prepare('SETACL', mailbox)
+ res, msg = self.__docommand("setacl", self.decode(mailbox), userid, rights)
+ self.__verbose( '[SETACL %s %s %s] %s: %s' % (mailbox, userid, rights, res, msg[0]) )
+
+ def lq(self, mailbox):
+ """List Quota"""
+ self.__prepare('GETQUOTA', mailbox)
+ res, msg = self.__docommand("getquota", self.decode(mailbox))
+ match = re_q0.match(msg[0])
+ if match:
+ self.__verbose( '[GETQUOTA %s] QUOTA (Unlimited)' % mailbox )
+ return 0, 0
+ match = re_q.match(msg[0])
+ if match is None:
+ self.__verbose( '[GETQUOTA %s] BAD: RegExp not matched, please report' % mailbox )
+ return 0, 0
+ try:
+ used = int(match.group(2))
+ quota = int(match.group(3))
+ self.__verbose( '[GETQUOTA %s] %s: QUOTA (%d/%d)' % (mailbox, res, used, quota) )
+ return used, quota
+ except:
+ self.__verbose( '[GETQUOTA %s] BAD: Error while parsing results' % mailbox )
+ return 0, 0
+
+ def sq(self, mailbox, limit):
+ """Set Quota"""
+ self.__prepare('SETQUOTA', mailbox)
+ try:
+ limit = int(limit)
+ except ValueError, e:
+ self.__verbose( '[SETQUOTA %s] BAD: %s %s' % (mailbox, self.ERROR.get("SETQUOTA")[1], limit) )
+ raise self.__doraise("SETQUOTA")
+ res, msg = self.__docommand("setquota", self.decode(mailbox), limit)
+ self.__verbose( '[SETQUOTA %s %s] %s: %s' % (mailbox, limit, res, msg[0]) )
+
+ def getannotation(self, mailbox, pattern='*'):
+ """Get Annotation"""
+ self.__prepare('GETANNOTATION')
+ res, data = self.__docommand('getannotation', self.decode(mailbox), pattern)
+ if (len(data) == 1) and data[0] is None:
+ self.__verbose( '[GETANNOTATION %s] No results' % (mailbox) )
+ return {}
+ ann = {}
+ for annotation in data:
+ annotation = annotation.split('"')
+ if len(annotation) != 9:
+ self.__verbose( '[GETANNOTATION] Invalid annotation entry' )
+ continue
+ mbx = self.encode(annotation[1])
+ key = annotation[3]
+ value = annotation[7]
+ self.__verbose( '[GETANNOTATION %s] %s: %s' % (mbx, key, value) )
+ if not ann.has_key(mbx):
+ ann[mbx] = {}
+ if not ann[mbx].has_key(key):
+ ann[mbx][key] = value
+ return ann
+
+ def setannotation(self, mailbox, annotation, value):
+ """Set Annotation"""
+ self.__prepare('SETANNOTATION')
+ res, msg = self.__docommand("setannotation", self.decode(mailbox), annotation, value)
+ self.__verbose( '[SETANNOTATION %s] %s: %s' % (mailbox, res, msg[0]) )
+
+ def __reconstruct(self, mailbox):
+ if not mailbox:
+ return True
+ res, msg = self.__docommand("reconstruct", self.decode(mailbox))
+ self.__verbose( '[RECONSTRUCT %s] %s: %s' % (mailbox, res, msg[0]) )
+
+ def reconstruct(self, mailbox, recursive=True):
+ """Reconstruct"""
+ self.__prepare('RECONSTRUCT', mailbox)
+ # Cyrus is not recursive for remote reconstruct
+ if recursive:
+ mbxList = self.lm("%s%s*" % (mailbox, self.SEP))
+ mbxList.reverse()
+ for mbox in mbxList:
+ self.__reconstruct(mbox)
+ self.__reconstruct(mailbox)
+
+ def lsub(self, pattern="*"):
+ if self.AUSER is None:
+ self.__doexception("lsub")
+ self.__prepare('LSUB')
+ if pattern == '': pattern = "*"
+ res, ml = self.__docommand('lsub', '*', pattern)
+
+ if not ok(res):
+ self.__verbose( '[LIST] %s: %s' % (res, ml) )
+ return []
+
+ if (len(ml) == 1) and ml[0] is None:
+ self.__verbose( '[LIST] No results' )
+ return []
+
+ mb = []
+ for mailbox in ml:
+ res = re_mb.match(mailbox)
+ if res is None: continue
+ mbe = unquote(res.group(2))
+ if 'Noselect' in getflags(res.group(1)): continue
+ mb.append(self.encode(mbe))
+ return mb
+
+ def subscribe(self, mailbox):
+ """Subscribe"""
+ self.__prepare('SUBSCRIBE')
+ res, msg = self.__docommand("subscribe", self.decode(mailbox))
+ self.__verbose( '[SUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) )
+
+ def unsubscribe(self, mailbox):
+ """Unsubscribe"""
+ self.__prepare('UNSUBSCRIBE')
+ res, msg = self.__docommand("unsubscribe", self.decode(mailbox))
+ self.__verbose( '[UNSUBSCRIBE %s] %s: %s' % (mailbox, res, msg[0]) )
+