pykolab/itip tests/functional tests/unit wallace/module_invitationpolicy.py

Thomas Brüderli bruederli at kolabsys.com
Mon Jul 21 15:07:41 CEST 2014


 pykolab/itip/__init__.py                                   |    2 
 tests/functional/test_wallace/test_007_invitationpolicy.py |    8 -
 tests/unit/test-012-wallace_invitationpolicy.py            |   28 ++-
 wallace/module_invitationpolicy.py                         |  104 +++++++++++--
 4 files changed, 118 insertions(+), 24 deletions(-)

New commits:
commit 3e492389ad7f2d628134b3471bc2e4e054b7d204
Author: Thomas Bruederli <bruederli at kolabsys.com>
Date:   Wed Jul 9 16:52:32 2014 -0400

    Send iTip replies through wallace again; use a locking mechanism to sequencially process partstat updates from (automated) replies

diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py
index 43646df..8cf6435 100644
--- a/pykolab/itip/__init__.py
+++ b/pykolab/itip/__init__.py
@@ -224,7 +224,7 @@ def send_reply(from_address, itip_events, response_text, subject=None):
             log.error(_("Failed to compose iTip reply message: %r") % (e))
             return
 
-        smtp = smtplib.SMTP("localhost", 10027)
+        smtp = smtplib.SMTP("localhost", 10026)  # replies go through wallace again
 
         if conf.debuglevel > 8:
             smtp.set_debuglevel(True)
diff --git a/tests/functional/test_wallace/test_007_invitationpolicy.py b/tests/functional/test_wallace/test_007_invitationpolicy.py
index 2b669ff..b12b785 100644
--- a/tests/functional/test_wallace/test_007_invitationpolicy.py
+++ b/tests/functional/test_wallace/test_007_invitationpolicy.py
@@ -623,13 +623,7 @@ class TestWallaceInvitationpolicy(unittest.TestCase):
         self.send_itip_invitation(self.jane['mail'], start, template=event_itip)
         self.send_itip_invitation(self.jack['mail'], start, template=event_itip)
 
-        # send replies from jack and jane
-        # FIXME: replies should not be necessary if auto-replies get through wallace as well
-        self.send_itip_reply(uid, self.jane['mail'], self.john['mail'], start=start, partstat='ACCEPTED')
-        time.sleep(10)  # FIXME: implement locking in wallace
-        self.send_itip_reply(uid, self.jack['mail'], self.john['mail'], start=start, partstat='TENTATIVE')
-
-        # wait for replies to be processed and propagated
+        # wait for replies from jack and jane to be processed and propagated
         time.sleep(10)
         event = self.check_user_calendar_event(self.john['kolabtargetfolder'], uid)
         self.assertIsInstance(event, pykolab.xml.Event)
diff --git a/tests/unit/test-012-wallace_invitationpolicy.py b/tests/unit/test-012-wallace_invitationpolicy.py
index 650879b..0b64f6a 100644
--- a/tests/unit/test-012-wallace_invitationpolicy.py
+++ b/tests/unit/test-012-wallace_invitationpolicy.py
@@ -1,6 +1,7 @@
+import os
 import pykolab
 import logging
-import datetime
+import time
 
 from icalendar import Calendar
 from email import message
@@ -72,11 +73,10 @@ class TestWallaceInvitationpolicy(unittest.TestCase):
     def setUp(self):
         # monkey-patch the pykolab.auth module to check API calls
         # without actually connecting to LDAP
-        #self.patch(pykolab.auth.Auth, "connect", self._mock_nop)
-        #self.patch(pykolab.auth.Auth, "disconnect", self._mock_nop)
-        #self.patch(pykolab.auth.Auth, "find_user_dn", self._mock_find_user_dn)
-        #self.patch(pykolab.auth.Auth, "get_entry_attributes", self._mock_get_entry_attributes)
-        #self.patch(pykolab.auth.Auth, "search_entry_by_attribute", self._mock_search_entry_by_attribute)
+        self.patch(pykolab.auth.Auth, "connect", self._mock_nop)
+        self.patch(pykolab.auth.Auth, "disconnect", self._mock_nop)
+        self.patch(pykolab.auth.Auth, "find_user_dn", self._mock_find_user_dn)
+        self.patch(pykolab.auth.Auth, "get_entry_attributes", self._mock_get_entry_attributes)
 
         # intercept calls to smtplib.SMTP.sendmail()
         import smtplib
@@ -127,3 +127,19 @@ class TestWallaceInvitationpolicy(unittest.TestCase):
 
         user = { 'kolabinvitationpolicy': ['ACT_ACCEPT:example.org', 'ACT_MANUAL:others'] }
         self.assertEqual(MIP.get_matching_invitation_policies(user, 'somedomain.net'), [MIP.ACT_MANUAL])
