summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2018-07-27 10:22:18 +0200
committerJeroen van Meeuwen (Kolab Systems) <vanmeeuwen@kolabsys.com>2018-07-27 10:22:18 +0200
commit3c5ce19ae3e2b5ed90d0924c89d15f0c7609f53b (patch)
treed3e08b8874fdabfc858633a1ceb9e906ea743a51
parenta1c700d53d8a9b0405f7bc28f835ab5142d0f72d (diff)
downloadpykolab-3c5ce19ae3e2b5ed90d0924c89d15f0c7609f53b.tar.gz
Manage the pool processes such that they die after a limited quantity of time, and pick up messages from the spool asynchronously.
Summary: Reference T75735 Let a single worker process timeout itself Set the signal as late as possible Abstract the method to pickup messages from the spool, so that a new recurring Timer may pick up messages out of the spool Test Plan: * Run messages through it Reviewers: #pykolab_developers Subscribers: #pykolab_developers Differential Revision: https://git.kolab.org/D617
-rw-r--r--wallace/__init__.py144
-rw-r--r--wallace/module_invitationpolicy.py13
-rw-r--r--wallace/module_resources.py7
3 files changed, 116 insertions, 48 deletions
diff --git a/wallace/__init__.py b/wallace/__init__.py
index 1e050ef..5a2ffbd 100644
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -29,6 +29,7 @@ import socket
import struct
import sys
import tempfile
+from threading import _Timer
import time
import traceback
@@ -99,6 +100,15 @@ def modules_heartbeat(wallace_modules):
def worker_process(*args, **kw):
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
+class Timer(_Timer):
+ def run(self):
+ while True:
+ while not self.finished.is_set():
+ self.finished.wait(self.interval)
+ self.function(*self.args, **self.kwargs)
+
+ self.finished.set()
+
class WallaceDaemon(object):
def __init__(self):
self.current_connections = 0
@@ -190,6 +200,8 @@ class WallaceDaemon(object):
else:
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ())
+ self.pickup_spool_messages(sync=True)
+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -226,54 +238,8 @@ class WallaceDaemon(object):
s.listen(5)
- # Mind you to include the trailing slash
- pickup_path = '/var/spool/pykolab/wallace/'
- for root, directory, files in os.walk(pickup_path):
- for filename in files:
- filepath = os.path.join(root, filename)
-
- if not root == pickup_path:
- module = os.path.dirname(root).replace(pickup_path, '')
-
- # Compare uppercase status (specifically, DEFER) with
- # lowercase (plugin names).
- #
- # The messages in DEFER are supposed to be picked up by
- # another thread, whereas the messages in other directories
- # are pending being handled by their respective plugins.
- #
- # TODO: Handle messages in spool directories for which a
- # plugin had been enabled, but is not enabled any longer.
- #
-
- if module.lower() == "defer":
- # Wallace was unable to deliver to re-injection smtpd.
- # Skip it, another thread is picking up the deferred
- # messages.
- continue
-
- stage = root.replace(pickup_path, '').split('/')
- if len(stage) < 2:
- stage = None
- else:
- stage = stage[1]
-
- if stage.lower() == "hold":
- continue
-
- # Do not handle messages in a defer state.
- if stage.lower() == "defer":
- continue
-
- self.current_connections += 1
- self.pool.apply_async(pickup_message, (filepath, (self.modules), {'module': module, 'stage': stage}))
- self.current_connections -= 1
-
- continue
-
- self.current_connections += 1
- self.pool.apply_async(pickup_message, (filepath, (self.modules)))
- self.current_connections -= 1
+ self.timer = Timer(180, self.pickup_spool_messages)
+ self.timer.start()
# start background process to run periodic jobs in active modules
self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
@@ -306,6 +272,88 @@ class WallaceDaemon(object):
return "X-Kolab-From: " + mailfrom + "\r\n" + \
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
+ def pickup_spool_messages(self, sync=False):
+ # Mind you to include the trailing slash
+ pickup_path = '/var/spool/pykolab/wallace/'
+
+ messages = []
+ for root, directory, files in os.walk(pickup_path):
+ for filename in files:
+ messages.append((root, filename))
+
+
+ for root, filename in messages:
+ filepath = os.path.join(root, filename)
+
+ try:
+ if os.stat(filepath).st_mtime + 150 > time.time():
+ log.debug("Skipping %s" % (filepath), level=8)
+ continue
+
+ except:
+ continue
+
+ if not root == pickup_path:
+ module = os.path.dirname(root).replace(pickup_path, '')
+
+ # Compare uppercase status (specifically, DEFER) with
+ # lowercase (plugin names).
+ #
+ # The messages in DEFER are supposed to be picked up by
+ # another thread, whereas the messages in other directories
+ # are pending being handled by their respective plugins.
+ #
+ # TODO: Handle messages in spool directories for which a
+ # plugin had been enabled, but is not enabled any longer.
+ #
+
+ if module.lower() == "defer":
+ # Wallace was unable to deliver to re-injection smtpd.
+ # Skip it, another thread is picking up the deferred
+ # messages.
+ continue
+
+ stage = root.replace(pickup_path, '').split('/')
+
+ if len(stage) < 2:
+ stage = None
+ else:
+ stage = stage[1]
+
+ if stage.lower() == "hold":
+ continue
+
+ # Do not handle messages in a defer state.
+ if stage.lower() == "defer":
+ continue
+
+ self.current_connections += 1
+
+ if sync:
+ pickup_message(filepath, self.modules, module=module, stage=stage)
+ else:
+ self.pool.apply_async(
+ pickup_message,
+ (
+ filepath,
+ (self.modules),
+ {'module': module, 'stage': stage}
+ )
+ )
+
+ self.current_connections -= 1
+
+ continue
+
+ self.current_connections += 1
+
+ if sync:
+ pickup_message(filepath, self.modules)
+ else:
+ self.pool.apply_async(pickup_message, (filepath, (self.modules)))
+
+ self.current_connections -= 1
+
def process_message(self, peer, mailfrom, rcpttos, data):
"""
We have retrieved the message. This should be as fast as possible,
diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py
index b0dfe4c..55ad940 100644
--- a/wallace/module_invitationpolicy.py
+++ b/wallace/module_invitationpolicy.py
@@ -19,6 +19,7 @@
import datetime
import os
+import signal
import tempfile
import time
from urlparse import urlparse
@@ -1290,8 +1291,14 @@ def send_update_notification(object, receiving_user, old=None, reply=True, sende
msg['From'] = Header(utils.str2unicode('%s' % orgname) if orgname else '')
msg['From'].append("<%s>" % orgemail)
+ seed = random.randint(0, 6)
+ alarm_after = (seed * 10) + 60
+ log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+ signal.alarm(alarm_after)
+
result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
log.debug(_("Sent update notification to %r: %r") % (receiving_user['mail'], result), level=8)
+ signal.alarm(0)
def send_cancel_notification(object, receiving_user, deleted=False, sender=None, comment=None):
"""
@@ -1354,8 +1361,14 @@ def send_cancel_notification(object, receiving_user, deleted=False, sender=None,
msg['From'] = Header(utils.str2unicode('%s' % orgname) if orgname else '')
msg['From'].append("<%s>" % orgemail)
+ seed = random.randint(0, 6)
+ alarm_after = (seed * 10) + 60
+ log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+ signal.alarm(alarm_after)
+
result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
log.debug(_("Sent cancel notification to %r: %r") % (receiving_user['mail'], result), level=8)
+ signal.alarm(0)
def is_auto_reply(user, sender_email, type):
accept_available = False
diff --git a/wallace/module_resources.py b/wallace/module_resources.py
index 962e11a..80c85f1 100644
--- a/wallace/module_resources.py
+++ b/wallace/module_resources.py
@@ -22,6 +22,7 @@ import icalendar
import os
import pytz
import random
+import signal
import tempfile
import time
from urlparse import urlparse
@@ -1355,8 +1356,14 @@ def send_owner_notification(resource, owner, itip_event, success=True):
resource['cn'], participant_status_label(status) if success else _('failed')
))
+ seed = random.randint(0, 6)
+ alarm_after = (seed * 10) + 60
+ log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+ signal.alarm(alarm_after)
+
result = modules._sendmail(resource['mail'], owner['mail'], msg.as_string())
log.debug(_("Owner notification was sent successfully: %r") % result, level=8)
+ signal.alarm(0)
def owner_notification_text(resource, owner, event, success):
organizer = event.get_organizer()