summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2019-06-18 09:17:15 +0200
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2019-06-18 09:17:15 +0200
commitbaeee0049f06430801a2d9940f875747e51be7f5 (patch)
treeca6c96a3a8173316a5c7ddd0ff9293c7c920c0b5
parentc6aa28265c6e8a7d0f92871c5ef7d6bfc34d1d50 (diff)
downloadpykolab-baeee0049f06430801a2d9940f875747e51be7f5.tar.gz
Implement multiprocessing part of D761, letting wallace restart cleanly and fast.
-rw-r--r--wallace/__init__.py367
-rw-r--r--wallace/modules.py67
2 files changed, 265 insertions, 169 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py
index 0078ec1..56cae74 100644
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com)
+# Copyright 2010-2019 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
@@ -17,6 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
+
import asyncore
from distutils import version
import grp
@@ -31,31 +33,33 @@ import sys
import tempfile
from threading import _Timer
import time
-import traceback
import pykolab
from pykolab import utils
from pykolab.translate import _
+from modules import cb_action_ACCEPT
+
+# pylint: disable=invalid-name
log = pykolab.getLogger('pykolab.wallace')
conf = pykolab.getConf()
-from modules import cb_action_ACCEPT
-def pickup_message(filepath, *args, **kw):
+def pickup_message(filepath, *args, **kwargs):
wallace_modules = args[0]
- if kw.has_key('module'):
+
+ if 'module' in kwargs:
# Cause the previous modules to be skipped
- wallace_modules = wallace_modules[(wallace_modules.index(kw['module'])+1):]
+ wallace_modules = wallace_modules[(wallace_modules.index(kwargs['module']) + 1):]
log.debug(_("Wallace modules: %r") % (wallace_modules), level=8)
# Execute the module
- if kw.has_key('stage'):
- modules.execute(kw['module'], filepath, stage=kw['stage'])
+ if 'stage' in kwargs:
+ modules.execute(kwargs['module'], filepath, stage=kwargs['stage'])
else:
- modules.execute(kw['module'], filepath)
+ modules.execute(kwargs['module'], filepath)
# After all modules are executed, continue with a call to
# accept the message and re-inject in to Postfix.
@@ -64,11 +68,18 @@ def pickup_message(filepath, *args, **kw):
for module in wallace_modules:
try:
result_filepath = modules.execute(module, filepath)
- except:
- log.error(_("Module %s.execute() failed on message %r with error: %s" % (module, filepath, traceback.format_exc())))
+ except Exception:
+ log.error(
+ "Module %s.execute() failed on message %r with error: %s" % (
+ module,
+ filepath,
+ traceback.format_exc()
+ )
+ )
+
result_filepath = False
- if not result_filepath == None and not result_filepath == False:
+ if result_filepath is not None and result_filepath is not False:
filepath = result_filepath
else:
# A module has returned False or None
@@ -80,26 +91,38 @@ def pickup_message(filepath, *args, **kw):
if continue_with_accept:
cb_action_ACCEPT('wallace', filepath)
+
def modules_heartbeat(wallace_modules):
lastrun = 0
- while True:
+ while not multiprocessing.current_process().finished.is_set():
try:
for module in wallace_modules:
try:
modules.heartbeat(module, lastrun)
- except:
- log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc())))
+ except Exception:
+ log.error(
+ "Module %s.heartbeat() failed with error: %s" % (
+ module,
+ traceback.format_exc()
+ )
+ )
lastrun = int(time.time())
- time.sleep(60)
- except (SystemExit, KeyboardInterrupt), e:
- log.info("Terminating heartbeat process")
+ multiprocessing.current_process().finished.wait(60)
+
+ except (SystemExit, KeyboardInterrupt) as errmsg:
+ log.warning("Exiting %s, %s" % (multiprocessing.current_process().name, errmsg))
break
-def worker_process(*args, **kw):
- log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
+def worker_process(*args, **kwargs):
+ import signal
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ log.debug("Worker process %s initializing" % (multiprocessing.current_process().name), level=1)
+
+
+# pylint: disable=too-few-public-methods
class Timer(_Timer):
def run(self):
while True:
@@ -109,92 +132,99 @@ class Timer(_Timer):
self.finished.set()
-class WallaceDaemon(object):
+
+class WallaceDaemon:
def __init__(self):
self.current_connections = 0
self.max_connections = 24
+ self.parent_pid = None
self.pool = None
daemon_group = conf.add_cli_parser_option_group(_("Daemon Options"))
daemon_group.add_option(
- "--fork",
- dest = "fork_mode",
- action = "store_true",
- default = False,
- help = _("Fork to the background.")
- )
+ "--fork",
+ dest="fork_mode",
+ action="store_true",
+ default=False,
+ help=_("Fork to the background.")
+ )
daemon_group.add_option(
- "-b", "--bind",
- dest = "wallace_bind_address",
- action = "store",
- default = "localhost",
- help = _("Bind address for Wallace.")
- )
+ "-b", "--bind",
+ dest="wallace_bind_address",
+ action="store",
+ default="localhost",
+ help=_("Bind address for Wallace.")
+ )
daemon_group.add_option(
- "-g",
- "--group",
- dest = "process_groupname",
- action = "store",
- default = "kolab",
- help = _("Run as group GROUPNAME"),
- metavar = "GROUPNAME"
- )
+ "-g", "--group",
+ dest="process_groupname",
+ action="store",
+ default="kolab",
+ help=_("Run as group GROUPNAME"),
+ metavar="GROUPNAME"
+ )
daemon_group.add_option(
- "--threads",
- dest = "max_threads",
- action = "store",
- default = 4,
- type = int,
- help = _("Number of threads to use.")
- )
+ "--threads",
+ dest="max_threads",
+ action="store",
+ default=4,
+ type=int,
+ help=_("Number of threads to use.")
+ )
daemon_group.add_option(
- "-p", "--pid-file",
- dest = "pidfile",
- action = "store",
- default = "/var/run/wallaced/wallaced.pid",
- help = _("Path to the PID file to use.")
- )
+ "-p", "--pid-file",
+ dest="pidfile",
+ action="store",
+ default="/var/run/wallaced/wallaced.pid",
+ help=_("Path to the PID file to use.")
+ )
daemon_group.add_option(
- "--port",
- dest = "wallace_port",
- action = "store",
- default = 10026,
- type = int,
- help = _("Port that Wallace is supposed to use.")
- )
+ "--port",
+ dest="wallace_port",
+ action="store",
+ default=10026,
+ type=int,
+ help=_("Port that Wallace is supposed to use.")
+ )
daemon_group.add_option(
- "-u",
- "--user",
- dest = "process_username",
- action = "store",
- default = "kolab",
- help = _("Run as user USERNAME"),
- metavar = "USERNAME"
- )
+ "-u", "--user",
+ dest="process_username",
+ action="store",
+ default="kolab",
+ help=_("Run as user USERNAME"),
+ metavar="USERNAME"
+ )
conf.finalize_conf()
utils.ensure_directory(
- os.path.dirname(conf.pidfile),
- conf.process_username,
- conf.process_groupname
- )
+ os.path.dirname(conf.pidfile),
+ conf.process_username,
+ conf.process_groupname
+ )
+
+ if conf.debuglevel >= 9:
+ mp_logger = multiprocessing.get_logger()
+ mp_logger.setLevel(multiprocessing.SUBDEBUG)
+ mp_logger.debug('Python multi-processing logger started')
import modules
modules.__init__()
self.modules = conf.get_list('wallace', 'modules')
- if self.modules == None:
+ if not self.modules:
self.modules = []
def do_wallace(self):
+ self.parent_pid = os.getpid()
+
if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1)
else:
@@ -215,20 +245,23 @@ class WallaceDaemon(object):
s.bind((conf.wallace_bind_address, conf.wallace_port))
bound = True
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception:
log.warning(
- _("Could not bind to socket on port %d on bind " + \
- "address %s") % (
- conf.wallace_port,
- conf.wallace_bind_address
- )
+ _("Could not bind to socket on port %d on bind address %s") % (
+ conf.wallace_port,
+ conf.wallace_bind_address
)
+ )
while not shutdown:
try:
s.shutdown(socket.SHUT_RDWR)
shutdown = True
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception:
log.warning(_("Could not shut down socket"))
time.sleep(1)
@@ -238,39 +271,59 @@ class WallaceDaemon(object):
s.listen(5)
- self.timer = Timer(180, self.pickup_spool_messages)
+ self.timer = Timer(180, self.pickup_spool_messages, args=[], kwargs={'sync': True})
+ self.timer.daemon = True
self.timer.start()
# start background process to run periodic jobs in active modules
- self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
- self.heartbeat.daemon = True
- self.heartbeat.start()
+ try:
+ self.heartbeat = multiprocessing.Process(
+ target=modules_heartbeat,
+ name='Wallace_Heartbeat',
+ args=[self.modules]
+ )
+
+ self.heartbeat.finished = multiprocessing.Event()
+ self.heartbeat.daemon = True
+ self.heartbeat.start()
+ except Exception as errmsg:
+ log.error("Failed to start heartbeat daemon: %s" % (errmsg))
+ finally:
+ log.debug(
+ "Wallace heartbeat is %s" % ('not alive', 'alive')[self.heartbeat.is_alive()],
+ level=8
+ )
try:
while 1:
while self.current_connections >= self.max_connections:
+ log.debug("Out of connections.")
time.sleep(0.5)
pair = s.accept()
log.info(_("Accepted connection"))
- if not pair == None:
+ if pair is not None:
self.current_connections += 1
connection, address = pair
- channel = SMTPChannel(self, connection, address)
+ SMTPChannel(self, connection, address)
asyncore.loop()
- except Exception, errmsg:
+ # pylint: disable=broad-except
+ except Exception:
traceback.print_exc()
s.shutdown(1)
s.close()
# shut down hearbeat process
self.heartbeat.terminate()
+ self.timer.cancel()
+ self.timer.join()
def data_header(self, mailfrom, rcpttos):
COMMASPACE = ', '
+
return "X-Kolab-From: " + mailfrom + "\r\n" + \
- "X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
+ "X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
def pickup_spool_messages(self, sync=False):
# Mind you to include the trailing slash
@@ -281,7 +334,6 @@ class WallaceDaemon(object):
for filename in files:
messages.append((root, filename))
-
for root, filename in messages:
filepath = os.path.join(root, filename)
@@ -296,7 +348,8 @@ class WallaceDaemon(object):
log.debug("File is in locks directory. Skipping %s" % (filepath), level=8)
continue
- except Exception, errmsg:
+ # pylint: disable=broad-except
+ except Exception as errmsg:
log.error("Error: %s. Skipping %s" % (errmsg, filepath))
continue
@@ -374,7 +427,7 @@ class WallaceDaemon(object):
# @TODO: and add line separator (\n or \r\n?)
# we should make sure there's only one line separator between
# kolab headers and the original message (data)
- os.write(fp, header);
+ os.write(fp, header)
os.write(fp, data)
os.close(fp)
@@ -384,20 +437,56 @@ class WallaceDaemon(object):
return "250 OK Message %s queued" % (filename)
- def reload_config(self, *args, **kw):
+ def reload_config(self, *args, **kwargs):
pass
- def remove_pid(self, *args, **kw):
- if os.access(conf.pidfile, os.R_OK):
- os.remove(conf.pidfile)
+ def remove_pid(self, *args, **kwargs):
+ try:
+ if os.getpid() == self.parent_pid:
+ log.debug("Stopping process %s" % multiprocessing.current_process().name, level=8)
- if self.pool is not None:
- self.pool.close()
- self.pool.join()
+ log.debug(_("Terminating processes pool"), level=8)
+ self.pool.close()
- raise SystemExit
+ if hasattr(self, 'timer'):
+ if not self.timer.finished.is_set():
+ log.debug("Canceling Wallace Timer", level=8)
+ self.timer.finished.set()
+ self.timer.cancel()
- def run(self):
+ log.debug(_("Terminating heartbeat process"), level=8)
+ self.heartbeat.finished.set()
+ self.heartbeat.terminate()
+
+ self.pool.close()
+ self.pool.join(5)
+ self.timer.join(5)
+ self.heartbeat.join(5)
+
+ if os.access(conf.pidfile, os.R_OK):
+ log.warning(_("Removing PID file %s") % conf.pidfile)
+ os.remove(conf.pidfile)
+
+ log.warning("Exiting!")
+ sys.exit()
+
+ else:
+ sys.exit(0)
+
+ except Exception as errmsg:
+ log.debug(
+ "Exception while trying to stop %s: %s" % (
+ multiprocessing.current_process().name, errmsg
+ ),
+ level=8
+ )
+
+ sys.exit(1)
+
+ sys.exit(0)
+
+ # pylint: disable=too-many-locals
+ def run(self): # noqa: C901
"""
Run the Wallace daemon.
"""
@@ -408,7 +497,7 @@ class WallaceDaemon(object):
try:
(ruid, euid, suid) = os.getresuid()
(rgid, egid, sgid) = os.getresgid()
- except AttributeError, errmsg:
+ except AttributeError:
ruid = os.getuid()
rgid = os.getgid()
@@ -418,27 +507,25 @@ class WallaceDaemon(object):
# Get group entry details
try:
(
- group_name,
- group_password,
- group_gid,
- group_members
- ) = grp.getgrnam(conf.process_groupname)
+ group_name,
+ group_password,
+ group_gid,
+ group_members
+ ) = grp.getgrnam(conf.process_groupname)
except KeyError:
- print >> sys.stderr, _("Group %s does not exist") % (
- conf.process_groupname
- )
+ print(_("Group %s does not exist") % (conf.process_groupname))
sys.exit(1)
# Set real and effective group if not the same as current.
if not group_gid == rgid:
log.debug(
- _("Switching real and effective group id to %d") % (
- group_gid
- ),
- level=8
- )
+ _("Switching real and effective group id to %d") % (
+ group_gid
+ ),
+ level=8
+ )
os.setregid(group_gid, group_gid)
@@ -446,35 +533,33 @@ class WallaceDaemon(object):
# Means we haven't switched yet.
try:
(
- user_name,
- user_password,
- user_uid,
- user_gid,
- user_gecos,
- user_homedir,
- user_shell
- ) = pwd.getpwnam(conf.process_username)
+ user_name,
+ user_password,
+ user_uid,
+ user_gid,
+ user_gecos,
+ user_homedir,
+ user_shell
+ ) = pwd.getpwnam(conf.process_username)
except KeyError:
- print >> sys.stderr, _("User %s does not exist") % (
- conf.process_username
- )
+ print(_("User %s does not exist") % (conf.process_username))
sys.exit(1)
-
# Set real and effective user if not the same as current.
if not user_uid == ruid:
log.debug(
- _("Switching real and effective user id to %d") % (
- user_uid
- ),
- level=8
- )
+ _("Switching real and effective user id to %d") % (
+ user_uid
+ ),
+ level=8
+ )
os.setreuid(user_uid, user_uid)
- except:
+ # pylint: disable=broad-except
+ except Exception:
log.error(_("Could not change real and effective uid and/or gid"))
try:
@@ -517,24 +602,24 @@ class WallaceDaemon(object):
self.write_pid()
self.do_wallace()
- except SystemExit, e:
- exitcode = e
+ except SystemExit as errmsg:
+ exitcode = errmsg
except KeyboardInterrupt:
exitcode = 1
log.info(_("Interrupted by user"))
- except AttributeError, e:
+ except AttributeError:
exitcode = 1
traceback.print_exc()
- print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org")
+ print(_("Traceback occurred, please report a bug."))
- except TypeError, e:
+ except TypeError as errmsg:
exitcode = 1
traceback.print_exc()
- log.error(_("Type Error: %s") % e)
+ log.error(_("Type Error: %s") % errmsg)
except:
exitcode = 2
traceback.print_exc()
- print >> sys.stderr, _("Traceback occurred, please report a bug at https://issues.kolab.org")
+ print(_("Traceback occurred, please report a bug."))
sys.exit(exitcode)
@@ -550,4 +635,4 @@ class WallaceDaemon(object):
fp.write("%d\n" % (pid))
fp.close()
else:
- print >> sys.stderr, _("Could not write pid file %s") % (conf.pidfile)
+ print(_("Could not write pid file %s") % (conf.pidfile))
diff --git a/wallace/modules.py b/wallace/modules.py
index b4645a8..bb37eb0 100644
--- a/wallace/modules.py
+++ b/wallace/modules.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com)
+# Copyright 2010-2019 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
@@ -17,6 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
+
import os
import sys
import time
@@ -45,6 +47,7 @@ conf = pykolab.getConf()
modules = {}
+
def __init__():
# We only want the base path
modules_base_path = os.path.dirname(__file__)
@@ -55,15 +58,16 @@ def __init__():
for filename in filenames:
if filename.startswith('module_') and filename.endswith('.py'):
- module_name = filename.replace('.py','')
+ module_name = filename.replace('.py', '')
name = module_name.replace('module_', '')
- #print "exec(\"from %s import __init__ as %s_register\"" % (module_name,name)
+ # print("exec(\"from %s import __init__ as %s_register\")" % (module_name,name))
exec("from %s import __init__ as %s_register" % (module_name, name))
exec("%s_register()" % (name))
for dirname in dirnames:
register_group(modules_path, dirname)
+
def list_modules(*args, **kw):
"""
List modules
@@ -75,8 +79,8 @@ def list_modules(*args, **kw):
if isinstance(module, tuple):
module_group, module = module
__modules[module_group] = {
- module: modules[(module_group,module)]
- }
+ module: modules[(module_group, module)]
+ }
else:
__modules[module] = modules[module]
@@ -84,38 +88,45 @@ def list_modules(*args, **kw):
_modules.sort()
for _module in _modules:
- if __modules[_module].has_key('function'):
+ if 'function' in __modules[_module]:
# This is a top-level module
- if not __modules[_module]['description'] == None:
- print "%-25s - %s" % (_module.replace('_','-'),__modules[_module]['description'])
+ if __modules[_module]['description'] is not None:
+ print("%-25s - %s" % (_module.replace('_', '-'), __modules[_module]['description']))
else:
- print "%-25s" % (_module.replace('_','-'))
+ print("%-25s" % (_module.replace('_', '-')))
for _module in _modules:
- if not __modules[_module].has_key('function'):
+ if 'function' not in __modules[_module]:
# This is a nested module
- print "\n" + _("Module Group: %s") % (_module) + "\n"
+ print("\n" + _("Module Group: %s") % (_module) + "\n")
___modules = __modules[_module].keys()
___modules.sort()
for __module in ___modules:
- if not __modules[_module][__module]['description'] == None:
- print "%-4s%-21s - %s" % ('',__module.replace('_','-'),__modules[_module][__module]['description'])
+ if __modules[_module][__module]['description'] is not None:
+ print(
+ "%-4s%-21s - %s" % (
+ '',
+ _module.replace('_', '-'),
+ __modules[_module][__module]['description']
+ )
+ )
+
else:
- print "%-4s%-21s" % ('',__module.replace('_','-'))
+ print("%-4s%-21s" % ('', __module.replace('_', '-')))
+
def execute(name, *args, **kw):
- if not modules.has_key(name):
+ if name not in modules:
log.error(_("No such module %r in modules %r (1).") % (name, modules))
sys.exit(1)
- if not modules[name].has_key('function') and \
- not modules[name].has_key('group'):
- log.error(_("No such module %r in modules %r (2).") %(name, modules))
+ if 'function' not in modules[name] and 'group' not in modules[name]:
+ log.error(_("No such module %r in modules %r (2).") % (name, modules))
sys.exit(1)
try:
return modules[name]['function'](*args, **kw)
- except Exception, errmsg:
+ except Exception as errmsg:
log.exception(_("Module %r - Unknown error occurred; %r") % (name, errmsg))
def heartbeat(name, *args, **kw):
@@ -156,35 +167,35 @@ def _sendmail(sender, recipients, msg):
success = True
break
- except smtplib.SMTPServerDisconnected, errmsg:
+ except smtplib.SMTPServerDisconnected as errmsg:
log.error("SMTP Server Disconnected Error, %r" % (errmsg))
- except smtplib.SMTPConnectError, errmsg:
+ except smtplib.SMTPConnectError as errmsg:
# DEFER
log.error("SMTP Connect Error, %r" % (errmsg))
- except smtplib.SMTPDataError, errmsg:
+ except smtplib.SMTPDataError as errmsg:
# DEFER
log.error("SMTP Data Error, %r" % (errmsg))
- except smtplib.SMTPHeloError, errmsg:
+ except smtplib.SMTPHeloError as errmsg:
# DEFER
log.error("SMTP HELO Error, %r" % (errmsg))
- except smtplib.SMTPRecipientsRefused, errmsg:
+ except smtplib.SMTPRecipientsRefused as errmsg:
# REJECT, send NDR
log.error("SMTP Recipient(s) Refused, %r" % (errmsg))
- except smtplib.SMTPSenderRefused, errmsg:
+ except smtplib.SMTPSenderRefused as errmsg:
# REJECT, send NDR
log.error("SMTP Sender Refused, %r" % (errmsg))
- except Exception, errmsg:
+ except Exception as errmsg:
log.exception(_("smtplib - Unknown error occurred: %r") % (errmsg))
try:
smtp.quit()
- except Exception, errmsg:
+ except Exception as errmsg:
log.error("smtplib quit() error - %r" % errmsg)
time.sleep(10)
@@ -231,7 +242,7 @@ def cb_action_DEFER(module, filepath):
#now = datetime.datetime.now()
#delta = now - fileage
- #print "file:", filepath, "fileage:", fileage, "now:", now, "delta(seconds):", delta.seconds
+ #print("file:", filepath, "fileage:", fileage, "now:", now, "delta(seconds):", delta.seconds)
#if delta.seconds > 1800:
## TODO: Send NDR back to user