+
+    def test_004_write_locks(self):
+        user = { 'cn': 'John Doe', 'mail': "doe at example.org" }
+
+        lock_key = MIP.get_lock_key(user, '1234567890-abcdef')
+        lock_file = os.path.join(MIP.mybasepath, 'locks', lock_key + '.lock')
+        MIP.set_write_lock(lock_key)
+
+        time.sleep(1)
+        self.assertTrue(os.path.isfile(lock_file))
+        self.assertFalse(MIP.set_write_lock(lock_key, False))
+
+        MIP.remove_write_lock(lock_key)
+        self.assertFalse(os.path.isfile(lock_file))
+
+        
\ No newline at end of file
diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py
index ec3ad44..b7f59de 100644
--- a/wallace/module_invitationpolicy.py
+++ b/wallace/module_invitationpolicy.py
@@ -23,6 +23,7 @@ import tempfile
 import time
 from urlparse import urlparse
 import urllib
+import md5
 
 from email import message_from_string
 from email.parser import Parser
@@ -92,6 +93,7 @@ mybasepath = '/var/spool/pykolab/wallace/invitationpolicy/'
 
 auth = None
 imap = None
+write_locks = []
 
 def __init__():
     modules.register('invitationpolicy', execute, description=description())
@@ -123,7 +125,7 @@ def description():
     return """Invitation policy execution module."""
 
 def cleanup():
-    global auth, imap
+    global auth, imap, write_locks
 
     log.debug("cleanup(): %r, %r" % (auth, imap), level=9)
 
@@ -134,13 +136,17 @@ def cleanup():
     imap.disconnect()
     del imap
 
+    # remove remaining write locks
+    for key in write_locks:
+        remove_write_lock(key, False)
+
 def execute(*args, **kw):
     global auth, imap
 
     if not os.path.isdir(mybasepath):
         os.makedirs(mybasepath)
 
-    for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER' ]:
+    for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER', 'locks']:
         if not os.path.isdir(os.path.join(mybasepath, stage)):
             os.makedirs(os.path.join(mybasepath, stage))
 
@@ -149,9 +155,14 @@ def execute(*args, **kw):
     auth = Auth()
     imap = IMAP()
 
-    # TODO: Test for correct call.
     filepath = args[0]
 
+    # ignore calls on lock files
+    if '/locks/' in filepath or kw.has_key('stage') and kw['stage'] == 'locks':
+        return False
+
+    log.debug("Invitation policy executing for %r, %r" % (filepath, '/locks/' in filepath), level=8)
+
     if kw.has_key('stage'):
         log.debug(_("Issuing callback after processing to stage %s") % (kw['stage']), level=8)
 
@@ -276,6 +287,9 @@ def execute(*args, **kw):
             if done is not None:
                 break
 
+            # remove possible write lock from this iteration
+            remove_write_lock(get_lock_key(receiving_user, itip_event['uid']))
+
     else:
         log.debug(_("Ignoring '%s' iTip method") % (itip_event['method']), level=8)
 
