wallace/__init__.py wallace/module_resources.py wallace/modules.py
Thomas Brüderli
bruederli at kolabsys.com
Wed Oct 29 21:11:00 CET 2014
wallace/__init__.py | 27 +++++++++++
wallace/module_resources.py | 104 +++++++++++++++++++++++++++++++++++++++++++-
wallace/modules.py | 11 ++++
3 files changed, 139 insertions(+), 3 deletions(-)
New commits:
commit 1ad076886f2ca1f61438005c1547a7229c0ef287
Author: Thomas Bruederli <bruederli at kolabsys.com>
Date: Thu Oct 23 17:20:48 2014 -0400
Run archival jobs in another Wallace child process (#3843)
diff --git a/wallace/__init__.py b/wallace/__init__.py
index 6698666..8d43be2 100644
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -87,6 +87,23 @@ def pickup_message(filepath, *args, **kw):
if continue_with_accept:
cb_action_ACCEPT('wallace', filepath)
+def modules_heartbeat(wallace_modules):
+ lastrun = 0
+
+ while True:
+ try:
+ for module in wallace_modules:
+ try:
+ modules.heartbeat(module, lastrun)
+ except:
+ log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc())))
+
+ lastrun = int(time.time())
+ time.sleep(60)
+ except (SystemExit, KeyboardInterrupt), e:
+ log.info("Terminating heartbeat process")
+ break
+
def worker_process(*args, **kw):
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
@@ -255,6 +272,11 @@ class WallaceDaemon(object):
self.pool.apply_async(pickup_message, (filepath, (self.modules)))
self.current_connections -= 1
+ # start background process to run periodic jobs in active modules
+ self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
+ self.heartbeat.daemon = True
+ self.heartbeat.start()
+
try:
while 1:
while self.current_connections >= self.max_connections:
@@ -273,6 +295,9 @@ class WallaceDaemon(object):
s.shutdown(1)
s.close()
+ # shut down hearbeat process
+ self.heartbeat.terminate()
+
def data_header(self, mailfrom, rcpttos):
COMMASPACE = ', '
return "X-Kolab-From: " + mailfrom + "\r\n" + \
@@ -305,6 +330,8 @@ class WallaceDaemon(object):
pass
def remove_pid(self, *args, **kw):
+ if hasattr(self, 'heartbeat'):
+ self.heartbeat.terminate()
if os.access(conf.pidfile, os.R_OK):
os.remove(conf.pidfile)
raise SystemExit
diff --git a/wallace/module_resources.py b/wallace/module_resources.py
index 3d16844..9376749 100644
--- a/wallace/module_resources.py
+++ b/wallace/module_resources.py
@@ -25,6 +25,7 @@ import random
import tempfile
import time
from urlparse import urlparse
+from dateutil.tz import tzlocal
import base64
import uuid
import re
@@ -70,7 +71,7 @@ auth = None
imap = None
def __init__():
- modules.register('resources', execute, description=description())
+ modules.register('resources', execute, description=description(), heartbeat=heartbeat)
def accept(filepath):
new_filepath = os.path.join(
@@ -391,6 +392,105 @@ def execute(*args, **kw):
os.unlink(filepath)
+def heartbeat(lastrun):
+ global imap
+
+ # run archival job every hour only
+ now = int(time.time())
+ if lastrun == 0 or now - heartbeat._lastrun < 3600:
+ return
+
+ log.debug(_("module_resources.heartbeat(%d)") % (heartbeat._lastrun), level=8)
+
+ # get a list of resource records from LDAP
+ auth = Auth()
+ auth.connect()
+
+ resource_dns = auth.find_resource('*')
+
+ # filter by resource_base_dn
+ resource_base_dn = conf.get('ldap', 'resource_base_dn', None)
+ if resource_base_dn is not None:
+ resource_dns = [dn for dn in resource_dns if resource_base_dn in dn]
+
+ if len(resource_dns) > 0:
+ imap = IMAP()
+ imap.connect()
+
+ for resource_dn in resource_dns:
+ resource_attrs = auth.get_entry_attributes(None, resource_dn, ['kolabtargetfolder'])
+ if resource_attrs.has_key('kolabtargetfolder'):
+ try:
+ expunge_resource_calendar(resource_attrs['kolabtargetfolder'])
+ except Exception, e:
+ log.error(_("Expunge resource calendar for %s (%s) failed: %r") % (
+ resource_dn, resource_attrs['kolabtargetfolder'], e
+ ))
+
+ imap.disconnect()
+
+ auth.disconnect()
+
+ heartbeat._lastrun = now
+
+heartbeat._lastrun = 0
+
+
+def expunge_resource_calendar(mailbox):
+ """
+ Cleanup routine to remove events older than 100 days from the given resource calendar
+ """
+ global imap
+
+ log.debug(
+ _("Expunge events in resource folder %r") % (mailbox),
+ level=8
+ )
+
+ now = datetime.datetime.now(tzlocal())
+ expire_date = now - datetime.timedelta(days=100)
+
+ # might raise an exception, let that bubble
+ targetfolder = imap.folder_quote(mailbox)
+ imap.set_acl(targetfolder, conf.get(conf.get('kolab', 'imap_backend'), 'admin_login'), "lrswipkxtecda")
+ imap.imap.m.select(targetfolder)
+
+ typ, data = imap.imap.m.search(None, 'UNDELETED')
+
+ for num in data[0].split():
+ log.debug(
+ _("Fetching message ID %r from folder %r") % (num, mailbox),
+ level=9
+ )
+
+ typ, data = imap.imap.m.fetch(num, '(RFC822)')
+
+ event_message = message_from_string(data[0][1])
+
+ try:
+ event = event_from_message(message_from_string(data[0][1]))
+ except Exception, e:
+ log.error(_("Failed to parse event from message %s/%s: %r") % (mailbox, num, e))
+ continue
+
+ if event:
+ dt_end = to_dt(event.get_end())
+
+ # consider recurring events and get real end date
+ if event.is_recurring():
+ dt_end = event.get_last_occurrence()
+ if dt_end is None:
+ # skip if recurring forever
+ continue
+
+ if dt_end and dt_end < expire_date:
+ age = now - dt_end
+ log.debug(_("Flag event %s from message %s/%s as deleted (age = %d days)") % (event.uid, mailbox, num, age.days), level=8)
+ imap.imap.m.store(num, '+FLAGS', '\\Deleted')
+
+ imap.imap.m.expunge()
+
+
def check_availability(itip_events, resource_dns, resources, receiving_attendee=None):
"""
For each resource, determine if any of the events in question are in conflict.
@@ -540,7 +640,7 @@ def read_resource_calendar(resource_rec, itip_events):
# might raise an exception, let that bubble
imap.imap.m.select(imap.folder_quote(mailbox))
- typ, data = imap.imap.m.search(None, 'ALL')
+ typ, data = imap.imap.m.search(None, 'UNDELETED')
num_messages = len(data[0].split())
diff --git a/wallace/modules.py b/wallace/modules.py
index d4432d0..632a6fc 100644
--- a/wallace/modules.py
+++ b/wallace/modules.py
@@ -115,6 +115,13 @@ def execute(name, *args, **kw):
return modules[name]['function'](*args, **kw)
+def heartbeat(name, *args, **kw):
+ if not modules.has_key(name):
+ log.warning(_("No such module %r in modules %r (1).") % (name, modules))
+
+ if modules[name].has_key('heartbeat'):
+ return modules[name]['heartbeat'](*args, **kw)
+
def cb_action_HOLD(module, filepath):
log.info(_("Holding message in queue for manual review (%s by %s)") % (filepath, module))
@@ -334,7 +341,7 @@ def register_group(dirname, module):
exec("%s_%s_register()" % (module,name))
-def register(name, func, group=None, description=None, aliases=[]):
+def register(name, func, group=None, description=None, aliases=[], heartbeat=None):
if not group == None:
module = "%s_%s" % (group,name)
else:
@@ -369,3 +376,5 @@ def register(name, func, group=None, description=None, aliases=[]):
'description': _("Alias for %s") % (name)
}
+ if callable(heartbeat):
+ modules[module]['heartbeat'] = heartbeat
More information about the commits
mailing list