summaryrefslogtreecommitdiffstats
path: root/wallace/__init__.py
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2018-07-27 10:22:18 +0200
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2018-07-27 10:22:18 +0200
commit3c5ce19ae3e2b5ed90d0924c89d15f0c7609f53b (patch)
treed3e08b8874fdabfc858633a1ceb9e906ea743a51 /wallace/__init__.py
parenta1c700d53d8a9b0405f7bc28f835ab5142d0f72d (diff)
downloadpykolab-3c5ce19ae3e2b5ed90d0924c89d15f0c7609f53b.tar.gz
Manage the pool processes such that they die after a limited quantity of time, and pick up messages from the spool asynchronously.
Summary: Reference T75735 Let a single worker process timeout itself Set the signal as late as possible Abstract the method to pickup messages from the spool, so that a new recurring Timer may pick up messages out of the spool Test Plan: * Run messages through it Reviewers: #pykolab_developers Subscribers: #pykolab_developers Differential Revision: https://git.kolab.org/D617
Diffstat (limited to 'wallace/__init__.py')
-rw-r--r--wallace/__init__.py144
1 files changed, 96 insertions, 48 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py
index 1e050ef..5a2ffbd 100644
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -29,6 +29,7 @@ import socket
import struct
import sys
import tempfile
+from threading import _Timer
import time
import traceback
@@ -99,6 +100,15 @@ def modules_heartbeat(wallace_modules):
def worker_process(*args, **kw):
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
+class Timer(_Timer):
+ def run(self):
+ while True:
+ while not self.finished.is_set():
+ self.finished.wait(self.interval)
+ self.function(*self.args, **self.kwargs)
+
+ self.finished.set()
+
class WallaceDaemon(object):
def __init__(self):
self.current_connections = 0
@@ -190,6 +200,8 @@ class WallaceDaemon(object):
else:
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ())
+ self.pickup_spool_messages(sync=True)
+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -226,54 +238,8 @@ class WallaceDaemon(object):
s.listen(5)
- # Mind you to include the trailing slash
- pickup_path = '/var/spool/pykolab/wallace/'
- for root, directory, files in os.walk(pickup_path):
- for filename in files:
- filepath = os.path.join(root, filename)
-
- if not root == pickup_path:
- module = os.path.dirname(root).replace(pickup_path, '')
-
- # Compare uppercase status (specifically, DEFER) with
- # lowercase (plugin names).
- #
- # The messages in DEFER are supposed to be picked up by
- # another thread, whereas the messages in other directories
- # are pending being handled by their respective plugins.
- #
- # TODO: Handle messages in spool directories for which a
- # plugin had been enabled, but is not enabled any longer.
- #
-
- if module.lower() == "defer":
- # Wallace was unable to deliver to re-injection smtpd.
- # Skip it, another thread is picking up the deferred
- # messages.
- continue
-
- stage = root.replace(pickup_path, '').split('/')
- if len(stage) < 2:
- stage = None
- else:
- stage = stage[1]
-
- if stage.lower() == "hold":
- continue
-
- # Do not handle messages in a defer state.
- if stage.lower() == "defer":
- continue
-
- self.current_connections += 1
- self.pool.apply_async(pickup_message, (filepath, (self.modules), {'module': module, 'stage': stage}))
- self.current_connections -= 1
-
- continue
-
- self.current_connections += 1
- self.pool.apply_async(pickup_message, (filepath, (self.modules)))
- self.current_connections -= 1
+ self.timer = Timer(180, self.pickup_spool_messages)
+ self.timer.start()
# start background process to run periodic jobs in active modules
self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
@@ -306,6 +272,88 @@ class WallaceDaemon(object):
return "X-Kolab-From: " + mailfrom + "\r\n" + \
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
+ def pickup_spool_messages(self, sync=False):
+ # Mind you to include the trailing slash
+ pickup_path = '/var/spool/pykolab/wallace/'
+
+ messages = []
+ for root, directory, files in os.walk(pickup_path):
+ for filename in files:
+ messages.append((root, filename))
+
+
+ for root, filename in messages:
+ filepath = os.path.join(root, filename)
+
+ try:
+ if os.stat(filepath).st_mtime + 150 > time.time():
+ log.debug("Skipping %s" % (filepath), level=8)
+ continue
+
+ except:
+ continue
+
+ if not root == pickup_path:
+ module = os.path.dirname(root).replace(pickup_path, '')
+
+ # Compare uppercase status (specifically, DEFER) with
+ # lowercase (plugin names).
+ #
+ # The messages in DEFER are supposed to be picked up by
+ # another thread, whereas the messages in other directories
+ # are pending being handled by their respective plugins.
+ #
+ # TODO: Handle messages in spool directories for which a
+ # plugin had been enabled, but is not enabled any longer.
+ #
+
+ if module.lower() == "defer":
+ # Wallace was unable to deliver to re-injection smtpd.
+ # Skip it, another thread is picking up the deferred
+ # messages.
+ continue
+
+ stage = root.replace(pickup_path, '').split('/')
+
+ if len(stage) < 2:
+ stage = None
+ else:
+ stage = stage[1]
+
+ if stage.lower() == "hold":
+ continue
+
+ # Do not handle messages in a defer state.
+ if stage.lower() == "defer":
+ continue
+
+ self.current_connections += 1
+
+ if sync:
+ pickup_message(filepath, self.modules, module=module, stage=stage)
+ else:
+ self.pool.apply_async(
+ pickup_message,
+ (
+ filepath,
+ (self.modules),
+ {'module': module, 'stage': stage}
+ )
+ )
+
+ self.current_connections -= 1
+
+ continue
+
+ self.current_connections += 1
+
+ if sync:
+ pickup_message(filepath, self.modules)
+ else:
+ self.pool.apply_async(pickup_message, (filepath, (self.modules)))
+
+ self.current_connections -= 1
+
def process_message(self, peer, mailfrom, rcpttos, data):
"""
We have retrieved the message. This should be as fast as possible,