summaryrefslogtreecommitdiffstats
path: root/pykolab
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2019-10-25 13:16:55 +0200
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2019-10-25 13:16:55 +0200
commit28d9e703ea31a070a05589a2bd82b79bc7e6bed4 (patch)
treee7967f619425b94109c6c046075025a6f98aea0f /pykolab
parent334366d52a91de1309df5fc0ce89a57d9b23d5b1 (diff)
downloadpykolab-28d9e703ea31a070a05589a2bd82b79bc7e6bed4.tar.gz
More linting and syntax issues resolved
Diffstat (limited to 'pykolab')
-rw-r--r--pykolab/auth/ldap/__init__.py30
-rw-r--r--pykolab/auth/ldap/auth_cache.py88
-rw-r--r--pykolab/auth/ldap/cache.py147
-rw-r--r--pykolab/base.py7
-rw-r--r--pykolab/imap/__init__.py187
-rw-r--r--pykolab/logger.py126
-rw-r--r--pykolab/utils.py270
7 files changed, 455 insertions, 400 deletions
diff --git a/pykolab/auth/ldap/__init__.py b/pykolab/auth/ldap/__init__.py
index df35cd1..c414118 100644
--- a/pykolab/auth/ldap/__init__.py
+++ b/pykolab/auth/ldap/__init__.py
@@ -20,14 +20,16 @@ from __future__ import print_function
import datetime
# Catch python-ldap-2.4 changes
from distutils import version
-import _ldap
-import ldap
-import ldap.controls
-import ldap.filter
import logging
import time
import traceback
+import ldap
+import ldap.controls
+import ldap.filter
+
+import _ldap
+
import pykolab
import pykolab.base
@@ -39,13 +41,18 @@ from pykolab.translate import _
import auth_cache
import cache
+# pylint: disable=invalid-name
log = pykolab.getLogger('pykolab.auth')
conf = pykolab.getConf()
if version.StrictVersion('2.4.0') <= version.StrictVersion(ldap.__version__):
LDAP_CONTROL_PAGED_RESULTS = ldap.CONTROL_PAGEDRESULTS
else:
- LDAP_CONTROL_PAGED_RESULTS = ldap.LDAP_CONTROL_PAGE_OID
+ if hasattr(ldap, 'LDAP_CONTROL_PAGE_OID'):
+ # pylint: disable=no-member
+ LDAP_CONTROL_PAGED_RESULTS = ldap.LDAP_CONTROL_PAGE_OID
+ else:
+ LDAP_CONTROL_PAGED_RESULTS = ldap.controls.SimplePagedResultsControl.controlType
try:
from ldap.controls import psearch
@@ -80,8 +87,8 @@ class SimplePagedResultsControl(ldap.controls.SimplePagedResultsControl):
def cookie(self):
if version.StrictVersion('2.4.0') <= version.StrictVersion(ldap.__version__):
return self.cookie
- else:
- return self.controlValue[1]
+
+ return self.controlValue[1]
def size(self):
if version.StrictVersion('2.4.0') <= version.StrictVersion(ldap.__version__):
@@ -237,7 +244,7 @@ class LDAP(pykolab.base.Base):
except ldap.SERVER_DOWN as errmsg:
log.error(_("LDAP server unavailable: %r") % (errmsg))
- log.error(_("%s") % (traceback.format_exc()))
+ log.error(traceback.format_exc())
self._disconnect()
return False
@@ -250,7 +257,7 @@ class LDAP(pykolab.base.Base):
except Exception as errmsg:
log.error(_("Exception occurred: %r") % (errmsg))
- log.error(_("%s") % (traceback.format_exc()))
+ log.error(traceback.format_exc())
self._disconnect()
return False
@@ -1336,7 +1343,7 @@ class LDAP(pykolab.base.Base):
)
import traceback
- log.error("%s" % (traceback.format_exc()))
+ log.error(traceback.format_exc())
def synchronize(self, mode=0, callback=None):
"""
@@ -3063,8 +3070,7 @@ class LDAP(pykolab.base.Base):
scope=scope,
filterstr=filterstr,
attrlist=attrlist,
- attrsonly=attrsonly,
- timeout=timeout
+ attrsonly=attrsonly
)
_results = []
diff --git a/pykolab/auth/ldap/auth_cache.py b/pykolab/auth/ldap/auth_cache.py
index 781f7b9..af65e40 100644
--- a/pykolab/auth/ldap/auth_cache.py
+++ b/pykolab/auth/ldap/auth_cache.py
@@ -25,30 +25,18 @@ from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
-from sqlalchemy import Table
from sqlalchemy import Text
-from sqlalchemy import desc
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import mapper
-try:
- from sqlalchemy.orm import relationship
-except:
- from sqlalchemy.orm import relation as relationship
-
-try:
- from sqlalchemy.orm import sessionmaker
-except:
- from sqlalchemy.orm import create_session
+from sqlalchemy.orm import sessionmaker
import pykolab
-from pykolab import utils
from pykolab.constants import KOLAB_LIB_PATH
-from pykolab.translate import _
+# pylint: disable=invalid-name
conf = pykolab.getConf()
log = pykolab.getLogger('pykolab.auth_cache')
@@ -56,12 +44,19 @@ metadata = MetaData()
db = None
-##
-## Classes
-##
+try:
+ unicode('')
+except NameError:
+ unicode = str
+
+#
+# Classes
+#
DeclarativeBase = declarative_base()
+
+# pylint: disable=too-few-public-methods
class Entry(DeclarativeBase):
__tablename__ = 'entries'
@@ -78,38 +73,47 @@ class Entry(DeclarativeBase):
else:
self.value = value
-##
-## Functions
-##
+#
+# Functions
+#
+
def del_entry(key):
+ # pylint: disable=global-statement
+ global db
+
db = init_db()
try:
- _entries = db.query(Entry).filter_by(key=key).delete()
- except sqlalchemy.exc.OperationalError, errmsg:
+ db.query(Entry).filter_by(key=key).delete()
+ except sqlalchemy.exc.OperationalError:
db = init_db(reinit=True)
- except sqlalchemy.exc.InvalidRequest, errmsg:
+ except sqlalchemy.exc.InvalidRequest:
db = init_db(reinit=True)
finally:
- _entries = db.query(Entry).filter_by(key=key).delete()
+ db.query(Entry).filter_by(key=key).delete()
db.commit()
+
def get_entry(key):
+ # pylint: disable=global-statement
+ global db
+
db = init_db()
try:
_entries = db.query(Entry).filter_by(key=key).all()
- except sqlalchemy.exc.OperationalError, errmsg:
+ except sqlalchemy.exc.OperationalError:
db = init_db(reinit=True)
- except sqlalchemy.exc.InvalidRequest, errmsg:
+ except sqlalchemy.exc.InvalidRequest:
db = init_db(reinit=True)
finally:
_entries = db.query(Entry).filter_by(key=key).all()
- if len(_entries) == 0:
+ if _entries:
return None
+
if len(_entries) > 1:
return None
@@ -118,26 +122,23 @@ def get_entry(key):
return _entries[0].value.encode('utf-8', 'latin1')
+
def set_entry(key, value):
db = init_db()
+
try:
_entries = db.query(Entry).filter_by(key=key).all()
- except sqlalchemy.exc.OperationalError, errmsg:
+ except sqlalchemy.exc.OperationalError:
db = init_db(reinit=True)
- except sqlalchemy.exc.InvalidRequest, errmsg:
+ except sqlalchemy.exc.InvalidRequest:
db = init_db(reinit=True)
finally:
_entries = db.query(Entry).filter_by(key=key).all()
- if len(_entries) == 0:
- db.add(
- Entry(
- key,
- value
- )
- )
-
+ if not _entries:
+ db.add(Entry(key, value))
db.commit()
+
elif len(_entries) == 1:
if not isinstance(value, unicode):
value = unicode(value, 'utf-8')
@@ -148,21 +149,28 @@ def set_entry(key, value):
_entries[0].last_change = datetime.datetime.now()
db.commit()
+
def purge_entries(db):
- db.query(Entry).filter(Entry.last_change <= (datetime.datetime.now() - datetime.timedelta(1))).delete()
+ db.query(Entry).filter(
+ Entry.last_change <= (datetime.datetime.now() - datetime.timedelta(1))
+ ).delete()
+
db.commit()
+
def init_db(reinit=False):
"""
Returns a SQLAlchemy Session() instance.
"""
+ # pylint: disable=global-statement
global db
- if not db == None and not reinit:
+ if db is not None and not reinit:
return db
db_uri = conf.get('ldap', 'auth_cache_uri')
- if db_uri == None:
+
+ if db_uri is None:
db_uri = 'sqlite:///%s/auth_cache.db' % (KOLAB_LIB_PATH)
if reinit:
diff --git a/pykolab/auth/ldap/cache.py b/pykolab/auth/ldap/cache.py
index 23b4e07..02da350 100644
--- a/pykolab/auth/ldap/cache.py
+++ b/pykolab/auth/ldap/cache.py
@@ -18,6 +18,8 @@
import datetime
+from uuid import UUID
+
import sqlalchemy
from sqlalchemy import Column
@@ -31,24 +33,14 @@ from sqlalchemy import desc
from sqlalchemy import create_engine
from sqlalchemy.orm import mapper
-from uuid import UUID
-
-try:
- from sqlalchemy.orm import relationship
-except:
- from sqlalchemy.orm import relation as relationship
-
-try:
- from sqlalchemy.orm import sessionmaker
-except:
- from sqlalchemy.orm import create_session
+from sqlalchemy.orm import sessionmaker
import pykolab
-from pykolab import utils
from pykolab.constants import KOLAB_LIB_PATH
from pykolab.translate import _
+# pylint: disable=invalid-name
conf = pykolab.getConf()
log = pykolab.getLogger('pykolab.auth_cache')
@@ -56,11 +48,15 @@ metadata = MetaData()
db = {}
-##
-## Classes
-##
+#
+# Classes
+#
+
+
+# pylint: disable=too-few-public-methods
+class Entry:
+ last_change = None
-class Entry(object):
def __init__(self, uniqueid, result_attr, last_change):
self.uniqueid = uniqueid
self.result_attribute = result_attr
@@ -72,83 +68,82 @@ class Entry(object):
).replace('%%', '%')
self.last_change = datetime.datetime.strptime(
- last_change,
+ last_change,
modifytimestamp_format
)
-##
-## Tables
-##
+#
+# Tables
+#
+
entry_table = Table(
- 'entry', metadata,
- Column('id', Integer, primary_key=True),
- Column('uniqueid', String(128), nullable=False),
- Column('result_attribute', String(128), nullable=False),
- Column('last_change', DateTime),
- )
+ 'entry', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('uniqueid', String(128), nullable=False),
+ Column('result_attribute', String(128), nullable=False),
+ Column('last_change', DateTime),
+)
-##
-## Table <-> Class Mappers
-##
+#
+# Table <-> Class Mappers
+#
mapper(Entry, entry_table)
-##
-## Functions
-##
+#
+# Functions
+#
+
def delete_entry(domain, entry):
- result_attribute = conf.get_raw('cyrus-sasl', 'result_attribute')
+ _db = init_db(domain)
+ _entry = _db.query(Entry).filter_by(uniqueid=entry['id']).first()
- db = init_db(domain)
- _entry = db.query(Entry).filter_by(uniqueid=entry['id']).first()
+ if _entry is not None:
+ _db.delete(_entry)
+ _db.commit()
- if not _entry == None:
- db.delete(_entry)
- db.commit()
def get_entry(domain, entry, update=True):
result_attribute = conf.get_raw('cyrus-sasl', 'result_attribute')
_entry = None
- db = init_db(domain)
+ _db = init_db(domain)
try:
_uniqueid = str(UUID(bytes_le=entry['id']))
- log.debug(_("Entry uniqueid was converted from binary form to string: %s") % _uniqueid, level=8)
+ log.debug(
+ _("Entry uniqueid was converted from binary form to string: %s") % _uniqueid,
+ level=8
+ )
+
except ValueError:
_uniqueid = entry['id']
try:
- _entry = db.query(Entry).filter_by(uniqueid=_uniqueid).first()
- except sqlalchemy.exc.OperationalError, errmsg:
- db = init_db(domain,reinit=True)
- except sqlalchemy.exc.InvalidRequestError, errmsg:
- db = init_db(domain,reinit=True)
+ _entry = _db.query(Entry).filter_by(uniqueid=_uniqueid).first()
+ except sqlalchemy.exc.OperationalError:
+ _db = init_db(domain, reinit=True)
+ except sqlalchemy.exc.InvalidRequestError:
+ _db = init_db(domain, reinit=True)
finally:
- _entry = db.query(Entry).filter_by(uniqueid=_uniqueid).first()
+ _entry = _db.query(Entry).filter_by(uniqueid=_uniqueid).first()
if not update:
return _entry
- if _entry == None:
+ if _entry is None:
log.debug(_("Inserting cache entry %r") % (_uniqueid), level=8)
- if not entry.has_key(result_attribute):
+ if result_attribute not in entry:
entry[result_attribute] = ''
- db.add(
- Entry(
- _uniqueid,
- entry[result_attribute],
- entry['modifytimestamp']
- )
- )
+ _db.add(Entry(_uniqueid, entry[result_attribute], entry['modifytimestamp']))
- db.commit()
- _entry = db.query(Entry).filter_by(uniqueid=_uniqueid).first()
+ _db.commit()
+ _entry = _db.query(Entry).filter_by(uniqueid=_uniqueid).first()
else:
modifytimestamp_format = conf.get_raw(
'ldap',
@@ -158,24 +153,30 @@ def get_entry(domain, entry, update=True):
if not _entry.last_change.strftime(modifytimestamp_format) == entry['modifytimestamp']:
log.debug(_("Updating timestamp for cache entry %r") % (_uniqueid), level=8)
- last_change = datetime.datetime.strptime(entry['modifytimestamp'], modifytimestamp_format)
+ last_change = datetime.datetime.strptime(
+ entry['modifytimestamp'],
+ modifytimestamp_format
+ )
+
_entry.last_change = last_change
- db.commit()
- _entry = db.query(Entry).filter_by(uniqueid=_uniqueid).first()
+ _db.commit()
+ _entry = _db.query(Entry).filter_by(uniqueid=_uniqueid).first()
- if entry.has_key(result_attribute):
+ if result_attribute in entry:
if not _entry.result_attribute == entry[result_attribute]:
log.debug(_("Updating result_attribute for cache entry %r") % (_uniqueid), level=8)
_entry.result_attribute = entry[result_attribute]
- db.commit()
- _entry = db.query(Entry).filter_by(uniqueid=_uniqueid).first()
+ _db.commit()
+ _entry = _db.query(Entry).filter_by(uniqueid=_uniqueid).first()
return _entry
-def init_db(domain,reinit=False):
+
+def init_db(domain, reinit=False):
"""
Returns a SQLAlchemy Session() instance.
"""
+ # pylint: disable=global-statement
global db
if domain in db and not reinit:
@@ -192,7 +193,7 @@ def init_db(domain,reinit=False):
try:
engine = create_engine(db_uri, echo=echo)
metadata.create_all(engine)
- except:
+ except Exception:
engine = create_engine('sqlite://')
metadata.create_all(engine)
@@ -201,6 +202,7 @@ def init_db(domain,reinit=False):
return db[domain]
+
def last_modify_timestamp(domain):
modifytimestamp_format = conf.get_raw(
'ldap',
@@ -209,12 +211,13 @@ def last_modify_timestamp(domain):
).replace('%%', '%')
try:
- db = init_db(domain)
- last_change = db.query(Entry).order_by(desc(Entry.last_change)).first()
+ _db = init_db(domain)
+ last_change = _db.query(Entry).order_by(desc(Entry.last_change)).first()
- if not last_change == None:
+ if last_change is not None:
return last_change.last_change.strftime(modifytimestamp_format)
- else:
- return datetime.datetime(1900, 01, 01, 00, 00, 00).strftime(modifytimestamp_format)
- except:
- return datetime.datetime(1900, 01, 01, 00, 00, 00).strftime(modifytimestamp_format)
+
+ return datetime.datetime(1900, 1, 1, 00, 00, 00).strftime(modifytimestamp_format)
+
+ except Exception:
+ return datetime.datetime(1900, 1, 1, 00, 00, 00).strftime(modifytimestamp_format)
diff --git a/pykolab/base.py b/pykolab/base.py
index ae9ab25..961d431 100644
--- a/pykolab/base.py
+++ b/pykolab/base.py
@@ -19,14 +19,16 @@
import pykolab
from pykolab.imap import IMAP
+# pylint: disable=invalid-name
conf = pykolab.getConf()
-class Base(object):
+
+class Base:
"""
Abstraction class for functions commonly shared between auth, imap, etc.
"""
def __init__(self, *args, **kw):
- if kw.has_key('domain') and not kw['domain'] == None:
+ if 'domain' in kw and kw['domain'] is not None:
self.domain = kw['domain']
else:
self.domain = conf.get('kolab', 'primary_domain')
@@ -94,4 +96,3 @@ class Base(object):
return conf.get_raw('kolab', key1)
return default
-
diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py
index 205deaa..3fc72d3 100644
--- a/pykolab/imap/__init__.py
+++ b/pykolab/imap/__init__.py
@@ -54,20 +54,16 @@ class IMAP(object):
if len(aci_subject.split('@')) > 1:
lm_suffix = "@%s" % (aci_subject.split('@')[1])
- shared_folders = self.imap.lm(
- "shared/*%s" % (lm_suffix)
- )
+ shared_folders = self.imap.lm("shared/*%s" % (lm_suffix))
- user_folders = self.imap.lm(
- "user/*%s" % (lm_suffix)
- )
+ user_folders = self.imap.lm("user/*%s" % (lm_suffix))
log.debug(
- _("Cleaning up ACL entries referring to identifier %s") % (
- aci_subject
- ),
- level=5
- )
+ _("Cleaning up ACL entries referring to identifier %s") % (
+ aci_subject
+ ),
+ level=5
+ )
# For all folders (shared and user), ...
folders = user_folders + shared_folders
@@ -128,12 +124,12 @@ class IMAP(object):
# deployment.
backend = conf.get('kolab', 'imap_backend')
- if not domain == None:
+ if domain is not None:
self.domain = domain
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
backend = conf.get(domain, 'imap_backend')
- if uri == None:
+ if uri is None:
if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'):
uri = conf.get(domain, 'imap_uri')
else:
@@ -143,7 +139,7 @@ class IMAP(object):
hostname = None
port = None
- if uri == None:
+ if uri is None:
uri = conf.get(backend, 'uri')
result = urlparse(uri)
@@ -162,13 +158,13 @@ class IMAP(object):
scheme = uri.split(':')[0]
(hostname, port) = uri.split('/')[2].split(':')
- if not server == None:
+ if server is not None:
hostname = server
- if scheme == None or scheme == "":
+ if scheme is None or scheme == "":
scheme = 'imaps'
- if port == None:
+ if port is None:
if scheme == "imaps":
port = 993
elif scheme == "imap":
@@ -182,10 +178,10 @@ class IMAP(object):
admin_login = conf.get(backend, 'admin_login')
admin_password = conf.get(backend, 'admin_password')
- if admin_password == None or admin_password == '':
+ if admin_password is None or admin_password == '':
log.error(_("No administrator password is available."))
- if not self._imap.has_key(hostname):
+ if hostname not in self._imap:
if backend == 'cyrus-imap':
import cyrus
self._imap[hostname] = cyrus.Cyrus(uri)
@@ -216,19 +212,25 @@ class IMAP(object):
else:
if not login:
self.disconnect(hostname)
- self.connect(uri=uri,login=False)
- elif login and not hasattr(self._imap[hostname],'logged_in'):
+ self.connect(uri=uri, login=False)
+ elif login and not hasattr(self._imap[hostname], 'logged_in'):
self.disconnect(hostname)
self.connect(uri=uri)
else:
try:
if hasattr(self._imap[hostname], 'm'):
self._imap[hostname].m.noop()
- elif hasattr(self._imap[hostname], 'noop') and callable(self._imap[hostname].noop):
+ elif hasattr(self._imap[hostname], 'noop') \
+ and callable(self._imap[hostname].noop):
+
self._imap[hostname].noop()
- log.debug(_("Reusing existing IMAP server connection to %s") % (hostname), level=8)
- except:
+ log.debug(
+ _("Reusing existing IMAP server connection to %s") % (hostname),
+ level=8
+ )
+
+ except Exception:
log.debug(_("Reconnecting to IMAP server %s") % (hostname), level=8)
self.disconnect(hostname)
self.connect()
@@ -243,7 +245,7 @@ class IMAP(object):
self._set_socket_keepalive(self.imap.sock)
def disconnect(self, server=None):
- if server == None:
+ if server is None:
# No server specified, but make sure self.imap is None anyways
if hasattr(self, 'imap'):
del self.imap
@@ -253,33 +255,30 @@ class IMAP(object):
del self._imap[key]
else:
- if self._imap.has_key(server):
+ if server in self._imap:
del self._imap[server]
else:
- log.warning(_("Called imap.disconnect() on a server that we had no connection to."))
+ log.warning(
+ _("Called imap.disconnect() on a server that we had no connection to.")
+ )
def create_folder(self, folder_path, server=None, partition=None):
folder_path = self.folder_utf7(folder_path)
- if not server == None:
+ if server is not None:
self.connect(server=server)
try:
self._imap[server].cm(folder_path, partition=partition)
return True
- except:
- log.error(
- _("Could not create folder %r on server %r") % (
- folder_path,
- server
- )
- )
+ except Exception:
+ log.error(_("Could not create folder %r on server %r") % (folder_path, server))
else:
try:
self.imap.cm(folder_path, partition=partition)
return True
- except:
+ except Exception:
log.error(_("Could not create folder %r") % (folder_path))
return False
@@ -290,9 +289,9 @@ class IMAP(object):
if hasattr(self.imap.m, name):
return getattr(self.imap.m, name)
else:
- raise AttributeError, _("%r has no attribute %s") % (self,name)
+ raise AttributeError(_("%r has no attribute %s") % (self, name))
else:
- raise AttributeError, _("%r has no attribute %s") % (self,name)
+ raise AttributeError(_("%r has no attribute %s") % (self, name))
def folder_utf7(self, folder):
from pykolab import imap_utf7
@@ -313,13 +312,13 @@ class IMAP(object):
_metadata = self.imap.getannotation(self.folder_utf7(folder), '*')
- for (k,v) in _metadata.items():
+ for (k, v) in _metadata.items():
metadata[self.folder_utf8(k)] = v
return metadata
def get_separator(self):
- if not hasattr(self, 'imap') or self.imap == None:
+ if not hasattr(self, 'imap') or self.imap is None:
self.connect()
if hasattr(self.imap, 'separator'):
@@ -357,13 +356,13 @@ class IMAP(object):
if len(_namespaces) >= 3:
_shared = []
- _shared.append(' '.join(_namespaces[2].replace('((','').replace('))','').split()[:-1]).replace('"', ''))
+ _shared.append(' '.join(_namespaces[2].replace('((', '').replace('))', '').split()[:-1]).replace('"', ''))
if len(_namespaces) >= 2:
- _other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '')
+ _other_users = ' '.join(_namespaces[1].replace('((', '').replace('))', '').split()[:-1]).replace('"', '')
if len(_namespaces) >= 1:
- _personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '')
+ _personal = _namespaces[0].replace('((', '').replace('))', '').split()[0].replace('"', '')
return (_personal, _other_users, _shared)
@@ -385,7 +384,7 @@ class IMAP(object):
'write': 'lrswite',
}
- if short_rights.has_key(acl):
+ if acl in short_rights:
acl = short_rights[acl]
else:
for char in acl:
@@ -428,7 +427,7 @@ class IMAP(object):
try:
self.imap.sam(self.folder_utf7(folder), identifier, acl)
- except Exception, errmsg:
+ except Exception as errmsg:
log.error(
_("Could not set ACL for %s on folder %s: %r") % (
identifier,
@@ -544,10 +543,10 @@ class IMAP(object):
if not hasattr(self, 'domain'):
self.domain = None
- if self.domain == None and len(mailbox_base_name.split('@')) > 1:
+ if self.domain is None and len(mailbox_base_name.split('@')) > 1:
self.domain = mailbox_base_name.split('@')[1]
- if not self.domain == None:
+ if not self.domain is None:
if conf.has_option(self.domain, "autocreate_folders"):
_additional_folders = conf.get_raw(
self.domain,
@@ -564,7 +563,7 @@ class IMAP(object):
auth.disconnect()
if len(domains.keys()) > 0:
- if domains.has_key(self.domain):
+ if self.domain in domains:
primary = domains[self.domain]
if conf.has_option(primary, "autocreate_folders"):
@@ -573,7 +572,7 @@ class IMAP(object):
"autocreate_folders"
)
- if _additional_folders == None:
+ if _additional_folders is None:
if conf.has_option('kolab', "autocreate_folders"):
_additional_folders = conf.get_raw(
'kolab',
@@ -588,13 +587,13 @@ class IMAP(object):
}
)
- if not additional_folders == None:
+ if additional_folders is not None:
self.user_mailbox_create_additional_folders(
mailbox_base_name,
additional_folders
)
- if not self.domain == None:
+ if not self.domain is None:
if conf.has_option(self.domain, "sieve_mgmt"):
sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt')
if utils.true_or_false(sieve_mgmt_enabled):
@@ -634,7 +633,7 @@ class IMAP(object):
self.login_plain(admin_login, admin_password, user)
(personal, other, shared) = self.namespaces()
success = True
- except Exception, errmsg:
+ except Exception as errmsg:
if time.time() - last_log > 5 and self.imap_murder():
log.debug(_("Waiting for the Cyrus murder to settle... %r") % (errmsg))
last_log = time.time()
@@ -661,7 +660,7 @@ class IMAP(object):
log.warning(_("Failed to create folder: %s") % (folder_name))
continue
- if additional_folders[additional_folder].has_key("annotations"):
+ if "annotations" in additional_folders[additional_folder]:
for annotation in additional_folders[additional_folder]["annotations"].keys():
self.set_metadata(
folder_name,
@@ -669,7 +668,7 @@ class IMAP(object):
"%s" % (additional_folders[additional_folder]["annotations"][annotation])
)
- if additional_folders[additional_folder].has_key("acls"):
+ if "acls" in additional_folders[additional_folder]:
for acl in additional_folders[additional_folder]["acls"].keys():
self.set_acl(
folder_name,
@@ -686,7 +685,7 @@ class IMAP(object):
domain = None
domain_suffix = ""
- if not domain == None:
+ if domain is not None:
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
backend = conf.get(domain, 'imap_backend')
@@ -702,10 +701,10 @@ class IMAP(object):
# Subscribe only to personal folders
(personal, other, shared) = self.namespaces()
- if not other == None:
+ if other is not None:
_tests.append(other)
- if not shared == None:
+ if shared is not None:
for _shared in shared:
_tests.append(_shared)
@@ -729,7 +728,7 @@ class IMAP(object):
log.debug(_("Subscribing %s to folder %s") % (user, _folder), level=8)
try:
self.subscribe(_folder)
- except Exception, errmsg:
+ except Exception as errmsg:
log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg))
self.logout()
@@ -749,16 +748,16 @@ class IMAP(object):
domain_suffix
)
- if additional_folders[additional_folder].has_key("quota"):
+ if "quota" in additional_folders[additional_folder]:
try:
self.imap.sq(
folder_name,
additional_folders[additional_folder]['quota']
)
- except Exception, errmsg:
+ except Exception as errmsg:
log.error(_("Could not set quota on %s") % (additional_folder))
- if additional_folders[additional_folder].has_key("partition"):
+ if "partition" in additional_folders[additional_folder]:
partition = additional_folders[additional_folder]["partition"]
try:
self.imap._rename(folder_name, folder_name, partition)
@@ -815,7 +814,7 @@ class IMAP(object):
"""
self.connect()
- folder = "user%s%s" %(self.get_separator(),mailbox_base_name)
+ folder = "user%s%s" %(self.get_separator(), mailbox_base_name)
self.delete_mailfolder(folder)
self.cleanup_acls(mailbox_base_name)
@@ -837,23 +836,23 @@ class IMAP(object):
def user_mailbox_rename(self, old_name, new_name, partition=None):
self.connect()
- old_name = "user%s%s" % (self.get_separator(),old_name)
- new_name = "user%s%s" % (self.get_separator(),new_name)
+ old_name = "user%s%s" % (self.get_separator(), old_name)
+ new_name = "user%s%s" % (self.get_separator(), new_name)
- if old_name == new_name and partition == None:
+ if old_name == new_name and partition is None:
return
if not self.has_folder(old_name):
log.error(_("INBOX folder to rename (%s) does not exist") % (old_name))
- if not self.has_folder(new_name) or not partition == None:
- log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name))
+ if not self.has_folder(new_name) or not partition is None:
+ log.info(_("Renaming INBOX from %s to %s") % (old_name, new_name))
try:
- self.imap.rename(old_name,new_name,partition)
+ self.imap.rename(old_name, new_name, partition)
except:
- log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name))
+ log.error(_("Could not rename INBOX folder %s to %s") % (old_name, new_name))
else:
- log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name))
+ log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name, new_name))
def user_mailbox_server(self, mailbox):
server = self.imap.find_mailfolder_server(mailbox.lower()).lower()
@@ -865,7 +864,7 @@ class IMAP(object):
Check if the environment has a folder named folder.
"""
folders = self.imap.lm(self.folder_utf7(folder))
- log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8)
+ log.debug(_("Looking for folder '%s', we found folders: %r") % (folder, [self.folder_utf8(x) for x in folders]), level=8)
# Greater then one, this folder may have subfolders.
if len(folders) > 0:
return True
@@ -924,7 +923,7 @@ class IMAP(object):
if epoch > (int)(time.time()):
log.debug(
_("Setting ACL rights %s for subject %s on folder " + \
- "%s") % (rights,subject,folder), level=8)
+ "%s") % (rights, subject, folder), level=8)
self.set_acl(
folder,
@@ -935,7 +934,7 @@ class IMAP(object):
else:
log.debug(
_("Removing ACL rights %s for subject %s on folder " + \
- "%s") % (rights,subject,folder), level=8)
+ "%s") % (rights, subject, folder), level=8)
self.set_acl(
folder,
@@ -959,7 +958,7 @@ class IMAP(object):
def move_user_folders(self, users=[], domain=None):
for user in users:
if type(user) == dict:
- if user.has_key('old_mail'):
+ if 'old_mail' in user:
inbox = "user/%s" % (user['mail'])
old_inbox = "user/%s" % (user['old_mail'])
@@ -967,11 +966,11 @@ class IMAP(object):
log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8)
if not self.has_folder(inbox):
- log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox))
- self.imap.rename(old_inbox,inbox)
+ log.info(_("Renaming INBOX from %s to %s") % (old_inbox, inbox))
+ self.imap.rename(old_inbox, inbox)
self.inbox_folders.append(inbox)
else:
- log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox))
+ log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox, inbox))
else:
log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8)
else:
@@ -1004,7 +1003,7 @@ class IMAP(object):
default_quota = auth.domain_default_quota(primary_domain)
- if default_quota == "" or default_quota == None:
+ if default_quota == "" or default_quota is None:
default_quota = 0
if len(users) == 0:
@@ -1014,19 +1013,19 @@ class IMAP(object):
quota = None
if type(user) == dict:
- if user.has_key(_quota_attr):
+ if _quota_attr in user:
if type(user[_quota_attr]) == list:
quota = user[_quota_attr].pop(0)
elif type(user[_quota_attr]) == str:
quota = user[_quota_attr]
else:
_quota = auth.get_user_attribute(primary_domain, user, _quota_attr)
- if _quota == None:
+ if _quota is None:
quota = 0
else:
quota = _quota
- if not user.has_key(_inbox_folder_attr):
+ if _inbox_folder_attr not in user:
continue
else:
if type(user[_inbox_folder_attr]) == list:
@@ -1040,7 +1039,7 @@ class IMAP(object):
folder = folder.lower()
try:
- (used,current_quota) = self.imap.lq(folder)
+ (used, current_quota) = self.imap.lq(folder)
except:
# TODO: Go in fact correct the quota.
log.warning(_("Cannot get current IMAP quota for folder %s") % (folder))
@@ -1058,11 +1057,11 @@ class IMAP(object):
log.debug(_("Quota for %s currently is %s") % (folder, current_quota), level=7)
- if new_quota == None:
+ if new_quota is None:
continue
if not int(new_quota) == int(quota):
- log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota)))
+ log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder, int(new_quota)))
quota = int(new_quota)
auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota)
@@ -1086,7 +1085,7 @@ class IMAP(object):
mailhost = None
if type(user) == dict:
- if user.has_key(_mailserver_attr):
+ if _mailserver_attr in user:
if type(user[_mailserver_attr]) == list:
_mailserver = user[_mailserver_attr].pop(0)
elif type(user[_mailserver_attr]) == str:
@@ -1094,7 +1093,7 @@ class IMAP(object):
else:
_mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr)
- if not user.has_key(_inbox_folder_attr):
+ if _inbox_folder_attr not in user:
continue
else:
if type(user[_inbox_folder_attr]) == list:
@@ -1110,7 +1109,7 @@ class IMAP(object):
_current_mailserver = self.imap.find_mailfolder_server(folder)
- if not _mailserver == None:
+ if _mailserver is not None:
# TODO:
if not _current_mailserver == _mailserver:
self.imap._xfer(folder, _current_mailserver, _mailserver)
@@ -1138,7 +1137,7 @@ class IMAP(object):
primary_domain, secondary_domains
"""
- if inbox_folders == None:
+ if inbox_folders is None:
inbox_folders = []
folders = self.list_user_folders()
@@ -1165,7 +1164,7 @@ class IMAP(object):
mbox_parts = self.parse_mailfolder(mailfolder_path)
- if mbox_parts == None:
+ if mbox_parts is None:
# We got user identifier only
log.error(_("Please don't give us just a user identifier"))
return
@@ -1204,7 +1203,7 @@ class IMAP(object):
acceptable_domain_name_res = []
- if not primary_domain == None:
+ if primary_domain is not None:
for domain in [ primary_domain ] + secondary_domains:
acceptable_domain_name_res.append(domain_re % (domain))
@@ -1221,16 +1220,16 @@ class IMAP(object):
#print "Acceptable indeed"
#acceptable = True
#if not acceptable:
- #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1],folder,domain_name_re)
+ #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1], folder, domain_name_re)
#if acceptable:
- #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0],folder.split('@')[1])
+ #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0], folder.split('@')[1])
- folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0],folder.split('@')[1])
+ folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0], folder.split('@')[1])
else:
folder_name = "%s" % (folder.split(self.get_separator())[1])
- if not folder_name == None:
+ if folder_name is not None:
if not folder_name in folders:
folders.append(folder_name)
diff --git a/pykolab/logger.py b/pykolab/logger.py
index 356734b..1a826d9 100644
--- a/pykolab/logger.py
+++ b/pykolab/logger.py
@@ -16,6 +16,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
import grp
import logging
@@ -23,11 +24,9 @@ import logging.handlers
import os
import pwd
import sys
-import time
-from pykolab.translate import _
-class StderrToLogger(object):
+class StderrToLogger:
"""
Fake file-like stream object that redirects writes to a logger instance.
"""
@@ -36,7 +35,7 @@ class StderrToLogger(object):
self.log_level = log_level
self.linebuf = ''
self.skip_next = False
-
+
def write(self, buf):
# ugly patch to make smtplib and smtpd debug logging records appear on one line in log file
# smtplib uses "print>>stderr, var, var" statements for debug logging. These
@@ -59,10 +58,11 @@ class StderrToLogger(object):
else:
self.logger.log(self.log_level, '%s %s', self.linebuf, line.rstrip()[:150])
self.linebuf = ''
-
- def flush(self):
+
+ def flush(self):
pass
+
class LoggerAdapter(logging.LoggerAdapter):
"""
Custom LoggingAdapter to log Wallace mail message Queue ID
@@ -71,6 +71,7 @@ class LoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return '%s %s' % (self.extra['qid'], msg), kwargs
+
class Logger(logging.Logger):
"""
The PyKolab version of a logger.
@@ -88,31 +89,31 @@ class Logger(logging.Logger):
for arg in sys.argv:
if debuglevel == -1:
try:
- debuglevel = int(arg)
- except ValueError, errmsg:
+ debuglevel = (int)(arg)
+ except ValueError:
continue
loglevel = logging.DEBUG
break
- if '-d' == arg:
+ if arg == '-d':
debuglevel = -1
continue
- if '-l' == arg:
+ if arg == '-l':
loglevel = -1
continue
- if '--fork' == arg:
+ if arg == '--fork':
fork = True
if loglevel == -1:
- if hasattr(logging,arg.upper()):
- loglevel = getattr(logging,arg.upper())
+ if hasattr(logging, arg.upper()):
+ loglevel = getattr(logging, arg.upper())
else:
loglevel = logging.DEBUG
- if '-u' == arg or '--user' == arg:
+ if arg in ['-u', '--user']:
process_username = -1
continue
@@ -122,7 +123,7 @@ class Logger(logging.Logger):
if process_username == -1:
process_username = arg
- if '-g' == arg or '--group' == arg:
+ if arg in ['-g', '--group']:
process_groupname = -1
continue
@@ -132,8 +133,11 @@ class Logger(logging.Logger):
if process_groupname == -1:
process_groupname = arg
+ # pylint: disable=too-many-branches
+ # pylint: disable=too-many-locals
+ # pylint: disable=too-many-statements
def __init__(self, *args, **kw):
- if kw.has_key('name'):
+ if 'name' in kw:
name = kw['name']
elif len(args) == 1:
name = args[0]
@@ -142,7 +146,9 @@ class Logger(logging.Logger):
logging.Logger.__init__(self, name)
- plaintextformatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s [%(process)d] %(message)s")
+ plaintextformatter = logging.Formatter(
+ "%(asctime)s %(name)s %(levelname)s [%(process)d] %(message)s"
+ )
if not self.fork:
self.console_stdout = logging.StreamHandler(sys.stdout)
@@ -150,7 +156,7 @@ class Logger(logging.Logger):
self.addHandler(self.console_stdout)
- if kw.has_key('logfile'):
+ if 'logfile' in kw:
self.logfile = kw['logfile']
else:
self.logfile = '/var/log/kolab/pykolab.log'
@@ -161,9 +167,9 @@ class Logger(logging.Logger):
# Make sure (read: attempt to change) the permissions
try:
try:
- (ruid, euid, suid) = os.getresuid()
- (rgid, egid, sgid) = os.getresgid()
- except AttributeError, errmsg:
+ (ruid, _, _) = os.getresuid()
+ (rgid, _, _) = os.getresgid()
+ except AttributeError:
ruid = os.getuid()
rgid = os.getgid()
@@ -173,48 +179,52 @@ class Logger(logging.Logger):
# Get group entry details
try:
(
- group_name,
- group_password,
- group_gid,
- group_members
- ) = grp.getgrnam(self.process_groupname)
+ _,
+ _,
+ group_gid,
+ _
+ ) = grp.getgrnam(self.process_groupname)
- except KeyError, errmsg:
- group_name = False
+ except KeyError:
+ group_gid = False
if ruid == 0:
# Means we haven't switched yet.
try:
(
- user_name,
- user_password,
- user_uid,
- user_gid,
- user_gecos,
- user_homedir,
- user_shell
- ) = pwd.getpwnam(self.process_username)
-
- except KeyError, errmsg:
- user_name = False
+ _,
+ _,
+ user_uid,
+ _,
+ _,
+ _,
+ _
+ ) = pwd.getpwnam(self.process_username)
+
+ except KeyError:
+ user_uid = False
if os.path.isfile(self.logfile):
try:
- if not user_uid == 0 or group_gid == 0:
+ if user_uid > 0 or group_gid > 0:
os.chown(
- self.logfile,
- user_uid,
- group_gid
- )
- os.chmod(self.logfile, 0660)
-
- except Exception, errmsg:
- self.error(_("Could not change permissions on %s: %r") % (self.logfile, errmsg))
+ self.logfile,
+ user_uid,
+ group_gid
+ )
+
+ os.chmod(self.logfile, 660)
+
+ except Exception as errmsg:
+ self.error(
+ _("Could not change permissions on %s: %r") % (self.logfile, errmsg)
+ )
+
if self.debuglevel > 8:
import traceback
traceback.print_exc()
- except Exception, errmsg:
+ except Exception as errmsg:
if os.path.isfile(self.logfile):
self.error(_("Could not change permissions on %s: %r") % (self.logfile, errmsg))
if self.debuglevel > 8:
@@ -223,7 +233,7 @@ class Logger(logging.Logger):
# Make sure the log file exists
try:
- fhandle = file(self.logfile, 'a')
+ fhandle = open(self.logfile, 'a')
try:
os.utime(self.logfile, None)
finally:
@@ -232,16 +242,16 @@ class Logger(logging.Logger):
try:
filelog_handler = logging.FileHandler(filename=self.logfile)
filelog_handler.setFormatter(plaintextformatter)
- except IOError, e:
- print >> sys.stderr, _("Cannot log to file %s: %s") % (self.logfile, e)
+ except IOError as errmsg:
+ print(_("Cannot log to file %s: %s") % (self.logfile, errmsg), file=sys.stderr)
- if not len(self.handlers) > 1:
+ if len(self.handlers) <= 1:
try:
self.addHandler(filelog_handler)
- except:
+ except Exception:
pass
- except IOError, errmsg:
+ except IOError:
pass
def remove_stdout_handler(self):
@@ -249,14 +259,16 @@ class Logger(logging.Logger):
self.console_stdout.close()
self.removeHandler(self.console_stdout)
+ # pylint: disable=arguments-differ
+ # pylint: disable=keyword-arg-before-vararg
def debug(self, msg, level=1, *args, **kw):
self.setLevel(self.loglevel)
# Work around other applications not using various levels of debugging
- if not self.name.startswith('pykolab') and not self.debuglevel == 9:
+ if not self.name.startswith('pykolab') and self.debuglevel != 9:
return
if level <= self.debuglevel:
- # TODO: Not the way it's supposed to work!
self.log(logging.DEBUG, msg)
+
logging.setLoggerClass(Logger)
diff --git a/pykolab/utils.py b/pykolab/utils.py
index 9794a57..1e5057c 100644
--- a/pykolab/utils.py
+++ b/pykolab/utils.py
@@ -17,21 +17,38 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
+
import base64
import getpass
import grp
import os
import pwd
+from six import string_types
import struct
import sys
import pykolab
from pykolab import constants
-from pykolab.translate import _
+from pykolab.translate import _ as _l
+# pylint: disable=invalid-name
log = pykolab.getLogger('pykolab.utils')
conf = pykolab.getConf()
+try:
+ # pylint: disable=redefined-builtin
+ input = raw_input
+except NameError:
+ pass
+
+try:
+ unicode('')
+except NameError:
+ unicode = str
+
+
+# pylint: disable=too-many-branches
def ask_question(question, default="", password=False, confirm=False):
"""
Ask a question on stderr.
@@ -43,56 +60,57 @@ def ask_question(question, default="", password=False, confirm=False):
Usage: pykolab.utils.ask_question("What is the server?", default="localhost")
"""
-
- if not default == "" and not default == None and conf.cli_keywords.answer_default:
+ if default != "" and default is not None and conf.cli_keywords.answer_default:
if not conf.cli_keywords.quiet:
- print ("%s [%s]: " % (question, default))
+ print("%s [%s]: " % (question, default))
return default
if password:
- if default == "" or default == None:
+ if default == "" or default is None:
answer = getpass.getpass("%s: " % (question))
else:
answer = getpass.getpass("%s [%s]: " % (question, default))
else:
- if default == "" or default == None:
- answer = raw_input("%s: " % (question))
+ if default == "" or default is None:
+ answer = input("%s: " % (question))
else:
- answer = raw_input("%s [%s]: " % (question, default))
+ answer = input("%s [%s]: " % (question, default))
+ # pylint: disable=too-many-nested-blocks
if not answer == "":
if confirm:
answer_confirm = None
answer_confirmed = False
while not answer_confirmed:
if password:
- answer_confirm = getpass.getpass(_("Confirm %s: ") % (question))
+ answer_confirm = getpass.getpass(_l("Confirm %s: ") % (question))
else:
- answer_confirm = raw_input(_("Confirm %s: ") % (question))
+ answer_confirm = input(_l("Confirm %s: ") % (question))
if not answer_confirm == answer:
- print >> sys.stderr, _("Incorrect confirmation. " + \
- "Please try again.")
+ print(_l("Incorrect confirmation. Please try again."), file=sys.stderr)
if password:
- if default == "" or default == None:
- answer = getpass.getpass(_("%s: ") % (question))
+ if default == "" or default is None:
+ answer = getpass.getpass(_l("%s: ") % (question))
else:
- answer = getpass.getpass(_("%s [%s]: ") % (question, default))
+ answer = getpass.getpass(_l("%s [%s]: ") % (question, default))
else:
- if default == "" or default == None:
- answer = raw_input(_("%s: ") % (question))
+ if default == "" or default is None:
+ answer = input(_l("%s: ") % (question))
else:
- answer = raw_input(_("%s [%s]: ") % (question, default))
+ answer = input(_l("%s [%s]: ") % (question, default))
else:
answer_confirmed = True
if answer == "":
return default
- else:
- return answer
+ return answer
+
+
+# pylint: disable=too-many-return-statements
def ask_confirmation(question, default="y", all_inclusive_no=True):
"""
Create a confirmation dialog, including a default option (capitalized),
@@ -101,11 +119,11 @@ def ask_confirmation(question, default="y", all_inclusive_no=True):
"""
default_answer = None
- if default in [ "y", "Y" ]:
+ if default in ["y", "Y"]:
default_answer = True
default_no = "n"
default_yes = "Y"
- elif default in [ "n", "N" ]:
+ elif default in ["n", "N"]:
default_answer = False
default_no = "N"
default_yes = "y"
@@ -115,44 +133,50 @@ def ask_confirmation(question, default="y", all_inclusive_no=True):
default_no = "'no'"
default_yes = "Please type 'yes'"
- if conf.cli_keywords.answer_yes or (conf.cli_keywords.answer_default and default_answer is not None):
+ if conf.cli_keywords.answer_yes \
+ or (conf.cli_keywords.answer_default and default_answer is not None):
+
if not conf.cli_keywords.quiet:
- print ("%s [%s/%s]: " % (question,default_yes,default_no))
+ print("%s [%s/%s]: " % (question, default_yes, default_no))
if conf.cli_keywords.answer_yes:
return True
if conf.cli_keywords.answer_default:
return default_answer
answer = False
- while answer == False:
- answer = raw_input("%s [%s/%s]: " % (question,default_yes,default_no))
+ while not answer:
+ answer = input("%s [%s/%s]: " % (question, default_yes, default_no))
# Parse answer and set back to False if not appropriate
if all_inclusive_no:
- if answer == "" and not default_answer == None:
+ if answer == "" and default_answer is not None:
return default_answer
- elif answer in [ "y", "Y", "yes" ]:
+
+ if answer in ["y", "Y", "yes"]:
return True
- elif answer in [ "n", "N", "no" ]:
- return False
- else:
- answer = False
- print >> sys.stderr, _("Please answer 'yes' or 'no'.")
- else:
- if not answer in [ "y", "Y", "yes" ]:
+
+ if answer in ["n", "N", "no"]:
return False
- else:
- return True
+ answer = False
+ print(_l("Please answer 'yes' or 'no'."), file=sys.stderr)
+
+ if answer not in ["y", "Y", "yes"]:
+ return False
+
+ return True
+
+
+# pylint: disable=dangerous-default-value
def ask_menu(question, options={}, default=''):
- if not default == '' and conf.cli_keywords.answer_default:
+ if default != '' and conf.cli_keywords.answer_default:
if not conf.cli_keywords.quiet:
- print question + " [" + default + "]:"
+ print(question + " [" + default + "]:")
return default
- if not default == '':
- print question + " [" + default + "]:"
+ if default != '':
+ print(question + " [" + default + "]:")
else:
- print question
+ print(question)
answer_correct = False
max_key_length = 0
@@ -162,7 +186,7 @@ def ask_menu(question, options={}, default=''):
options = {}
for key in _options:
options[key] = key
-
+
keys = options.keys()
keys.sort()
@@ -174,24 +198,24 @@ def ask_menu(question, options={}, default=''):
str_format = "%%%ds" % max_key_length
- if default == '' or not default in options.keys():
+ if default == '' or default not in options.keys():
for key in keys:
if options[key] == key:
- print " - " + key
+ print(" - " + key)
else:
- print " - " + eval("str_format % key") + ": " + options[key]
+ print(" - " + str_format % key + ": " + options[key])
- answer = raw_input(_("Choice") + ": ")
+ answer = input(_l("Choice") + ": ")
else:
- answer = raw_input(_("Choice (type '?' for options)") + ": ")
+ answer = input(_l("Choice (type '?' for options)") + ": ")
if answer == '?':
for key in keys:
if options[key] == key:
- print " - " + key
+ print(" - " + key)
else:
- print " - " + eval("str_format % key") + ": " + options[key]
+ print(" - " + str_format % key + ": " + options[key])
continue
@@ -203,8 +227,9 @@ def ask_menu(question, options={}, default=''):
return answer
+
def decode(key, enc):
- if key == None:
+ if key is None:
return enc
dec = []
@@ -215,8 +240,9 @@ def decode(key, enc):
dec.append(dec_c)
return "".join(dec)
+
def encode(key, clear):
- if key == None:
+ if key is None:
return clear
enc = []
@@ -226,15 +252,16 @@ def encode(key, clear):
enc.append(enc_c)
return base64.urlsafe_b64encode("".join(enc))
+
def ensure_directory(_dir, _user='root', _group='root'):
if not os.path.isdir(_dir):
os.makedirs(_dir)
try:
try:
- (ruid, euid, suid) = os.getresuid()
- (rgid, egid, sgid) = os.getresgid()
- except AttributeError, errmsg:
+ (ruid, _, _) = os.getresuid()
+ (rgid, _, _) = os.getresgid()
+ except AttributeError:
ruid = os.getuid()
rgid = os.getgid()
@@ -243,18 +270,10 @@ def ensure_directory(_dir, _user='root', _group='root'):
if rgid == 0:
# Get group entry details
try:
- (
- group_name,
- group_password,
- group_gid,
- group_members
- ) = grp.getgrnam(_group)
+ (_, _, group_gid, _) = grp.getgrnam(_group)
except KeyError:
- print >> sys.stderr, _("Group %s does not exist") % (
- _group
- )
-
+ print(_l("Group %s does not exist") % (_group), file=sys.stderr)
sys.exit(1)
# Set real and effective group if not the same as current.
@@ -264,28 +283,20 @@ def ensure_directory(_dir, _user='root', _group='root'):
if ruid == 0:
# Means we haven't switched yet.
try:
- (
- user_name,
- user_password,
- user_uid,
- user_gid,
- user_gecos,
- user_homedir,
- user_shell
- ) = pwd.getpwnam(_user)
+ (_, _, user_uid, _, _, _, _) = pwd.getpwnam(_user)
except KeyError:
- print >> sys.stderr, _("User %s does not exist") % (_user)
+ print(_l("User %s does not exist") % (_user), file=sys.stderr)
sys.exit(1)
-
# Set real and effective user if not the same as current.
if not user_uid == ruid:
os.chown(_dir, user_uid, -1)
- except:
- print >> sys.stderr, _("Could not change the permissions on %s") % (_dir)
+ except Exception:
+ print(_l("Could not change the permissions on %s") % (_dir), file=sys.stderr)
+
def generate_password():
import subprocess
@@ -299,6 +310,7 @@ def generate_password():
return output
+
def multiline_message(message):
if hasattr(conf, 'cli_keywords') and hasattr(conf.cli_keywords, 'quiet'):
if conf.cli_keywords.quiet:
@@ -326,41 +338,44 @@ def multiline_message(message):
return "\n%s\n" % ("\n".join(lines))
+
def stripped_message(message):
return "\n" + message.strip() + "\n"
+
def str2unicode(s, encoding='utf-8'):
if isinstance(s, unicode):
return s
try:
return unicode(s, encoding)
- except:
+ except Exception:
pass
return s
+
def normalize(_object):
- if type(_object) == list:
+ if isinstance(_object, list):
result = []
- elif type(_object) == dict:
+ elif isinstance(_object, dict):
result = {}
else:
return _object
- if type(_object) == list:
+ if isinstance(_object, list):
for item in _object:
result.append(item.lower())
result = list(set(result))
return result
- elif type(_object) == dict:
+ if isinstance(_object, dict):
def _strip(value):
try:
return value.strip()
- except:
+ except Exception:
return value
for key in _object:
- if type(_object[key]) == list:
+ if isinstance(_object[key], list):
if _object[key] is None:
continue
@@ -382,21 +397,21 @@ def normalize(_object):
result[key.lower()] = _strip(_object[key])
- if result.has_key('objectsid') and not result['objectsid'][0] == "S":
+ if 'objectsid' in result and not result['objectsid'][0] == "S":
result['objectsid'] = sid_to_string(result['objectsid'])
- if result.has_key('sn'):
+ if 'sn' in result:
result['surname'] = result['sn'].replace(' ', '')
- if result.has_key('mail'):
+ if 'mail' in result:
if isinstance(result['mail'], list):
result['mail'] = result['mail'][0]
- if len(result['mail']) > 0:
+ if result['mail']:
if len(result['mail'].split('@')) > 1:
result['domain'] = result['mail'].split('@')[1]
- if not result.has_key('domain') and result.has_key('standard_domain'):
+ if 'domain' not in result and 'standard_domain' in result:
result['domain'] = result['standard_domain']
if 'objectclass' not in result:
@@ -412,7 +427,8 @@ def normalize(_object):
return result
-def parse_input(_input, splitchars= [ ' ' ]):
+
+def parse_input(_input, splitchars=[' ']):
"""
Split the input string using the split characters defined
in splitchars, and remove the empty list items, then unique the
@@ -438,6 +454,7 @@ def parse_input(_input, splitchars= [ ' ' ]):
return _output_list
+
def parse_ldap_uri(uri):
"""
Parse an LDAP URI and return it's components.
@@ -462,7 +479,7 @@ def parse_ldap_uri(uri):
_server = _ldap_uri.split('//')[1].split('/')[0]
_base_dn = _ldap_uri.split('//')[1].split('/')[1]
- except:
+ except Exception:
_server = uri.split('//')[1].split('/')[0]
_attr = None
_scope = None
@@ -483,7 +500,7 @@ def parse_ldap_uri(uri):
if _attr == '':
_attrs = []
else:
- _attrs = [ _attr ]
+ _attrs = [_attr]
if _scope == '':
_scope = 'sub'
@@ -491,11 +508,12 @@ def parse_ldap_uri(uri):
if _filter == '':
_filter = "(objectclass=*)"
- return (_protocol, _server, _port, _base_dn, _attr, _scope, _filter)
+ return (_protocol, _server, _port, _base_dn, _attrs, _scope, _filter)
- except:
+ except Exception:
return None
+
def pop_empty_from_list(_input_list):
_output_list = []
@@ -503,6 +521,7 @@ def pop_empty_from_list(_input_list):
if not item == '':
_output_list.append(item)
+
def sid_to_string(sid):
srl = ord(sid[0])
number_sub_id = ord(sid[1])
@@ -511,70 +530,76 @@ def sid_to_string(sid):
sub_ids = []
for i in range(number_sub_id):
- sub_ids.append(struct.unpack('<I',sid[8+4*i:12+4*i])[0])
+ sub_ids.append(struct.unpack('<I', sid[8 + 4 * i:12 + 4 * i])[0])
result = 'S-%d-%d-%s' % (
- srl,
- iav,
- '-'.join([str(s) for s in sub_ids]),
- )
+ srl,
+ iav,
+ '-'.join([str(s) for s in sub_ids]),
+ )
return result
+
def standard_root_dn(domain):
return 'dc=%s' % (',dc='.join(domain.split('.')))
+
def translate(mystring, locale_name='en_US'):
import locale
import subprocess
- log.debug(_("Transliterating string %r with locale %r") % (mystring, locale_name), level=8)
+ log.debug(_l("Transliterating string %r with locale %r") % (mystring, locale_name), level=8)
if len(locale.normalize(locale_name).split('.')) > 1:
- (locale_name,locale_charset) = locale.normalize(locale_name).split('.')
+ (locale_name, locale_charset) = locale.normalize(locale_name).split('.')
else:
locale_charset = 'utf-8'
try:
- log.debug(_("Attempting to set locale"), level=8)
- locale.setlocale(locale.LC_ALL, (locale_name,locale_charset))
- log.debug(_("Success setting locale"), level=8)
- except:
- log.debug(_("Failure to set locale"), level=8)
- pass
-
- command = [ '/usr/bin/iconv',
- '-f', 'UTF-8',
- '-t', 'ASCII//TRANSLIT',
- '-s' ]
-
- log.debug(_("Executing '%s | %s'") % (r"%s" % (mystring), ' '.join(command)), level=8)
- process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, env={'LANG': locale.normalize(locale_name)})
+ log.debug(_l("Attempting to set locale"), level=8)
+ locale.setlocale(locale.LC_ALL, (locale_name, locale_charset))
+ log.debug(_l("Success setting locale"), level=8)
+ except Exception:
+ log.debug(_l("Failure to set locale"), level=8)
+
+ command = ['/usr/bin/iconv', '-f', 'UTF-8', '-t', 'ASCII//TRANSLIT', '-s']
+
+ log.debug(_l("Executing '%s | %s'") % (r"%s" % (mystring), ' '.join(command)), level=8)
+ process = subprocess.Popen(
+ command,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env={'LANG': locale.normalize(locale_name)}
+ )
try:
print >> process.stdin, r"%s" % mystring
- except UnicodeEncodeError, errmsg:
+ except UnicodeEncodeError:
pass
result = process.communicate()[0].strip()
if '?' in result or (result == '' and not mystring == ''):
- log.warning(_("Could not translate %s using locale %s") % (mystring, locale_name))
+ log.warning(_l("Could not translate %s using locale %s") % (mystring, locale_name))
from pykolab import translit
result = translit.transliterate(mystring, locale_name)
return result
+
def true_or_false(val):
- if val == None:
+ if val is None:
return False
if isinstance(val, bool):
return val
- if isinstance(val, basestring) or isinstance(val, str):
+ if isinstance(val, string_types):
val = val.lower()
- if val in [ "true", "yes", "y", "1" ]:
+
+ if val in ["true", "yes", "y", "1"]:
return True
else:
return False
@@ -585,6 +610,7 @@ def true_or_false(val):
else:
return False
+
def is_service(services):
"""
Checks each item in list services to see if it has a RC script in
@@ -605,4 +631,4 @@ def is_service(services):
else:
_other_services.append(service)
- return (_service,_other_services)
+ return (_service, _other_services)