@@ -316,7 +330,7 @@ def process_itip_request(itip_event, policy, recipient_email, sender_email, rece
     condition_fulfilled = True
 
     # find existing event in user's calendar
-    existing = find_existing_event(itip_event['uid'], receiving_user)
+    existing = find_existing_event(itip_event['uid'], receiving_user, True)
 
     # compare sequence number to determine a (re-)scheduling request
     if existing is not None:
@@ -407,7 +421,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
 
         # find existing event in user's calendar
         # TODO: set/check lock to avoid concurrent wallace processes trying to update the same event simultaneously
-        existing = find_existing_event(itip_event['uid'], receiving_user)
+        existing = find_existing_event(itip_event['uid'], receiving_user, True)
 
         if existing:
             # compare sequence number to avoid outdated replies?
@@ -415,6 +429,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
                 log.info(_("The iTip reply sequence (%r) doesn't match the referred event version (%r). Forwarding to Inbox.") % (
                     itip_event['sequence'], existing.get_sequence()
                 ))
+                remove_write_lock(existing._lock_key)
                 return MESSAGE_FORWARD
 
             log.debug(_("Auto-updating event %r on iTip REPLY") % (existing.uid), level=8)
@@ -424,6 +439,7 @@ def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiv
                 log.error("Could not find corresponding attende in organizer's event: %r" % (e))
 
                 # TODO: accept new participant if ACT_ACCEPT ?
+                remove_write_lock(existing._lock_key)
                 return MESSAGE_FORWARD
 
             # update the organizer's copy of the event
@@ -457,7 +473,7 @@ def process_itip_cancel(itip_event, policy, recipient_email, sender_email, recei
     # auto-update the local copy with STATUS=CANCELLED
     if policy & ACT_UPDATE:
         # find existing event in user's calendar
-        existing = find_existing_event(itip_event['uid'], receiving_user)
+        existing = find_existing_event(itip_event['uid'], receiving_user, True)
 
         if existing:
             existing.set_status('CANCELLED')
@@ -615,12 +631,18 @@ def list_user_calendars(user_rec):
     return calendars
 
 
-def find_existing_event(uid, user_rec):
+def find_existing_event(uid, user_rec, lock=False):
     """
         Search user's calendar folders for the given event (by UID)
     """
     global imap
 
+    lock_key = None
+
+    if lock:
+        lock_key = get_lock_key(user_rec, uid)
+        set_write_lock(lock_key)
+
     event = None
     for folder in list_user_calendars(user_rec):
         log.debug(_("Searching folder %r for event %r") % (folder, uid), level=8)
@@ -633,6 +655,7 @@ def find_existing_event(uid, user_rec):
             try:
                 event = event_from_message(message_from_string(data[0][1]))
                 setattr(event, '_imap_folder', folder)
+                setattr(event, '_lock_key', lock_key)
             except Exception, e:
                 log.error(_("Failed to parse event from message %s/%s: %r") % (folder, num, e))
                 continue
@@ -640,6 +663,9 @@ def find_existing_event(uid, user_rec):
             if event and event.uid == uid:
                 return event
 
+    if lock_key is not None:
+        remove_write_lock(lock_key)
+
     return event
 
 
@@ -691,15 +717,73 @@ def check_availability(itip_event, receiving_user):
     return not conflict
 
 
+def set_write_lock(key, wait=True):
+    """
+        Set a write-lock for the given key and wait if such a lock already exists
+    """
+    if not os.path.isdir(mybasepath):
+        os.makedirs(mybasepath)
+    if not os.path.isdir(os.path.join(mybasepath, 'locks')):
+        os.makedirs(os.path.join(mybasepath, 'locks'))
+
+    file = os.path.join(mybasepath, 'locks', key + '.lock')
+    locked = os.path.getmtime(file) if os.path.isfile(file) else 0
+    expired = time.time() - 300
+
+    # wait if file lock is in place
+    while locked and locked > expired:
+        if not wait:
+            return False
+
+        log.debug(_("%r is locked, waiting...") % (key), level=9)
+        time.sleep(0.5)
+        locked = os.path.getmtime(file) if os.path.isfile(file) else 0
+
+    # touch the file
+    if os.path.isfile(file):
+        os.utime(file, None)
+    else:
+        open(file, 'w').close()
+
+    # register active lock
+    write_locks.append(key)
+
+    return True
+
+
+def remove_write_lock(key, update=True):
+    """
+        Remove the lock file for the given key
+    """
+    global write_locks
+
+    if key is not None:
+        file = os.path.join(mybasepath, 'locks', key + '.lock')
+        if os.path.isfile(file):
+            os.remove(file)
+            if update:
+                write_locks = [k for k in write_locks if not k == key]
+
+
+def get_lock_key(user, uid):
+    return md5.new("%s/%s" % (user['mail'], uid)).hexdigest()
+
+
 def update_event(event, user_rec):
     """
         Update the given event in IMAP (i.e. delete + append)
     """
+    success = False
+
     if hasattr(event, '_imap_folder'):
         delete_event(event)
-        return store_event(event, user_rec, event._imap_folder)
+        success = store_event(event, user_rec, event._imap_folder)
 
-    return False
+        # remove write lock for this event
+        if hasattr(event, '_lock_key') and event._lock_key is not None:
+            remove_write_lock(event._lock_key)
+
+    return success
 
 
 def store_event(event, user_rec, targetfolder=None):
@@ -829,7 +913,7 @@ def propagate_changes_to_attendees_calendars(event):
             log.debug(_("Update attendee copy of %r") % (attendee_user_dn), level=9)
 
             attendee_user = auth.get_entry_attributes(None, attendee_user_dn, ['*'])
-            attendee_event = find_existing_event(event.uid, attendee_user)  # does IMAP authenticate
+            attendee_event = find_existing_event(event.uid, attendee_user, True)  # does IMAP authenticate
             if attendee_event:
                 attendee_event.event.setAttendees(event.get_attendees())
                 success = update_event(attendee_event, attendee_user)




More information about the commits mailing list