summaryrefslogtreecommitdiffstats
path: root/pykolab/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'pykolab/utils.py')
-rw-r--r--pykolab/utils.py270
1 files changed, 148 insertions, 122 deletions
diff --git a/pykolab/utils.py b/pykolab/utils.py
index 9794a57..1e5057c 100644
--- a/pykolab/utils.py
+++ b/pykolab/utils.py
@@ -17,21 +17,38 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
+
import base64
import getpass
import grp
import os
import pwd
+from six import string_types
import struct
import sys
import pykolab
from pykolab import constants
-from pykolab.translate import _
+from pykolab.translate import _ as _l
+# pylint: disable=invalid-name
log = pykolab.getLogger('pykolab.utils')
conf = pykolab.getConf()
+try:
+ # pylint: disable=redefined-builtin
+ input = raw_input
+except NameError:
+ pass
+
+try:
+ unicode('')
+except NameError:
+ unicode = str
+
+
+# pylint: disable=too-many-branches
def ask_question(question, default="", password=False, confirm=False):
"""
Ask a question on stderr.
@@ -43,56 +60,57 @@ def ask_question(question, default="", password=False, confirm=False):
Usage: pykolab.utils.ask_question("What is the server?", default="localhost")
"""
-
- if not default == "" and not default == None and conf.cli_keywords.answer_default:
+ if default != "" and default is not None and conf.cli_keywords.answer_default:
if not conf.cli_keywords.quiet:
- print ("%s [%s]: " % (question, default))
+ print("%s [%s]: " % (question, default))
return default
if password:
- if default == "" or default == None:
+ if default == "" or default is None:
answer = getpass.getpass("%s: " % (question))
else:
answer = getpass.getpass("%s [%s]: " % (question, default))
else:
- if default == "" or default == None:
- answer = raw_input("%s: " % (question))
+ if default == "" or default is None:
+ answer = input("%s: " % (question))
else:
- answer = raw_input("%s [%s]: " % (question, default))
+ answer = input("%s [%s]: " % (question, default))
+ # pylint: disable=too-many-nested-blocks
if not answer == "":
if confirm:
answer_confirm = None
answer_confirmed = False
while not answer_confirmed:
if password:
- answer_confirm = getpass.getpass(_("Confirm %s: ") % (question))
+ answer_confirm = getpass.getpass(_l("Confirm %s: ") % (question))
else:
- answer_confirm = raw_input(_("Confirm %s: ") % (question))
+ answer_confirm = input(_l("Confirm %s: ") % (question))
if not answer_confirm == answer:
- print >> sys.stderr, _("Incorrect confirmation. " + \
- "Please try again.")
+ print(_l("Incorrect confirmation. Please try again."), file=sys.stderr)
if password:
- if default == "" or default == None:
- answer = getpass.getpass(_("%s: ") % (question))
+ if default == "" or default is None:
+ answer = getpass.getpass(_l("%s: ") % (question))
else:
- answer = getpass.getpass(_("%s [%s]: ") % (question, default))
+ answer = getpass.getpass(_l("%s [%s]: ") % (question, default))
else:
- if default == "" or default == None:
- answer = raw_input(_("%s: ") % (question))
+ if default == "" or default is None:
+ answer = input(_l("%s: ") % (question))
else:
- answer = raw_input(_("%s [%s]: ") % (question, default))
+ answer = input(_l("%s [%s]: ") % (question, default))
else:
answer_confirmed = True
if answer == "":
return default
- else:
- return answer
+ return answer
+
+
+# pylint: disable=too-many-return-statements
def ask_confirmation(question, default="y", all_inclusive_no=True):
"""
Create a confirmation dialog, including a default option (capitalized),
@@ -101,11 +119,11 @@ def ask_confirmation(question, default="y", all_inclusive_no=True):
"""
default_answer = None
- if default in [ "y", "Y" ]:
+ if default in ["y", "Y"]:
default_answer = True
default_no = "n"
default_yes = "Y"
- elif default in [ "n", "N" ]:
+ elif default in ["n", "N"]:
default_answer = False
default_no = "N"
default_yes = "y"
@@ -115,44 +133,50 @@ def ask_confirmation(question, default="y", all_inclusive_no=True):
default_no = "'no'"
default_yes = "Please type 'yes'"
- if conf.cli_keywords.answer_yes or (conf.cli_keywords.answer_default and default_answer is not None):
+ if conf.cli_keywords.answer_yes \
+ or (conf.cli_keywords.answer_default and default_answer is not None):
+
if not conf.cli_keywords.quiet:
- print ("%s [%s/%s]: " % (question,default_yes,default_no))
+ print("%s [%s/%s]: " % (question, default_yes, default_no))
if conf.cli_keywords.answer_yes:
return True
if conf.cli_keywords.answer_default:
return default_answer
answer = False
- while answer == False:
- answer = raw_input("%s [%s/%s]: " % (question,default_yes,default_no))
+ while not answer:
+ answer = input("%s [%s/%s]: " % (question, default_yes, default_no))
# Parse answer and set back to False if not appropriate
if all_inclusive_no:
- if answer == "" and not default_answer == None:
+ if answer == "" and default_answer is not None:
return default_answer
- elif answer in [ "y", "Y", "yes" ]:
+
+ if answer in ["y", "Y", "yes"]:
return True
- elif answer in [ "n", "N", "no" ]:
- return False
- else:
- answer = False
- print >> sys.stderr, _("Please answer 'yes' or 'no'.")
- else:
- if not answer in [ "y", "Y", "yes" ]:
+
+ if answer in ["n", "N", "no"]:
return False
- else:
- return True
+ answer = False
+ print(_l("Please answer 'yes' or 'no'."), file=sys.stderr)
+
+ if answer not in ["y", "Y", "yes"]:
+ return False
+
+ return True
+
+
+# pylint: disable=dangerous-default-value
def ask_menu(question, options={}, default=''):
- if not default == '' and conf.cli_keywords.answer_default:
+ if default != '' and conf.cli_keywords.answer_default:
if not conf.cli_keywords.quiet:
- print question + " [" + default + "]:"
+ print(question + " [" + default + "]:")
return default
- if not default == '':
- print question + " [" + default + "]:"
+ if default != '':
+ print(question + " [" + default + "]:")
else:
- print question
+ print(question)
answer_correct = False
max_key_length = 0
@@ -162,7 +186,7 @@ def ask_menu(question, options={}, default=''):
options = {}
for key in _options:
options[key] = key
-
+
keys = options.keys()
keys.sort()
@@ -174,24 +198,24 @@ def ask_menu(question, options={}, default=''):
str_format = "%%%ds" % max_key_length
- if default == '' or not default in options.keys():
+ if default == '' or default not in options.keys():
for key in keys:
if options[key] == key:
- print " - " + key
+ print(" - " + key)
else:
- print " - " + eval("str_format % key") + ": " + options[key]
+ print(" - " + str_format % key + ": " + options[key])
- answer = raw_input(_("Choice") + ": ")
+ answer = input(_l("Choice") + ": ")
else:
- answer = raw_input(_("Choice (type '?' for options)") + ": ")
+ answer = input(_l("Choice (type '?' for options)") + ": ")
if answer == '?':
for key in keys:
if options[key] == key:
- print " - " + key
+ print(" - " + key)
else:
- print " - " + eval("str_format % key") + ": " + options[key]
+ print(" - " + str_format % key + ": " + options[key])
continue
@@ -203,8 +227,9 @@ def ask_menu(question, options={}, default=''):
return answer
+
def decode(key, enc):
- if key == None:
+ if key is None:
return enc
dec = []
@@ -215,8 +240,9 @@ def decode(key, enc):
dec.append(dec_c)
return "".join(dec)
+
def encode(key, clear):
- if key == None:
+ if key is None:
return clear
enc = []
@@ -226,15 +252,16 @@ def encode(key, clear):
enc.append(enc_c)
return base64.urlsafe_b64encode("".join(enc))
+
def ensure_directory(_dir, _user='root', _group='root'):
if not os.path.isdir(_dir):
os.makedirs(_dir)
try:
try:
- (ruid, euid, suid) = os.getresuid()
- (rgid, egid, sgid) = os.getresgid()
- except AttributeError, errmsg:
+ (ruid, _, _) = os.getresuid()
+ (rgid, _, _) = os.getresgid()
+ except AttributeError:
ruid = os.getuid()
rgid = os.getgid()
@@ -243,18 +270,10 @@ def ensure_directory(_dir, _user='root', _group='root'):
if rgid == 0:
# Get group entry details
try:
- (
- group_name,
- group_password,
- group_gid,
- group_members
- ) = grp.getgrnam(_group)
+ (_, _, group_gid, _) = grp.getgrnam(_group)
except KeyError:
- print >> sys.stderr, _("Group %s does not exist") % (
- _group
- )
-
+ print(_l("Group %s does not exist") % (_group), file=sys.stderr)
sys.exit(1)
# Set real and effective group if not the same as current.
@@ -264,28 +283,20 @@ def ensure_directory(_dir, _user='root', _group='root'):
if ruid == 0:
# Means we haven't switched yet.
try:
- (
- user_name,
- user_password,
- user_uid,
- user_gid,
- user_gecos,
- user_homedir,
- user_shell
- ) = pwd.getpwnam(_user)
+ (_, _, user_uid, _, _, _, _) = pwd.getpwnam(_user)
except KeyError:
- print >> sys.stderr, _("User %s does not exist") % (_user)
+ print(_l("User %s does not exist") % (_user), file=sys.stderr)
sys.exit(1)
-
# Set real and effective user if not the same as current.
if not user_uid == ruid:
os.chown(_dir, user_uid, -1)
- except:
- print >> sys.stderr, _("Could not change the permissions on %s") % (_dir)
+ except Exception:
+ print(_l("Could not change the permissions on %s") % (_dir), file=sys.stderr)
+
def generate_password():
import subprocess
@@ -299,6 +310,7 @@ def generate_password():
return output
+
def multiline_message(message):
if hasattr(conf, 'cli_keywords') and hasattr(conf.cli_keywords, 'quiet'):
if conf.cli_keywords.quiet:
@@ -326,41 +338,44 @@ def multiline_message(message):
return "\n%s\n" % ("\n".join(lines))
+
def stripped_message(message):
return "\n" + message.strip() + "\n"
+
def str2unicode(s, encoding='utf-8'):
if isinstance(s, unicode):
return s
try:
return unicode(s, encoding)
- except:
+ except Exception:
pass
return s
+
def normalize(_object):
- if type(_object) == list:
+ if isinstance(_object, list):
result = []
- elif type(_object) == dict:
+ elif isinstance(_object, dict):
result = {}
else:
return _object
- if type(_object) == list:
+ if isinstance(_object, list):
for item in _object:
result.append(item.lower())
result = list(set(result))
return result
- elif type(_object) == dict:
+ if isinstance(_object, dict):
def _strip(value):
try:
return value.strip()
- except:
+ except Exception:
return value
for key in _object:
- if type(_object[key]) == list:
+ if isinstance(_object[key], list):
if _object[key] is None:
continue
@@ -382,21 +397,21 @@ def normalize(_object):
result[key.lower()] = _strip(_object[key])
- if result.has_key('objectsid') and not result['objectsid'][0] == "S":
+ if 'objectsid' in result and not result['objectsid'][0] == "S":
result['objectsid'] = sid_to_string(result['objectsid'])
- if result.has_key('sn'):
+ if 'sn' in result:
result['surname'] = result['sn'].replace(' ', '')
- if result.has_key('mail'):
+ if 'mail' in result:
if isinstance(result['mail'], list):
result['mail'] = result['mail'][0]
- if len(result['mail']) > 0:
+ if result['mail']:
if len(result['mail'].split('@')) > 1:
result['domain'] = result['mail'].split('@')[1]
- if not result.has_key('domain') and result.has_key('standard_domain'):
+ if 'domain' not in result and 'standard_domain' in result:
result['domain'] = result['standard_domain']
if 'objectclass' not in result:
@@ -412,7 +427,8 @@ def normalize(_object):
return result
-def parse_input(_input, splitchars= [ ' ' ]):
+
+def parse_input(_input, splitchars=[' ']):
"""
Split the input string using the split characters defined
in splitchars, and remove the empty list items, then unique the
@@ -438,6 +454,7 @@ def parse_input(_input, splitchars= [ ' ' ]):
return _output_list
+
def parse_ldap_uri(uri):
"""
Parse an LDAP URI and return it's components.
@@ -462,7 +479,7 @@ def parse_ldap_uri(uri):
_server = _ldap_uri.split('//')[1].split('/')[0]
_base_dn = _ldap_uri.split('//')[1].split('/')[1]
- except:
+ except Exception:
_server = uri.split('//')[1].split('/')[0]
_attr = None
_scope = None
@@ -483,7 +500,7 @@ def parse_ldap_uri(uri):
if _attr == '':
_attrs = []
else:
- _attrs = [ _attr ]
+ _attrs = [_attr]
if _scope == '':
_scope = 'sub'
@@ -491,11 +508,12 @@ def parse_ldap_uri(uri):
if _filter == '':
_filter = "(objectclass=*)"
- return (_protocol, _server, _port, _base_dn, _attr, _scope, _filter)
+ return (_protocol, _server, _port, _base_dn, _attrs, _scope, _filter)
- except:
+ except Exception:
return None
+
def pop_empty_from_list(_input_list):
_output_list = []
@@ -503,6 +521,7 @@ def pop_empty_from_list(_input_list):
if not item == '':
_output_list.append(item)
+
def sid_to_string(sid):
srl = ord(sid[0])
number_sub_id = ord(sid[1])
@@ -511,70 +530,76 @@ def sid_to_string(sid):
sub_ids = []
for i in range(number_sub_id):
- sub_ids.append(struct.unpack('<I',sid[8+4*i:12+4*i])[0])
+ sub_ids.append(struct.unpack('<I', sid[8 + 4 * i:12 + 4 * i])[0])
result = 'S-%d-%d-%s' % (
- srl,
- iav,
- '-'.join([str(s) for s in sub_ids]),
- )
+ srl,
+ iav,
+ '-'.join([str(s) for s in sub_ids]),
+ )
return result
+
def standard_root_dn(domain):
return 'dc=%s' % (',dc='.join(domain.split('.')))
+
def translate(mystring, locale_name='en_US'):
import locale
import subprocess
- log.debug(_("Transliterating string %r with locale %r") % (mystring, locale_name), level=8)
+ log.debug(_l("Transliterating string %r with locale %r") % (mystring, locale_name), level=8)
if len(locale.normalize(locale_name).split('.')) > 1:
- (locale_name,locale_charset) = locale.normalize(locale_name).split('.')
+ (locale_name, locale_charset) = locale.normalize(locale_name).split('.')
else:
locale_charset = 'utf-8'
try:
- log.debug(_("Attempting to set locale"), level=8)
- locale.setlocale(locale.LC_ALL, (locale_name,locale_charset))
- log.debug(_("Success setting locale"), level=8)
- except:
- log.debug(_("Failure to set locale"), level=8)
- pass
-
- command = [ '/usr/bin/iconv',
- '-f', 'UTF-8',
- '-t', 'ASCII//TRANSLIT',
- '-s' ]
-
- log.debug(_("Executing '%s | %s'") % (r"%s" % (mystring), ' '.join(command)), level=8)
- process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, env={'LANG': locale.normalize(locale_name)})
+ log.debug(_l("Attempting to set locale"), level=8)
+ locale.setlocale(locale.LC_ALL, (locale_name, locale_charset))
+ log.debug(_l("Success setting locale"), level=8)
+ except Exception:
+ log.debug(_l("Failure to set locale"), level=8)
+
+ command = ['/usr/bin/iconv', '-f', 'UTF-8', '-t', 'ASCII//TRANSLIT', '-s']
+
+ log.debug(_l("Executing '%s | %s'") % (r"%s" % (mystring), ' '.join(command)), level=8)
+ process = subprocess.Popen(
+ command,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env={'LANG': locale.normalize(locale_name)}
+ )
try:
print >> process.stdin, r"%s" % mystring
- except UnicodeEncodeError, errmsg:
+ except UnicodeEncodeError:
pass
result = process.communicate()[0].strip()
if '?' in result or (result == '' and not mystring == ''):
- log.warning(_("Could not translate %s using locale %s") % (mystring, locale_name))
+ log.warning(_l("Could not translate %s using locale %s") % (mystring, locale_name))
from pykolab import translit
result = translit.transliterate(mystring, locale_name)
return result
+
def true_or_false(val):
- if val == None:
+ if val is None:
return False
if isinstance(val, bool):
return val
- if isinstance(val, basestring) or isinstance(val, str):
+ if isinstance(val, string_types):
val = val.lower()
- if val in [ "true", "yes", "y", "1" ]:
+
+ if val in ["true", "yes", "y", "1"]:
return True
else:
return False
@@ -585,6 +610,7 @@ def true_or_false(val):
else:
return False
+
def is_service(services):
"""
Checks each item in list services to see if it has a RC script in
@@ -605,4 +631,4 @@ def is_service(services):
else:
_other_services.append(service)
- return (_service,_other_services)
+ return (_service, _other_services)