conf/kolab.conf tests/functional tests/unit wallace/module_invitationpolicy.py
Thomas Brüderli
bruederli at kolabsys.com
Wed Jul 16 17:23:26 CEST 2014
conf/kolab.conf | 5
tests/functional/test_wallace/test_007_invitationpolicy.py | 449 ++++++++
tests/functional/user_add.py | 4
tests/unit/test-012-wallace_invitationpolicy.py | 129 ++
wallace/module_invitationpolicy.py | 721 +++++++++++++
5 files changed, 1306 insertions(+), 2 deletions(-)
New commits:
commit ce4be6aec8a5112ead076f7a2c6a8ad7eeb403e6
Author: Thomas Bruederli <bruederli at kolabsys.com>
Date: Sun Jul 6 22:14:32 2014 -0400
Start implementing a new wallace module 'invitationpolicy' to automatically process iTip messages according to per-user policies
diff --git a/conf/kolab.conf b/conf/kolab.conf
index 2f8ea2b..cb3a7ba 100644
--- a/conf/kolab.conf
+++ b/conf/kolab.conf
@@ -361,10 +361,13 @@ admin_password = Welcome123
result_attribute = mail
[wallace]
-modules = resources, footer
+modules = resources, invitationpolicy, footer
footer_text = /etc/kolab/footer.text
footer_html = /etc/kolab/footer.html
+; default settings for kolabInvitationPolicy
+kolab_invitation_policy = ACT_ACCEPT_IF_NO_CONFLICT:example.org, ACT_MANUAL
+
; This is a domain name space specific section, that enables us to override
; all settings, for example, the LDAP URI, base and bind DNs, scopes, filters,
; etc. Note that overriding the LDAP settings for the primary domain name space
diff --git a/tests/functional/test_wallace/test_007_invitationpolicy.py b/tests/functional/test_wallace/test_007_invitationpolicy.py
new file mode 100644
index 0000000..0490ec1
--- /dev/null
+++ b/tests/functional/test_wallace/test_007_invitationpolicy.py
@@ -0,0 +1,449 @@
+import time
+import pykolab
+import smtplib
+import email
+import datetime
+import pytz
+import uuid
+import kolabformat
+
+from pykolab.imap import IMAP
+from wallace import module_resources
+
+from email import message_from_string
+from twisted.trial import unittest
+
+import tests.functional.resource_func as funcs
+
+conf = pykolab.getConf()
+
+itip_invitation = """
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN
+CALSCALE:GREGORIAN
+METHOD:REQUEST
+BEGIN:VEVENT
+UID:%(uid)s
+DTSTAMP:20140213T1254140
+DTSTART;TZID=Europe/Berlin:%(start)s
+DTEND;TZID=Europe/Berlin:%(end)s
+SUMMARY:%(summary)s
+DESCRIPTION:test
+ORGANIZER;CN="Doe, John":mailto:john.doe at example.org
+ATTENDEE;ROLE=REQ-PARTICIPANT;PARTSTAT=%(partstat)s;RSVP=TRUE:mailto:%(mailto)s
+ATTENDEE;ROLE=OPT-PARTICIPANT;PARTSTAT=TENTATIVE;RSVP=FALSE:mailto:somebody at else.com
+TRANSP:OPAQUE
+SEQUENCE:%(sequence)d
+END:VEVENT
+END:VCALENDAR
+"""
+
+itip_cancellation = """
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN
+CALSCALE:GREGORIAN
+METHOD:CANCEL
+BEGIN:VEVENT
+UID:%(uid)s
+DTSTAMP:20140218T1254140
+DTSTART;TZID=Europe/Berlin:20120713T100000
+DTEND;TZID=Europe/Berlin:20120713T110000
+SUMMARY:%(summary)s
+DESCRIPTION:test
+ORGANIZER;CN="Doe, John":mailto:john.doe at example.org
+ATTENDEE;ROLE=REQ-PARTICIPANT;PARTSTAT=ACCEPTED;RSVP=TRUE:mailto:%(mailto)s
+TRANSP:OPAQUE
+SEQUENCE:%(sequence)d
+END:VEVENT
+END:VCALENDAR
+"""
+
+itip_recurring = """
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//Apple Inc.//Mac OS X 10.9.2//EN
+CALSCALE:GREGORIAN
+METHOD:REQUEST
+BEGIN:VEVENT
+UID:%(uid)s
+DTSTAMP:20140213T1254140
+DTSTART;TZID=Europe/Zurich:%(start)s
+DTEND;TZID=Europe/Zurich:%(end)s
+RRULE:FREQ=WEEKLY;INTERVAL=1;COUNT=10
+SUMMARY:%(summary)s
+DESCRIPTION:test
+ORGANIZER;CN="Doe, John":mailto:john.doe at example.org
+ATTENDEE;ROLE=REQ-PARTICIPANT;PARTSTAT=%(partstat)s;RSVP=TRUE:mailto:%(mailto)s
+TRANSP:OPAQUE
+SEQUENCE:%(sequence)d
+END:VEVENT
+END:VCALENDAR
+"""
+
+itip_reply = """
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//pykolab-0.6.9-1//kolab.org//
+CALSCALE:GREGORIAN
+METHOD:REPLY
+BEGIN:VEVENT
+SUMMARY:%(summary)s
+UID:%(uid)s
+DTSTART;TZID=Europe/Berlin;VALUE=DATE-TIME:%(start)s
+DTEND;TZID=Europe/Berlin;VALUE=DATE-TIME:%(end)s
+DTSTAMP;VALUE=DATE-TIME:20140706T171038Z
+ORGANIZER;CN="Doe, John":MAILTO:%(organizer)s
+ATTENDEE;CUTYPE=INDIVIDUAL;PARTSTAT=%(partstat)s;ROLE=REQ-PARTICIPANT:mailto:%(mailto)s
+PRIORITY:0
+SEQUENCE:%(sequence)d
+END:VEVENT
+END:VCALENDAR
+"""
+
+mime_message = """MIME-Version: 1.0
+Content-Type: multipart/mixed;
+ boundary="=_c8894dbdb8baeedacae836230e3436fd"
+From: "Doe, John" <john.doe at example.org>
+Date: Tue, 25 Feb 2014 13:54:14 +0100
+Message-ID: <240fe7ae7e139129e9eb95213c1016d7 at example.org>
+To: %s
+Subject: "test"
+
+--=_c8894dbdb8baeedacae836230e3436fd
+Content-Type: text/plain; charset=UTF-8; format=flowed
+Content-Transfer-Encoding: quoted-printable
+
+*test*
+
+--=_c8894dbdb8baeedacae836230e3436fd
+Content-Type: text/calendar; charset=UTF-8; method=%s; name=event.ics
+Content-Disposition: attachment; filename=event.ics
+Content-Transfer-Encoding: 8bit
+
+%s
+--=_c8894dbdb8baeedacae836230e3436fd--
+"""
+
+class TestWallaceInvitationpolicy(unittest.TestCase):
+
+ john = None
+
+ @classmethod
+ def setUp(self):
+ """ Compatibility for twisted.trial.unittest
+ """
+ if not self.john:
+ self.setup_class()
+
+ @classmethod
+ def setup_class(self, *args, **kw):
+ from tests.functional.purge_users import purge_users
+ purge_users()
+
+ self.john = {
+ 'displayname': 'John Doe',
+ 'mail': 'john.doe at example.org',
+ 'sender': 'John Doe <john.doe at example.org>',
+ 'dn': 'uid=doe,ou=People,dc=example,dc=org',
+ 'mailbox': 'user/john.doe at example.org',
+ 'kolabtargetfolder': 'user/john.doe/Calendar at example.org',
+ 'kolabinvitationpolicy': ['ACT_UPDATE', 'ACT_MANUAL']
+ }
+
+ self.jane = {
+ 'displayname': 'Jane Manager',
+ 'mail': 'jane.manager at example.org',
+ 'sender': 'Jane Manager <jane.manager at example.org>',
+ 'dn': 'uid=manager,ou=People,dc=example,dc=org',
+ 'mailbox': 'user/jane.manager at example.org',
+ 'kolabtargetfolder': 'user/jane.manager/Calendar at example.org',
+ 'kolabinvitationpolicy': ['ACT_ACCEPT_IF_NO_CONFLICT','ACT_REJECT_IF_CONFLICT']
+ }
+
+ from tests.functional.user_add import user_add
+ user_add("John", "Doe", kolabinvitationpolicy=self.john['kolabinvitationpolicy'])
+ user_add("Jane", "Manager", kolabinvitationpolicy=self.jane['kolabinvitationpolicy'])
+
+ time.sleep(1)
+ from tests.functional.synchronize import synchronize_once
+ synchronize_once()
+
+ def send_message(self, itip_payload, to_addr, from_addr=None, method="REQUEST"):
+ if from_addr is None:
+ from_addr = self.john['mail']
+
+ smtp = smtplib.SMTP('localhost', 10026)
+ smtp.sendmail(from_addr, to_addr, mime_message % (to_addr, method, itip_payload))
+
+ def send_itip_invitation(self, attendee_email, start=None, allday=False, template=None, summary="test", sequence=0, partstat='NEEDS-ACTION'):
+ if start is None:
+ start = datetime.datetime.now()
+
+ uid = str(uuid.uuid4())
+
+ if allday:
+ default_template = itip_allday
+ end = start + datetime.timedelta(days=1)
+ date_format = '%Y%m%d'
+ else:
+ end = start + datetime.timedelta(hours=4)
+ default_template = itip_invitation
+ date_format = '%Y%m%dT%H%M%S'
+
+ self.send_message((template if template is not None else default_template) % {
+ 'uid': uid,
+ 'start': start.strftime(date_format),
+ 'end': end.strftime(date_format),
+ 'mailto': attendee_email,
+ 'summary': summary,
+ 'sequence': sequence,
+ 'partstat': partstat
+ },
+ attendee_email)
+
+ return uid
+
+ def send_itip_update(self, attendee_email, uid, start=None, template=None, summary="test", sequence=1, partstat='ACCEPTED'):
+ if start is None:
+ start = datetime.datetime.now()
+
+ end = start + datetime.timedelta(hours=4)
+ self.send_message((template if template is not None else itip_invitation) % {
+ 'uid': uid,
+ 'start': start.strftime('%Y%m%dT%H%M%S'),
+ 'end': end.strftime('%Y%m%dT%H%M%S'),
+ 'mailto': attendee_email,
+ 'summary': summary,
+ 'sequence': sequence,
+ 'partstat': partstat
+ },
+ attendee_email)
+
+ return uid
+
+ def send_itip_reply(self, uid, mailto, attendee_email, start=None, template=None, summary="test", sequence=1, partstat='ACCEPTED'):
+ if start is None:
+ start = datetime.datetime.now()
+
+ end = start + datetime.timedelta(hours=4)
+ self.send_message((template if template is not None else itip_reply) % {
+ 'uid': uid,
+ 'start': start.strftime('%Y%m%dT%H%M%S'),
+ 'end': end.strftime('%Y%m%dT%H%M%S'),
+ 'mailto': attendee_email,
+ 'organizer': mailto,
+ 'summary': summary,
+ 'sequence': sequence,
+ 'partstat': partstat
+ },
+ mailto,
+ attendee_email,
+ method="REPLY")
+
+ return uid
+
+ def send_itip_cancel(self, resource_email, uid):
+ self.send_message(itip_cancellation % (
+ uid,
+ resource_email
+ ),
+ resource_email)
+
+ return uid
+
+ def create_calendar_event(self, start=None, summary="test", sequence=0, user=None, attendee=None):
+ if start is None:
+ start = datetime.datetime.now(pytz.timezone("Europe/Berlin"))
+ if user is None:
+ user = self.john
+ if attendee is None:
+ attendee = self.jane
+
+ end = start + datetime.timedelta(hours=4)
+
+ event = pykolab.xml.Event()
+ event.set_start(start)
+ event.set_end(end)
+ event.set_organizer(user['mail'], user['displayname'])
+ event.add_attendee(attendee['mail'], attendee['displayname'], role="REQ-PARTICIPANT", participant_status="NEEDS-ACTION", rsvp=True)
+ event.set_summary(summary)
+ event.set_sequence(sequence)
+
+ imap = IMAP()
+ imap.connect()
+
+ mailbox = imap.folder_quote(user['kolabtargetfolder'])
+ imap.set_acl(mailbox, "cyrus-admin", "lrswipkxtecda")
+ imap.imap.m.select(mailbox)
+
+ result = imap.imap.m.append(
+ mailbox,
+ None,
+ None,
+ event.to_message().as_string()
+ )
+
+ return event.get_uid()
+
+ def check_message_received(self, subject, from_addr=None, mailbox=None):
+ if mailbox is None:
+ mailbox = self.john['mailbox']
+
+ imap = IMAP()
+ imap.connect()
+
+ mailbox = imap.folder_quote(mailbox)
+ imap.set_acl(mailbox, "cyrus-admin", "lrs")
+ imap.imap.m.select(mailbox)
+
+ found = None
+ retries = 15
+
+ while not found and retries > 0:
+ retries -= 1
+
+ typ, data = imap.imap.m.search(None, '(UNDELETED HEADER FROM "%s")' % (from_addr) if from_addr else 'UNDELETED')
+ for num in data[0].split():
+ typ, msg = imap.imap.m.fetch(num, '(RFC822)')
+ message = message_from_string(msg[0][1])
+ if message['Subject'] == subject:
+ found = message
+ break
+
+ time.sleep(1)
+
+ imap.disconnect()
+
+ return found
+
+ def check_user_calendar_event(self, mailbox, uid=None):
+ imap = IMAP()
+ imap.connect()
+
+ mailbox = imap.folder_quote(mailbox)
+ imap.set_acl(mailbox, "cyrus-admin", "lrs")
+ imap.imap.m.select(mailbox)
+
+ found = None
+ retries = 15
+
+ while not found and retries > 0:
+ retries -= 1
+
+ typ, data = imap.imap.m.search(None, '(UNDELETED HEADER SUBJECT "%s")' % (uid) if uid else '(UNDELETED HEADER X-Kolab-Type "application/x-vnd.kolab.event")')
+ for num in data[0].split():
+ typ, data = imap.imap.m.fetch(num, '(RFC822)')
+ event_message = message_from_string(data[0][1])
+
+ # return matching UID or first event found
+ if uid and event_message['subject'] != uid:
+ continue
+
+ for part in event_message.walk():
+ if part.get_content_type() == "application/calendar+xml":
+ payload = part.get_payload(decode=True)
+ found = pykolab.xml.event_from_string(payload)
+ break
+
+ if found:
+ break
+
+ time.sleep(1)
+
+ return found
+
+ def purge_mailbox(self, mailbox):
+ imap = IMAP()
+ imap.connect()
+ mailbox = imap.folder_quote(mailbox)
+ imap.set_acl(mailbox, "cyrus-admin", "lrwcdest")
+ imap.imap.m.select(mailbox)
+
+ typ, data = imap.imap.m.search(None, 'ALL')
+ for num in data[0].split():
+ imap.imap.m.store(num, '+FLAGS', '\\Deleted')
+
+ imap.imap.m.expunge()
+ imap.disconnect()
+
+
+ def test_001_invite_user(self):
+ start = datetime.datetime(2014,8,13, 10,0,0)
+ uid = self.send_itip_invitation(self.jane['mail'], start)
+
+ response = self.check_message_received('"test" has been ACCEPTED', self.jane['mail'])
+ self.assertIsInstance(response, email.message.Message)
+
+ event = self.check_user_calendar_event(self.jane['kolabtargetfolder'], uid)
+ self.assertIsInstance(event, pykolab.xml.Event)
+ self.assertEqual(event.get_summary(), "test")
+
+ # send update with the same sequence: no re-scheduling
+ self.send_itip_update(self.jane['mail'], uid, start, summary="test updated", sequence=0, partstat='ACCEPTED')
+
+ time.sleep(10)
+ event = self.check_user_calendar_event(self.jane['kolabtargetfolder'], uid)
+ self.assertIsInstance(event, pykolab.xml.Event)
+ self.assertEqual(event.get_summary(), "test updated")
+ self.assertEqual(event.get_attendee(self.jane['mail']).get_participant_status(), kolabformat.PartAccepted)
+
+
+ # @depends on test_001_invite_user
+ def test_002_invite_conflict(self):
+ uid = self.send_itip_invitation(self.jane['mail'], datetime.datetime(2014,8,13, 11,0,0), summary="test2")
+
+ response = self.check_message_received('"test2" has been DECLINED', self.jane['mail'])
+ self.assertIsInstance(response, email.message.Message)
+
+ event = self.check_user_calendar_event(self.jane['kolabtargetfolder'], uid)
+ self.assertIsInstance(event, pykolab.xml.Event)
+ self.assertEqual(event.get_summary(), "test2")
+
+
+ def test_003_invite_rescheduling(self):
+ start = datetime.datetime(2014,8,14, 9,0,0, tzinfo=pytz.timezone("Europe/Berlin"))
+ uid = self.send_itip_invitation(self.jane['mail'], start)
+
+ response = self.check_message_received('"test" has been ACCEPTED', self.jane['mail'])
+ self.assertIsInstance(response, email.message.Message)
+
+ event = self.check_user_calendar_event(self.jane['kolabtargetfolder'], uid)
+ self.assertIsInstance(event, pykolab.xml.Event)
+ self.assertEqual(event.get_summary(), "test")
+
+ self.purge_mailbox(self.john['mailbox'])
+
+ # send update with new date and incremented sequence
+ new_start = datetime.datetime(2014,8,15, 15,0,0, tzinfo=pytz.timezone("Europe/Berlin"))
+ self.send_itip_update(self.jane['mail'], uid, new_start, summary="test", sequence=1)
+
+ response = self.check_message_received('"test" has been ACCEPTED', self.jane['mail'])
+ self.assertIsInstance(response, email.message.Message)
+
+ event = self.check_user_calendar_event(self.jane['kolabtargetfolder'], uid)
+ self.assertIsInstance(event, pykolab.xml.Event)
+ self.assertEqual(event.get_start(), new_start)
+ self.assertEqual(event.get_sequence(), 1)
+
+
+ def test_004_invitation_reply(self):
+ start = datetime.datetime(2014,8,18, 14,30,0, tzinfo=pytz.timezone("Europe/Berlin"))
+ uid = self.create_calendar_event(start, user=self.john)
+
+ event = self.check_user_calendar_event(self.john['kolabtargetfolder'], uid)
+ self.assertIsInstance(event, pykolab.xml.Event)
+
+ # send a reply from jane to john
+ self.send_itip_reply(uid, self.john['mail'], self.jane['mail'], start=start)
+
+ # check for the updated event in john's calendar
+ time.sleep(10)
+ event = self.check_user_calendar_event(self.john['kolabtargetfolder'], uid)
+ self.assertIsInstance(event, pykolab.xml.Event)
+
+ attendee = event.get_attendee(self.jane['mail'])
+ self.assertIsInstance(attendee, pykolab.xml.Attendee)
+ self.assertEqual(attendee.get_participant_status(), kolabformat.PartAccepted)
+
\ No newline at end of file
diff --git a/tests/functional/user_add.py b/tests/functional/user_add.py
index 4939f93..b1b37f1 100644
--- a/tests/functional/user_add.py
+++ b/tests/functional/user_add.py
@@ -4,7 +4,7 @@ from pykolab import wap_client
conf = pykolab.getConf()
-def user_add(givenname, sn, preferredlanguage='en_US'):
+def user_add(givenname, sn, preferredlanguage='en_US', **kw):
if givenname == None:
raise Exception
@@ -25,6 +25,8 @@ def user_add(givenname, sn, preferredlanguage='en_US'):
'userpassword': 'Welcome2KolabSystems'
}
+ user_details.update(kw)
+
login = conf.get('ldap', 'bind_dn')
password = conf.get('ldap', 'bind_pw')
domain = conf.get('kolab', 'primary_domain')
diff --git a/tests/unit/test-012-wallace_invitationpolicy.py b/tests/unit/test-012-wallace_invitationpolicy.py
new file mode 100644
index 0000000..75939d0
--- /dev/null
+++ b/tests/unit/test-012-wallace_invitationpolicy.py
@@ -0,0 +1,129 @@
+import pykolab
+import logging
+import datetime
+
+from icalendar import Calendar
+from email import message
+from email import message_from_string
+from wallace import module_invitationpolicy as MIP
+from twisted.trial import unittest
+
+from pykolab.auth.ldap import LDAP
+from pykolab.constants import *
+
+
+# define some iTip MIME messages
+
+itip_multipart = """MIME-Version: 1.0
+Content-Type: multipart/mixed;
+ boundary="=_c8894dbdb8baeedacae836230e3436fd"
+From: "Doe, John" <john.doe at example.org>
+Date: Fri, 13 Jul 2012 13:54:14 +0100
+Message-ID: <240fe7ae7e139129e9eb95213c1016d7 at example.org>
+User-Agent: Roundcube Webmail/0.9-0.3.el6.kolab_3.0
+To: jane.doe at example.org
+Subject: "test" has been updated
+
+--=_c8894dbdb8baeedacae836230e3436fd
+Content-Type: text/plain; charset=UTF-8; format=flowed
+Content-Transfer-Encoding: quoted-printable
+
+*test*
+
+--=_c8894dbdb8baeedacae836230e3436fd
+Content-Type: text/calendar; charset=UTF-8; method=REQUEST;
+ name=event.ics
+Content-Disposition: attachment;
+ filename=event.ics
+Content-Transfer-Encoding: quoted-printable
+
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//Roundcube Webmail 1.0.1//NONSGML Calendar//EN
+CALSCALE:GREGORIAN
+METHOD:REQUEST
+BEGIN:VEVENT
+UID:626421779C777FBE9C9B85A80D04DDFA-A4BF5BBB9FEAA271
+DTSTAMP:20120713T1254140
+DTSTART;TZID=3DEurope/London:20120713T100000
+DTEND;TZID=3DEurope/London:20120713T110000
+SUMMARY:test
+DESCRIPTION:test
+ORGANIZER;CN=3D"Doe, John":mailto:john.doe at example.org
+ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailt=
+o:jane.doe at example.org
+ATTENDEE;ROLE=3DOPT-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailt=
+user.external at example.com
+SEQUENCE:1
+TRANSP:OPAQUE
+END:VEVENT
+END:VCALENDAR
+
+--=_c8894dbdb8baeedacae836230e3436fd--
+"""
+
+conf = pykolab.getConf()
+
+if not hasattr(conf, 'defaults'):
+ conf.finalize_conf()
+
+class TestWallaceInvitationpolicy(unittest.TestCase):
+
+ def setUp(self):
+ # monkey-patch the pykolab.auth module to check API calls
+ # without actually connecting to LDAP
+ #self.patch(pykolab.auth.Auth, "connect", self._mock_nop)
+ #self.patch(pykolab.auth.Auth, "disconnect", self._mock_nop)
+ #self.patch(pykolab.auth.Auth, "find_user_dn", self._mock_find_user_dn)
+ #self.patch(pykolab.auth.Auth, "get_entry_attributes", self._mock_get_entry_attributes)
+ #self.patch(pykolab.auth.Auth, "search_entry_by_attribute", self._mock_search_entry_by_attribute)
+
+ # intercept calls to smtplib.SMTP.sendmail()
+ import smtplib
+ self.patch(smtplib.SMTP, "__init__", self._mock_smtp_init)
+ self.patch(smtplib.SMTP, "quit", self._mock_nop)
+ self.patch(smtplib.SMTP, "sendmail", self._mock_smtp_sendmail)
+
+ self.smtplog = [];
+
+ def _mock_find_user_dn(self, value, kolabuser=False):
+ (prefix, domain) = value.split('@')
+ return "uid=" + prefix + ",ou=People,dc=" + ",dc=".join(domain.split('.'))
+
+ def _mock_get_entry_attributes(self, domain, entry, attributes):
+ (_, uid) = entry.split(',')[0].split('=')
+ return { 'cn': uid, 'mail': uid + "@example.org", '_attrib': attributes }
+
+ def _mock_nop(self, domain=None):
+ pass
+
+ def _mock_smtp_init(self, host=None, port=None, local_hostname=None, timeout=0):
+ pass
+
+ def _mock_smtp_sendmail(self, from_addr, to_addr, message, mail_options=None, rcpt_options=None):
+ self.smtplog.append((from_addr, to_addr, message))
+
+ def test_001_itip_events_from_message(self):
+ itips = pykolab.itip.events_from_message(message_from_string(itip_multipart))
+ self.assertEqual(len(itips), 1, "Multipart iTip message with text/calendar")
+ self.assertEqual(itips[0]['method'], "REQUEST", "iTip request method property")
+ self.assertEqual(len(itips[0]['attendees']), 2, "List attendees from iTip")
+ self.assertEqual(itips[0]['attendees'][0], "mailto:jane.doe at example.org", "First attendee from iTip")
+
+ def test_002_user_dn_from_email_address(self):
+ res = MIP.user_dn_from_email_address("doe at example.org")
+ # assert call to (patched) pykolab.auth.Auth.find_resource()
+ self.assertEqual("uid=doe,ou=People,dc=example,dc=org", res);
+
+ def test_003_get_matching_invitation_policy(self):
+ user = { 'kolabinvitationpolicy': [
+ 'ACT_ACCEPT:example.org',
+ 'ACT_REJECT:gmail.com',
+ 'ACT_MANUAL:*'
+ ] }
+ self.assertEqual(MIP.get_matching_invitation_policies(user, 'fastmail.net'), [MIP.ACT_MANUAL])
+ self.assertEqual(MIP.get_matching_invitation_policies(user, 'example.org'), [MIP.ACT_ACCEPT,MIP.ACT_MANUAL])
+ self.assertEqual(MIP.get_matching_invitation_policies(user, 'gmail.com'), [MIP.ACT_REJECT,MIP.ACT_MANUAL])
+
+ user = { 'kolabinvitationpolicy': ['ACT_ACCEPT:example.org', 'ACT_MANUAL:others'] }
+ self.assertEqual(MIP.get_matching_invitation_policies(user, 'somedomain.net'), [MIP.ACT_MANUAL])
diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py
new file mode 100644
index 0000000..b5863c2
--- /dev/null
+++ b/wallace/module_invitationpolicy.py
@@ -0,0 +1,721 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 Kolab Systems AG (http://www.kolabsys.com)
+#
+# Thomas Bruederli (Kolab Systems) <bruederli at kolabsys.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import datetime
+import os
+import tempfile
+import time
+from urlparse import urlparse
+import urllib
+
+from email import message_from_string
+from email.parser import Parser
+from email.utils import formataddr
+from email.utils import getaddresses
+
+import modules
+
+import pykolab
+import kolabformat
+
+from pykolab.auth import Auth
+from pykolab.conf import Conf
+from pykolab.imap import IMAP
+from pykolab.xml import to_dt
+from pykolab.xml import event_from_string
+from pykolab.itip import events_from_message
+from pykolab.itip import check_event_conflict
+from pykolab.itip import send_reply
+from pykolab.translate import _
+
+# define some contstants used in the code below
+MOD_IF_AVAILABLE = 32
+MOD_IF_CONFLICT = 64
+MOD_TENTATIVE = 128
+MOD_NOTIFY = 256
+ACT_MANUAL = 1
+ACT_ACCEPT = 2
+ACT_DELEGATE = 4
+ACT_REJECT = 8
+ACT_UPDATE = 16
+ACT_TENTATIVE = ACT_ACCEPT + MOD_TENTATIVE
+ACT_ACCEPT_IF_NO_CONFLICT = ACT_ACCEPT + MOD_IF_AVAILABLE
+ACT_TENTATIVE_IF_NO_CONFLICT = ACT_ACCEPT + MOD_TENTATIVE + MOD_IF_AVAILABLE
+ACT_DELEGATE_IF_CONFLICT = ACT_DELEGATE + MOD_IF_CONFLICT
+ACT_REJECT_IF_CONFLICT = ACT_REJECT + MOD_IF_CONFLICT
+ACT_UPDATE_AND_NOTIFY = ACT_UPDATE + MOD_NOTIFY
+
+FOLDER_TYPE_ANNOTATION = '/vendor/kolab/folder-type'
+
+MESSAGE_PROCESSED = 1
+MESSAGE_FORWARD = 2
+
+policy_name_map = {
+ 'ACT_MANUAL': ACT_MANUAL,
+ 'ACT_ACCEPT': ACT_ACCEPT,
+ 'ACT_ACCEPT_IF_NO_CONFLICT': ACT_ACCEPT_IF_NO_CONFLICT,
+ 'ACT_TENTATIVE': ACT_TENTATIVE,
+ 'ACT_TENTATIVE_IF_NO_CONFLICT': ACT_TENTATIVE_IF_NO_CONFLICT,
+ 'ACT_DELEGATE': ACT_DELEGATE,
+ 'ACT_DELEGATE_IF_CONFLICT': ACT_DELEGATE_IF_CONFLICT,
+ 'ACT_REJECT': ACT_REJECT,
+ 'ACT_REJECT_IF_CONFLICT': ACT_REJECT_IF_CONFLICT,
+ 'ACT_UPDATE': ACT_UPDATE,
+ 'ACT_UPDATE_AND_NOTIFY': ACT_UPDATE_AND_NOTIFY
+}
+
+policy_value_map = dict([(v, k) for (k, v) in policy_name_map.iteritems()])
+
+log = pykolab.getLogger('pykolab.wallace')
+conf = pykolab.getConf()
+
+mybasepath = '/var/spool/pykolab/wallace/invitationpolicy/'
+
+auth = None
+imap = None
+
+def __init__():
+ modules.register('invitationpolicy', execute, description=description())
+
+def accept(filepath):
+ new_filepath = os.path.join(
+ mybasepath,
+ 'ACCEPT',
+ os.path.basename(filepath)
+ )
+
+ cleanup()
+ os.rename(filepath, new_filepath)
+ filepath = new_filepath
+ exec('modules.cb_action_ACCEPT(%r, %r)' % ('invitationpolicy',filepath))
+
+def reject(filepath):
+ new_filepath = os.path.join(
+ mybasepath,
+ 'REJECT',
+ os.path.basename(filepath)
+ )
+
+ os.rename(filepath, new_filepath)
+ filepath = new_filepath
+ exec('modules.cb_action_REJECT(%r, %r)' % ('invitationpolicy',filepath))
+
+def description():
+ return """Invitation policy execution module."""
+
+def cleanup():
+ global auth, imap
+
+ log.debug("cleanup(): %r, %r" % (auth, imap), level=9)
+
+ auth.disconnect()
+ del auth
+
+ # Disconnect IMAP or we lock the mailbox almost constantly
+ imap.disconnect()
+ del imap
+
+def execute(*args, **kw):
+ global auth, imap
+
+ if not os.path.isdir(mybasepath):
+ os.makedirs(mybasepath)
+
+ for stage in ['incoming', 'ACCEPT', 'REJECT', 'HOLD', 'DEFER' ]:
+ if not os.path.isdir(os.path.join(mybasepath, stage)):
+ os.makedirs(os.path.join(mybasepath, stage))
+
+ log.debug(_("Invitation policy called for %r, %r") % (args, kw), level=9)
+
+ auth = Auth()
+ imap = IMAP()
+
+ # TODO: Test for correct call.
+ filepath = args[0]
+
+ if kw.has_key('stage'):
+ log.debug(_("Issuing callback after processing to stage %s") % (kw['stage']), level=8)
+
+ log.debug(_("Testing cb_action_%s()") % (kw['stage']), level=8)
+ if hasattr(modules, 'cb_action_%s' % (kw['stage'])):
+ log.debug(_("Attempting to execute cb_action_%s()") % (kw['stage']), level=8)
+
+ exec(
+ 'modules.cb_action_%s(%r, %r)' % (
+ kw['stage'],
+ 'invitationpolicy',
+ filepath
+ )
+ )
+
+ return filepath
+ else:
+ # Move to incoming
+ new_filepath = os.path.join(
+ mybasepath,
+ 'incoming',
+ os.path.basename(filepath)
+ )
+
+ if not filepath == new_filepath:
+ log.debug("Renaming %r to %r" % (filepath, new_filepath))
+ os.rename(filepath, new_filepath)
+ filepath = new_filepath
+
+ # parse full message
+ message = Parser().parse(open(filepath, 'r'))
+
+ recipients = [address for displayname,address in getaddresses(message.get_all('X-Kolab-To'))]
+ sender_email = [address for displayname,address in getaddresses(message.get_all('X-Kolab-From'))][0]
+
+ any_itips = False
+ recipient_email = None
+ recipient_user_dn = None
+
+ # An iTip message may contain multiple events. Later on, test if the message
+ # is an iTip message by checking the length of this list.
+ try:
+ itip_events = events_from_message(message, ['REQUEST', 'REPLY', 'CANCEL'])
+ except Exception, e:
+ log.error(_("Failed to parse iTip events from message: %r" % (e)))
+ itip_events = []
+
+ if not len(itip_events) > 0:
+ log.info(_("Message is not an iTip message or does not contain any (valid) iTip events."))
+
+ else:
+ any_itips = True
+ log.debug(_("iTip events attached to this message contain the following information: %r") % (itip_events), level=9)
+
+ # See if any iTip actually allocates a user.
+ if any_itips and len([x['uid'] for x in itip_events if x.has_key('attendees') or x.has_key('organizer')]) > 0:
+ auth.connect()
+
+ for recipient in recipients:
+ recipient_user_dn = user_dn_from_email_address(recipient)
+ if recipient_user_dn is not None:
+ recipient_email = recipient
+ break
+
+ if not any_itips:
+ log.debug(_("No itips, no users, pass along %r") % (filepath), level=5)
+ return filepath
+ elif recipient_email is None:
+ log.debug(_("iTips, but no users, pass along %r") % (filepath), level=5)
+ return filepath
+
+ # we're looking at the first itip event object
+ itip_event = itip_events[0];
+
+ # for replies, the organizer is the recipient
+ if itip_event['method'] == 'REPLY':
+ user_attendees = [itip_event['organizer']] if str(itip_event['organizer']).split(':')[-1] == recipient_email else []
+
+ else:
+ # Limit the attendees to the one that is actually invited with the current message.
+ attendees = [str(a).split(':')[-1] for a in (itip_event['attendees'] if itip_event.has_key('attendees') else [])]
+ user_attendees = [a for a in attendees if a == recipient_email]
+
+ if itip_event.has_key('organizer'):
+ sender_email = itip_event['xml'].get_organizer().email()
+
+ # abort if no attendee matches the envelope recipient
+ if len(user_attendees) == 0:
+ log.info(_("No user attendee matching envelope recipient %s, skip message") % (recipient_email))
+ return filepath
+
+ receiving_user = auth.get_entry_attributes(None, recipient_user_dn, ['*'])
+ log.debug(_("Receiving user: %r") % (receiving_user), level=9)
+
+ # find user's kolabInvitationPolicy settings and the matching policy values
+ sender_domain = str(sender_email).split('@')[-1]
+ policies = get_matching_invitation_policies(receiving_user, sender_domain)
+
+ # select a processing function according to the iTip request method
+ method_processing_map = {
+ 'REQUEST': process_itip_request,
+ 'REPLY': process_itip_reply,
+ 'CANCEL': process_itip_cancel
+ }
+
+ done = None
+ if method_processing_map.has_key(itip_event['method']):
+ processor_func = method_processing_map[itip_event['method']]
+
+ # connect as cyrus-admin
+ imap.connect()
+
+ for policy in policies:
+ log.debug(_("Apply invitation policy %r for domain %r") % (policy_value_map[policy], sender_domain), level=8)
+ done = processor_func(itip_event, policy, recipient_email, sender_email, receiving_user)
+
+ # matching policy found
+ if done is not None:
+ break
+
+ else:
+ log.debug(_("Ignoring '%s' iTip method") % (itip_event['method']), level=8)
+
+ # message has been processed by the module, remove it
+ if done == MESSAGE_PROCESSED:
+ log.debug(_("iTip message %r consumed by the invitationpolicy module") % (message.get('Message-ID')), level=5)
+ os.unlink(filepath)
+ filepath = None
+
+ cleanup()
+ return filepath
+
+
+def process_itip_request(itip_event, policy, recipient_email, sender_email, receiving_user):
+ """
+ Process an iTip REQUEST message according to the given policy
+ """
+
+ # if invitation policy is set to MANUAL, pass message along
+ if policy & ACT_MANUAL:
+ log.info(_("Pass invitation for manual processing"))
+ return MESSAGE_FORWARD
+
+ try:
+ receiving_attendee = itip_event['xml'].get_attendee_by_email(recipient_email)
+ log.debug(_("Receiving Attendee: %r") % (receiving_attendee), level=9)
+ except Exception, e:
+ log.error("Could not find envelope attendee: %r" % (e))
+ return MESSAGE_FORWARD
+
+ # process request to participating attendees with RSVP=TRUE or PARTSTAT=NEEDS-ACTION
+ nonpart = receiving_attendee.get_role() == kolabformat.NonParticipant
+ partstat = receiving_attendee.get_participant_status()
+ save_event = not nonpart or not partstat == kolabformat.PartNeedsAction
+ scheduling_required = receiving_attendee.get_rsvp() or partstat == kolabformat.PartNeedsAction
+ condition_fulfilled = True
+
+ # find existing event in user's calendar
+ existing = find_existing_event(itip_event, receiving_user)
+
+ # compare sequence number to determine a (re-)scheduling request
+ if existing is not None:
+ log.debug(_("Existing event: %r") % (existing), level=9)
+ scheduling_required = itip_event['sequence'] > 0 and itip_event['sequence'] >= existing.get_sequence()
+ save_event = True
+
+ # if scheduling: check availability
+ if scheduling_required:
+ if policy & (MOD_IF_AVAILABLE | MOD_IF_CONFLICT):
+ condition_fulfilled = check_availability(itip_event, receiving_user)
+ if policy & MOD_IF_CONFLICT:
+ condition_fulfilled = not condition_fulfilled
+
+ log.debug(_("Precondition for event %r fulfilled: %r") % (itip_event['uid'], condition_fulfilled), level=5)
+
+ # if RSVP, send an iTip REPLY
+ if scheduling_required:
+ respond_with = None
+ if policy & ACT_ACCEPT and condition_fulfilled:
+ respond_with = 'TENTATIVE' if policy & MOD_TENTATIVE else 'ACCEPTED'
+
+ elif policy & ACT_REJECT and condition_fulfilled:
+ respond_with = 'DECLINED'
+ # TODO: only save declined invitation when a certain config option is set?
+
+ elif policy & ACT_DELEGATE and condition_fulfilled:
+ # TODO: save and delegate (but to whom?)
+ pass
+
+ # send iTip reply
+ if respond_with is not None:
+ # set attendee's CN from LDAP record if yet missing
+ if not receiving_attendee.get_name() and receiving_user.has_key('cn'):
+ receiving_attendee.set_name(receiving_user['cn'])
+
+ receiving_attendee.set_participant_status(respond_with)
+ send_reply(recipient_email, itip_event, invitation_response_text(),
+ subject=_('"%(summary)s" has been %(status)s'))
+
+ else:
+ # policy doesn't match, pass on to next one
+ return None
+
+ else:
+ log.debug(_("No RSVP for recipient %r requested") % (receiving_user['mail']), level=8)
+ # TODO: only update if policy & ACT_UPDATE ?
+
+ if save_event:
+ targetfolder = None
+
+ if existing:
+ # delete old version from IMAP
+ targetfolder = existing._imap_folder
+ delete_event(existing)
+
+ if not nonpart or existing:
+ # save new copy from iTip
+ if store_event(itip_event['xml'], receiving_user, targetfolder):
+ return MESSAGE_PROCESSED
+
+ return None
+
+
+def process_itip_reply(itip_event, policy, recipient_email, sender_email, receiving_user):
+ """
+ Process an iTip REPLY message according to the given policy
+ """
+
+ # if invitation policy is set to MANUAL, pass message along
+ if policy & ACT_MANUAL:
+ log.info(_("Pass reply for manual processing"))
+ return MESSAGE_FORWARD
+
+ # auto-update is enabled for this user
+ if policy & ACT_UPDATE:
+ try:
+ sender_attendee = itip_event['xml'].get_attendee_by_email(sender_email)
+ log.debug(_("Sender Attendee: %r") % (sender_attendee), level=9)
+ except Exception, e:
+ log.error("Could not find envelope sender attendee: %r" % (e))
+ return MESSAGE_FORWARD
+
+ # find existing event in user's calendar
+ existing = find_existing_event(itip_event, receiving_user)
+
+ if existing:
+ log.debug(_("Auto-updating event %r on iTip REPLY") % (existing.uid), level=8)
+
+ # TODO: compare sequence number to avoid outdated replies?
+ try:
+ existing.set_attendee_participant_status(sender_email, sender_attendee.get_participant_status())
+ except Exception, e:
+ log.error("Could not find corresponding attende in organizer's event: %r" % (e))
+
+ # TODO: accept new participant if ACT_ACCEPT ?
+ return MESSAGE_FORWARD
+
+ # update the organizer's copy of the event
+ delete_event(existing)
+ if store_event(existing, receiving_user, existing._imap_folder):
+ # TODO: send (consolidated) notification to organizer if policy & ACT_UPDATE_AND_NOTIFY:
+ # TODO: update all other attendee's copies if conf.get('wallace','invitationpolicy_autoupdate_other_attendees_on_reply'):
+ return MESSAGE_PROCESSED
+
+ else:
+ log.error(_("The event referred by this reply was not found in the user's calendars. Forwarding to Inbox."))
+ return MESSAGE_FORWARD
+
+ return None
+
+
+def process_itip_cancel(itip_event, policy, recipient_email, sender_email, receiving_user):
+ """
+ Process an iTip CANCEL message according to the given policy
+ """
+
+ # if invitation policy is set to MANUAL, pass message along
+ if policy & ACT_MANUAL:
+ log.info(_("Pass cancellation for manual processing"))
+ return MESSAGE_FORWARD
+
+ # update_event_in_user_calendar(itip_event, receiving_user)
+
+ return MESSAGE_PROCESSED
+
+
+def user_dn_from_email_address(email_address):
+ """
+ Resolves the given email address to a Kolab user entity
+ """
+ global auth
+
+ if not auth:
+ auth = Auth()
+ auth.connect()
+
+ local_domains = auth.list_domains()
+
+ if not local_domains == None:
+ local_domains = list(set(local_domains.keys()))
+
+ if not email_address.split('@')[1] in local_domains:
+ return None
+
+ log.debug(_("Checking if email address %r belongs to a local user") % (email_address), level=8)
+
+ user_dn = auth.find_user_dn(email_address, True)
+
+ if isinstance(user_dn, basestring):
+ log.debug(_("User DN: %r") % (user_dn), level=8)
+ else:
+ log.debug(_("No user record(s) found for %r") % (email_address), level=9)
+
+ auth.disconnect()
+
+ return user_dn
+
+
+def get_matching_invitation_policies(receiving_user, sender_domain):
+ # get user's kolabInvitationPolicy settings
+ policies = receiving_user['kolabinvitationpolicy'] if receiving_user.has_key('kolabinvitationpolicy') else []
+ if policies and not isinstance(policies, list):
+ policies = [policies]
+
+ if len(policies) == 0:
+ policies = conf.get_list('wallace', 'kolab_invitation_policy')
+
+ # match policies agains the given sender_domain
+ matches = []
+ for p in policies:
+ if ':' in p:
+ (value, domain) = p.split(':')
+ else:
+ value = p
+ domain = ''
+
+ if domain == '' or domain == '*' or sender_domain.endswith(domain):
+ value = value.upper()
+ if policy_name_map.has_key(value):
+ matches.append(policy_name_map[value])
+
+ # add manual as default action
+ if len(matches) == 0:
+ matches.append(ACT_MANUAL)
+
+ return matches
+
+
+def imap_proxy_auth(user_rec):
+ """
+
+ """
+ global imap
+
+ mail_attribute = conf.get('cyrus-sasl', 'result_attribute')
+ if mail_attribute == None:
+ mail_attribute = 'mail'
+
+ mail_attribute = mail_attribute.lower()
+
+ if not user_rec.has_key(mail_attribute):
+ log.error(_("User record doesn't have the mailbox attribute %r set" % (mail_attribute)))
+ return False
+
+ # do IMAP prox auth with the given user
+ backend = conf.get('kolab', 'imap_backend')
+ admin_login = conf.get(backend, 'admin_login')
+ admin_password = conf.get(backend, 'admin_password')
+
+ try:
+ imap.disconnect()
+ imap.connect(login=False)
+ imap.login_plain(admin_login, admin_password, user_rec[mail_attribute])
+ except Exception, errmsg:
+ log.error(_("IMAP proxy authentication failed: %r") % (errmsg))
+ return False
+
+ return True
+
+
+def list_user_calendars(user_rec):
+ """
+ Get a list of the given user's private calendar folders
+ """
+ global imap
+
+ # return cached list
+ if user_rec.has_key('_calendar_folders'):
+ return user_rec['_calendar_folders'];
+
+ calendars = []
+
+ if not imap_proxy_auth(user_rec):
+ return calendars
+
+ folders = imap.list_folders('*')
+ log.debug(_("List calendar folders for user %r: %r") % (user_rec['mail'], folders), level=8)
+
+ (ns_personal, ns_other, ns_shared) = imap.namespaces()
+
+ if isinstance(ns_shared, list):
+ ns_shared = ns_shared[0]
+ if isinstance(ns_other, list):
+ ns_other = ns_other[0]
+
+ for folder in folders:
+ # exclude shared and other user's namespace
+ # TODO: list shared folders the user has write privileges ?
+ if folder.startswith(ns_other) or folder.startswith(ns_shared):
+ continue;
+
+ metadata = imap.get_metadata(folder)
+ log.debug(_("IMAP metadata for %r: %r") % (folder, metadata), level=9)
+ if metadata.has_key(folder) and ( \
+ metadata[folder].has_key('/shared' + FOLDER_TYPE_ANNOTATION) and metadata[folder]['/shared' + FOLDER_TYPE_ANNOTATION].startswith('event') \
+ or metadata[folder].has_key('/private' + FOLDER_TYPE_ANNOTATION) and metadata[folder]['/private' + FOLDER_TYPE_ANNOTATION].startswith('event')):
+ calendars.append(folder)
+
+ # store default calendar folder in user record
+ if metadata[folder].has_key('/private' + FOLDER_TYPE_ANNOTATION) and metadata[folder]['/private' + FOLDER_TYPE_ANNOTATION].endswith('.default'):
+ user_rec['_default_calendar'] = folder
+
+ # cache with user record
+ user_rec['_calendar_folders'] = calendars
+
+ return calendars
+
+
+def find_existing_event(itip_event, user_rec):
+ """
+ Search user's calendar folders for the given event (by UID)
+ """
+ global imap
+
+ event = None
+ for folder in list_user_calendars(user_rec):
+ log.debug(_("Searching folder %r for event %r") % (folder, itip_event['uid']), level=8)
+ imap.imap.m.select(imap.folder_utf7(folder))
+
+ typ, data = imap.imap.m.search(None, '(UNDELETED HEADER SUBJECT "%s")' % (itip_event['uid']))
+ for num in reversed(data[0].split()):
+ typ, data = imap.imap.m.fetch(num, '(RFC822)')
+
+ event_message = message_from_string(data[0][1])
+
+ if event_message.is_multipart():
+ for part in event_message.walk():
+ if part.get_content_type() == "application/calendar+xml":
+ payload = part.get_payload(decode=True)
+ event = event_from_string(payload)
+ setattr(event, '_imap_folder', folder)
+ break
+
+ if event and event.uid == itip_event['uid']:
+ return event
+
+ return event
+
+
+def check_availability(itip_event, receiving_user):
+ """
+ For the receiving user, determine if the event in question is in conflict.
+ """
+
+ start = time.time()
+ num_messages = 0
+ conflict = False
+
+ # return previously detected conflict
+ if itip_event.has_key('_conflicts'):
+ return not itip_event['_conflicts']
+
+ for folder in list_user_calendars(receiving_user):
+ log.debug(_("Listing events from folder %r") % (folder), level=8)
+ imap.imap.m.select(imap.folder_utf7(folder))
+
+ typ, data = imap.imap.m.search(None, '(UNDELETED HEADER X-Kolab-Type "application/x-vnd.kolab.event")')
+ num_messages += len(data[0].split())
+
+ for num in reversed(data[0].split()):
+ event = None
+ typ, data = imap.imap.m.fetch(num, '(RFC822)')
+
+ event_message = message_from_string(data[0][1])
+
+ if event_message.is_multipart():
+ for part in event_message.walk():
+ if part.get_content_type() == "application/calendar+xml":
+ payload = part.get_payload(decode=True)
+ event = event_from_string(payload)
+ break
+
+ if event and event.uid:
+ conflict = check_event_conflict(event, itip_event)
+ if conflict:
+ log.info(_("Existing event %r conflicts with invitation %r") % (event.uid, itip_event['uid']))
+ break
+
+ if conflict:
+ break
+
+ end = time.time()
+ log.debug(_("start: %r, end: %r, total: %r, messages: %d") % (start, end, (end-start), num_messages), level=9)
+
+ # remember the result of this check for further iterations
+ itip_event['_conflicts'] = conflict
+
+ return not conflict
+
+
+def store_event(event, user_rec, targetfolder=None):
+ """
+ Append the given event object to the user's default calendar
+ """
+
+ # find default calendar folder to save event to
+ if targetfolder is None:
+ targetfolder = list_user_calendars(user_rec)[0]
+ if user_rec.has_key('_default_calendar'):
+ targetfolder = user_rec['_default_calendar']
+
+ if not targetfolder:
+ log.error(_("Failed to save event: no calendar folder found for user %r") % (user_rec['mail']))
+ return Fasle
+
+ log.debug(_("Save event %r to user calendar %r") % (event.uid, targetfolder), level=8)
+
+ try:
+ imap.imap.m.select(imap.folder_utf7(targetfolder))
+ result = imap.imap.m.append(
+ imap.folder_utf7(targetfolder),
+ None,
+ None,
+ event.to_message().as_string()
+ )
+ return result
+
+ except Exception, e:
+ log.error(_("Failed to save event to user calendar at %r: %r") % (
+ targetfolder, e
+ ))
+
+ return False
+
+
+def delete_event(existing):
+ """
+ Removes the IMAP object with the given UID from a user's calendar folder
+ """
+ targetfolder = existing._imap_folder
+ imap.imap.m.select(imap.folder_utf7(targetfolder))
+
+ typ, data = imap.imap.m.search(None, '(HEADER SUBJECT "%s")' % existing.uid)
+
+ log.debug(_("Delete event %r in %r: %r") % (
+ existing.uid, targetfolder, data
+ ), level=8)
+
+ for num in data[0].split():
+ imap.imap.m.store(num, '+FLAGS', '\\Deleted')
+
+ imap.imap.m.expunge()
+
+
+def invitation_response_text():
+ return _("""
+ %(name)s has %(status)s your invitation for %(summary)s.
+
+ *** This is an automated response sent by the Kolab Invitation system ***
+ """)
More information about the commits
mailing list