wallace/__init__.py wallace/module_resources.py wallace/modules.py

Thomas Brüderli bruederli at kolabsys.com
Wed Oct 29 21:11:00 CET 2014


 wallace/__init__.py         |   27 +++++++++++
 wallace/module_resources.py |  104 +++++++++++++++++++++++++++++++++++++++++++-
 wallace/modules.py          |   11 ++++
 3 files changed, 139 insertions(+), 3 deletions(-)

New commits:
commit 1ad076886f2ca1f61438005c1547a7229c0ef287
Author: Thomas Bruederli <bruederli at kolabsys.com>
Date:   Thu Oct 23 17:20:48 2014 -0400

    Run archival jobs in another Wallace child process (#3843)

diff --git a/wallace/__init__.py b/wallace/__init__.py
index 6698666..8d43be2 100644
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -87,6 +87,23 @@ def pickup_message(filepath, *args, **kw):
     if continue_with_accept:
         cb_action_ACCEPT('wallace', filepath)
 
+def modules_heartbeat(wallace_modules):
+    lastrun = 0
+
+    while True:
+        try:
+            for module in wallace_modules:
+                try:
+                    modules.heartbeat(module, lastrun)
+                except:
+                    log.error(_("Module %s.heartbeat() failed with error: %s" % (module, traceback.format_exc())))
+
+            lastrun = int(time.time())
+            time.sleep(60)
+        except (SystemExit, KeyboardInterrupt), e:
+            log.info("Terminating heartbeat process")
+            break
+
 def worker_process(*args, **kw):
     log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
 
@@ -255,6 +272,11 @@ class WallaceDaemon(object):
                 self.pool.apply_async(pickup_message, (filepath, (self.modules)))
                 self.current_connections -= 1
 
+        # start background process to run periodic jobs in active modules
+        self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
+        self.heartbeat.daemon = True
+        self.heartbeat.start()
+
         try:
             while 1:
                 while self.current_connections >= self.max_connections:
@@ -273,6 +295,9 @@ class WallaceDaemon(object):
             s.shutdown(1)
             s.close()
 
+        # shut down hearbeat process
+        self.heartbeat.terminate()
+
     def data_header(self, mailfrom, rcpttos):
         COMMASPACE = ', '
         return "X-Kolab-From: " + mailfrom + "\r\n" + \
@@ -305,6 +330,8 @@ class WallaceDaemon(object):
         pass
 
     def remove_pid(self, *args, **kw):
+        if hasattr(self, 'heartbeat'):
+            self.heartbeat.terminate()
         if os.access(conf.pidfile, os.R_OK):
             os.remove(conf.pidfile)
         raise SystemExit
diff --git a/wallace/module_resources.py b/wallace/module_resources.py
index 3d16844..9376749 100644
--- a/wallace/module_resources.py
+++ b/wallace/module_resources.py
@@ -25,6 +25,7 @@ import random
 import tempfile
 import time
 from urlparse import urlparse
+from dateutil.tz import tzlocal
 import base64
 import uuid
 import re
@@ -70,7 +71,7 @@ auth = None
 imap = None
 
 def __init__():
-    modules.register('resources', execute, description=description())
+    modules.register('resources', execute, description=description(), heartbeat=heartbeat)
 
 def accept(filepath):
     new_filepath = os.path.join(
@@ -391,6 +392,105 @@ def execute(*args, **kw):
     os.unlink(filepath)
 
 
+def heartbeat(lastrun):
+    global imap
+
+    # run archival job every hour only
+    now = int(time.time())
+    if lastrun == 0 or now - heartbeat._lastrun < 3600:
+        return
+
+    log.debug(_("module_resources.heartbeat(%d)") % (heartbeat._lastrun), level=8)
+
+    # get a list of resource records from LDAP
+    auth = Auth()
+    auth.connect()
+
+    resource_dns = auth.find_resource('*')
+
+    # filter by resource_base_dn
+    resource_base_dn = conf.get('ldap', 'resource_base_dn', None)
+    if resource_base_dn is not None:
+        resource_dns = [dn for dn in resource_dns if resource_base_dn in dn]
+
+    if len(resource_dns) > 0:
+        imap = IMAP()
+        imap.connect()
+
+        for resource_dn in resource_dns:
+            resource_attrs = auth.get_entry_attributes(None, resource_dn, ['kolabtargetfolder'])
+            if resource_attrs.has_key('kolabtargetfolder'):
+                try:
+                    expunge_resource_calendar(resource_attrs['kolabtargetfolder'])
+                except Exception, e:
+                    log.error(_("Expunge resource calendar for %s (%s) failed: %r") % (
+                        resource_dn, resource_attrs['kolabtargetfolder'], e
+                    ))
+
+        imap.disconnect()
+
+    auth.disconnect()
+
+    heartbeat._lastrun = now
+
+heartbeat._lastrun = 0
+
+
+def expunge_resource_calendar(mailbox):
+    """
+        Cleanup routine to remove events older than 100 days from the given resource calendar
+    """
+    global imap
+
+    log.debug(
+        _("Expunge events in resource folder %r") % (mailbox),
+        level=8
+    )
+
+    now = datetime.datetime.now(tzlocal())
+    expire_date = now - datetime.timedelta(days=100)
+
+    # might raise an exception, let that bubble
+    targetfolder = imap.folder_quote(mailbox)
+    imap.set_acl(targetfolder, conf.get(conf.get('kolab', 'imap_backend'), 'admin_login'), "lrswipkxtecda")
+    imap.imap.m.select(targetfolder)
+
+    typ, data = imap.imap.m.search(None, 'UNDELETED')
+
+    for num in data[0].split():
+        log.debug(
+            _("Fetching message ID %r from folder %r") % (num, mailbox),
+            level=9
+        )
+
+        typ, data = imap.imap.m.fetch(num, '(RFC822)')
+
+        event_message = message_from_string(data[0][1])
+
+        try:
+            event = event_from_message(message_from_string(data[0][1]))
+        except Exception, e:
+            log.error(_("Failed to parse event from message %s/%s: %r") % (mailbox, num, e))
+            continue
+
+        if event:
+            dt_end = to_dt(event.get_end())
+
+            # consider recurring events and get real end date
+            if event.is_recurring():
+                dt_end = event.get_last_occurrence()
+                if dt_end is None:
+                    # skip if recurring forever
+                    continue
+
+            if dt_end and dt_end < expire_date:
+                age = now - dt_end
+                log.debug(_("Flag event %s from message %s/%s as deleted (age = %d days)") % (event.uid, mailbox, num, age.days), level=8)
+                imap.imap.m.store(num, '+FLAGS', '\\Deleted')
+
+    imap.imap.m.expunge()
+
+
 def check_availability(itip_events, resource_dns, resources, receiving_attendee=None):
     """
         For each resource, determine if any of the events in question are in conflict.
@@ -540,7 +640,7 @@ def read_resource_calendar(resource_rec, itip_events):
 
     # might raise an exception, let that bubble
     imap.imap.m.select(imap.folder_quote(mailbox))
-    typ, data = imap.imap.m.search(None, 'ALL')
+    typ, data = imap.imap.m.search(None, 'UNDELETED')
 
     num_messages = len(data[0].split())
 
diff --git a/wallace/modules.py b/wallace/modules.py
index d4432d0..632a6fc 100644
--- a/wallace/modules.py
+++ b/wallace/modules.py
@@ -115,6 +115,13 @@ def execute(name, *args, **kw):
 
     return modules[name]['function'](*args, **kw)
 
+def heartbeat(name, *args, **kw):
+    if not modules.has_key(name):
+        log.warning(_("No such module %r in modules %r (1).") % (name, modules))
+
+    if modules[name].has_key('heartbeat'):
+        return modules[name]['heartbeat'](*args, **kw)
+
 def cb_action_HOLD(module, filepath):
     log.info(_("Holding message in queue for manual review (%s by %s)") % (filepath, module))
 
@@ -334,7 +341,7 @@ def register_group(dirname, module):
 
                 exec("%s_%s_register()" % (module,name))
 
-def register(name, func, group=None, description=None, aliases=[]):
+def register(name, func, group=None, description=None, aliases=[], heartbeat=None):
     if not group == None:
         module = "%s_%s" % (group,name)
     else:
@@ -369,3 +376,5 @@ def register(name, func, group=None, description=None, aliases=[]):
                     'description': _("Alias for %s") % (name)
                 }
 
+        if callable(heartbeat):
+            modules[module]['heartbeat'] = heartbeat




More information about the commits mailing list