2 commits - tests/functional tests/unit wallace/module_resources.py
Thomas Brüderli
bruederli at kolabsys.com
Tue Feb 25 18:31:20 CET 2014
tests/functional/test_wallace/test_005_resource_invitation.py | 145 +++
tests/unit/test-011-wallace_resources.py | 13
wallace/module_resources.py | 480 ++++------
3 files changed, 375 insertions(+), 263 deletions(-)
New commits:
commit 66baddf10c099c8b0c287c267e58afeec387d520
Author: Thomas Bruederli <bruederli at kolabsys.com>
Date: Wed Feb 19 17:54:03 2014 -0500
Refactored wallace/module_resource:
- moved functional blocks into actual functions for better testing
- limit processing to the resource that received the given message
(each resource will receive this invitation and respond individually)
- simplified the code, reduced duplication
diff --git a/tests/functional/test_wallace/test_005_resource_invitation.py b/tests/functional/test_wallace/test_005_resource_invitation.py
index cf1cfc6..4b6900e 100644
--- a/tests/functional/test_wallace/test_005_resource_invitation.py
+++ b/tests/functional/test_wallace/test_005_resource_invitation.py
@@ -59,8 +59,17 @@ END:VCALENDAR
class TestResourceInvitation(unittest.TestCase):
+ john = None
+
+ @classmethod
+ def setUp(self):
+ """ Compatibility for twisted.trial.unittest
+ """
+ if not self.john:
+ self.setup_class()
+
@classmethod
- def setUpClass(self):
+ def setup_class(self, *args, **kw):
from tests.functional.purge_users import purge_users
purge_users()
diff --git a/tests/unit/test-011-wallace_resources.py b/tests/unit/test-011-wallace_resources.py
index 106c2f3..f6b2d40 100644
--- a/tests/unit/test-011-wallace_resources.py
+++ b/tests/unit/test-011-wallace_resources.py
@@ -47,6 +47,8 @@ DESCRIPTION:test
ORGANIZER;CN=3D"Doe, John":mailto:john.doe at example.org
ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailt=
o:resource-collection-car at example.org
+ATTENDEE;ROLE=3DOPTIONAL;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailto:anoth=
+er-resource at example.org
TRANSP:OPAQUE
END:VEVENT
END:VCALENDAR
@@ -305,15 +307,20 @@ class TestWallaceResources(unittest.TestCase):
def test_002_resource_record_from_email_address(self):
res = module_resources.resource_record_from_email_address("doe at example.org")
- # assert call to (pathced) pykolab.auth.Auth.find_resource()
+ # assert call to (patched) pykolab.auth.Auth.find_resource()
self.assertEqual(len(res), 1);
self.assertEqual("uid=doe,dc=example,dc=org", res[0]);
def test_003_resource_records_from_itip_events(self):
- itips = module_resources.itip_events_from_message(message_from_string(itip_multipart))
+ message = message_from_string(itip_multipart)
+ itips = module_resources.itip_events_from_message(message)
+
res = module_resources.resource_records_from_itip_events(itips)
- self.assertEqual(len(res), 1);
+ self.assertEqual(len(res), 2, "Return all attendee resources");
+
+ res = module_resources.resource_records_from_itip_events(itips, message['To'])
+ self.assertEqual(len(res), 1, "Return only recipient resource");
self.assertEqual("uid=resource-collection-car,dc=example,dc=org", res[0]);
diff --git a/wallace/module_resources.py b/wallace/module_resources.py
index 8b0fccb..0dfe1df 100644
--- a/wallace/module_resources.py
+++ b/wallace/module_resources.py
@@ -125,9 +125,8 @@ def execute(*args, **kw):
os.rename(filepath, new_filepath)
filepath = new_filepath
- # parse message headers
- # @TODO: make sure we can use True as the 2nd argument here
- message = Parser().parse(open(filepath, 'r'), True)
+ # parse full message
+ message = Parser().parse(open(filepath, 'r'))
recipients = [address for displayname,address in getaddresses(message.get_all('X-Kolab-To'))]
@@ -165,6 +164,7 @@ def execute(*args, **kw):
if possibly_any_resources:
for recipient in recipients:
if not len(resource_record_from_email_address(recipient)) == 0:
+ resource_recipient = recipient
any_resources = True
if any_resources:
@@ -187,7 +187,7 @@ def execute(*args, **kw):
# A simple list of merely resource entry IDs that hold any relevance to the
# iTip events
- resource_records = resource_records_from_itip_events(itip_events)
+ resource_records = resource_records_from_itip_events(itip_events, resource_recipient)
# Get the resource details, which includes details on the IMAP folder
resources = {}
@@ -222,102 +222,31 @@ def execute(*args, **kw):
start = time.time()
for resource in resources.keys():
+ # skip this for resource collections
if not resources[resource].has_key('kolabtargetfolder'):
continue
- mailbox = resources[resource]['kolabtargetfolder']
-
- resources[resource]['conflict'] = False
- resources[resource]['conflicting_events'] = []
-
- log.debug(
- _("Checking events in resource folder %r") % (mailbox),
- level=8
- )
-
+ # sets the 'conflicting' flag and adds a list of conflicting events found
try:
- imap.imap.m.select(mailbox)
- except:
- log.error(_("Mailbox for resource %r doesn't exist") % (resource))
+ read_resource_calendar(resources[resource], itip_events, imap)
+ except Exception, e:
+ log.error(_("Failed to read resource calendar for %r: %r") % (resource, e))
continue
- typ, data = imap.imap.m.search(None, 'ALL')
-
- num_messages = len(data[0].split())
-
- for num in data[0].split():
- # For efficiency, makes the routine non-deterministic
- if resources[resource]['conflict']:
- continue
-
- log.debug(
- _("Fetching message UID %r from folder %r") % (num, mailbox),
- level=9
- )
-
- typ, data = imap.imap.m.fetch(num, '(RFC822)')
-
- event_message = message_from_string(data[0][1])
-
- if event_message.is_multipart():
- for part in event_message.walk():
- if part.get_content_type() == "application/calendar+xml":
- payload = part.get_payload(decode=True)
- event = pykolab.xml.event_from_string(payload)
-
- for itip in itip_events:
- _es = to_dt(event.get_start())
- _is = to_dt(itip['start'].dt)
-
- _ee = to_dt(event.get_end())
- _ie = to_dt(itip['end'].dt)
-
- if _es < _is:
- if _es <= _ie:
- if _ee <= _is:
- conflict = False
- else:
- conflict = True
- else:
- conflict = True
- elif _es == _is:
- conflict = True
- else: # _es > _is
- if _es <= _ie:
- conflict = True
- else:
- conflict = False
-
- if conflict:
- log.info(
- _("Event %r conflicts with event " + \
- "%r") % (
- itip['xml'].get_uid(),
- event.get_uid()
- )
- )
-
- resources[resource]['conflicting_events'].append(event)
- resources[resource]['conflict'] = True
-
end = time.time()
- log.debug(
- _("start: %r, end: %r, total: %r, messages: %r") % (
- start, end, (end-start), num_messages
- ),
- level=1
- )
+ log.debug(_("start: %r, end: %r, total: %r") % (start, end, (end-start)), level=1)
+
+ done = False
for resource in resources.keys():
log.debug(_("Polling for resource %r") % (resource), level=9)
- if not resources.has_key(resource):
- log.debug(
- _("Resource %r has been popped from the list") % (resource),
- level=9
- )
+ if done:
+ break
+ if not resources.has_key(resource):
+ log.debug(_("Resource %r has been popped from the list") % (resource), level=9)
continue
if not resources[resource].has_key('conflicting_events'):
@@ -329,13 +258,8 @@ def execute(*args, **kw):
for itip_event in itip_events:
# Now we have the event that was conflicting
if resources[resource]['mail'] in [a.get_email() for a in itip_event['xml'].get_attendees()]:
- itip_event['xml'].set_attendee_participant_status(
- [a for a in itip_event['xml'].get_attendees() if a.get_email() == resources[resource]['mail']][0],
- "DECLINED"
- )
-
- send_response(resources[resource]['mail'], itip_events)
- # TODO: Accept the message to the other attendees
+ decline_reservation_request(itip_event, resources[resource])
+ done = True
else:
# This must have been a resource collection originally.
@@ -344,46 +268,19 @@ def execute(*args, **kw):
if resources[resource].has_key('memberof'):
original_resource = resources[resources[resource]['memberof']]
- _target_resource = resources[original_resource['uniquemember'][random.randint(0,(len(original_resource['uniquemember'])-1))]]
-
- # unset all
- for _r in original_resource['uniquemember']:
- del resources[_r]
+ # TODO: shuffle existing bookings of collection members in order
+ # to make one availale for the requested time
if original_resource['mail'] in [a.get_email() for a in itip_event['xml'].get_attendees()]:
- itip_event['xml'].set_attendee_participant_status(
- [a for a in itip_event['xml'].get_attendees() if a.get_email() == original_resource['mail']][0],
- "DECLINED"
- )
-
- send_response(original_resource['mail'], itip_events)
- # TODO: Accept the message to the other attendees
+ decline_reservation_request(itip_event, original_resource)
+ done = True
else:
# No conflicts, go accept
for itip_event in itip_events:
if resources[resource]['mail'] in [a.get_email() for a in itip_event['xml'].get_attendees()]:
- itip_event['xml'].set_attendee_participant_status(
- [a for a in itip_event['xml'].get_attendees() if a.get_email() == resources[resource]['mail']][0],
- "ACCEPTED"
- )
-
- log.debug(
- _("Adding event to %r") % (
- resources[resource]['kolabtargetfolder']
- ),
- level=9
- )
-
- imap.imap.m.setacl(resources[resource]['kolabtargetfolder'], "cyrus-admin", "lrswipkxtecda")
- imap.imap.m.append(
- resources[resource]['kolabtargetfolder'],
- None,
- None,
- itip_event['xml'].to_message().as_string()
- )
-
- send_response(resources[resource]['mail'], itip_event)
+ accept_reservation_request(itip_event, resources[resource], imap)
+ done = True
else:
# This must have been a resource collection originally.
@@ -392,14 +289,9 @@ def execute(*args, **kw):
if resources[resource].has_key('memberof'):
original_resource = resources[resources[resource]['memberof']]
- # Randomly selects a target resource from the resource
- # collection.
+ # Randomly selects a target resource from the resource collection.
_target_resource = resources[original_resource['uniquemember'][random.randint(0,(len(original_resource['uniquemember'])-1))]]
- # Remove all resources from the collection.
- for _r in original_resource['uniquemember']:
- del resources[_r]
-
if original_resource['mail'] in [a.get_email() for a in itip_event['xml'].get_attendees()]:
#
# Delegate:
@@ -407,35 +299,10 @@ def execute(*args, **kw):
# - delegator: the original resource collection
# - delegatee: the target resource
#
+ itip_event['xml'].delegate(original_resource['mail'], _target_resource['mail'])
- itip_event['xml'].delegate(
- original_resource['mail'],
- _target_resource['mail']
- )
-
- itip_event['xml'].set_attendee_participant_status(
- [a for a in itip_event['xml'].get_attendees() if a.get_email() == _target_resource['mail']][0],
- "ACCEPTED"
- )
-
- log.debug(
- _("Adding event to %r") % (
- _target_resource['kolabtargetfolder']
- ),
- level=9
- )
-
- # TODO: The Cyrus IMAP (or Dovecot) Administrator login
- # name comes from configuration.
- imap.imap.m.setacl(_target_resource['kolabtargetfolder'], "cyrus-admin", "lrswipkxtecda")
- imap.imap.m.append(
- _target_resource['kolabtargetfolder'],
- None,
- None,
- itip_event['xml'].to_message().as_string()
- )
-
- send_response(original_resource['mail'], itip_event)
+ accept_reservation_request(itip_event, _target_resource, imap)
+ done = True
auth.disconnect()
del auth
@@ -446,6 +313,130 @@ def execute(*args, **kw):
os.unlink(filepath)
+
+def read_resource_calendar(resource_rec, itip_events, imap):
+ """
+ Read all booked events from the given resource's calendar
+ and check for conflicts with the given list if itip events
+ """
+
+ resource_rec['conflict'] = False
+ resource_rec['conflicting_events'] = []
+
+ mailbox = resource_rec['kolabtargetfolder']
+
+ log.debug(
+ _("Checking events in resource folder %r") % (mailbox),
+ level=8
+ )
+
+ # might raise an exception, let that bubble
+ imap.imap.m.select(mailbox)
+ typ, data = imap.imap.m.search(None, 'ALL')
+
+ num_messages = len(data[0].split())
+
+ for num in data[0].split():
+ # For efficiency, makes the routine non-deterministic
+ if resource_rec['conflict']:
+ continue
+
+ log.debug(
+ _("Fetching message UID %r from folder %r") % (num, mailbox),
+ level=9
+ )
+
+ typ, data = imap.imap.m.fetch(num, '(RFC822)')
+
+ event_message = message_from_string(data[0][1])
+
+ if event_message.is_multipart():
+ for part in event_message.walk():
+ if part.get_content_type() == "application/calendar+xml":
+ payload = part.get_payload(decode=True)
+ event = pykolab.xml.event_from_string(payload)
+
+ for itip in itip_events:
+ _es = to_dt(event.get_start())
+ _is = to_dt(itip['start'].dt)
+
+ _ee = to_dt(event.get_end())
+ _ie = to_dt(itip['end'].dt)
+
+ if _es < _is:
+ if _es <= _ie:
+ if _ee <= _is:
+ conflict = False
+ else:
+ conflict = True
+ else:
+ conflict = True
+ elif _es == _is:
+ conflict = True
+ else: # _es > _is
+ if _es <= _ie:
+ conflict = True
+ else:
+ conflict = False
+
+ if conflict:
+ log.info(
+ _("Event %r conflicts with event %r") % (
+ itip['xml'].get_uid(),
+ event.get_uid()
+ )
+ )
+
+ resource_rec['conflicting_events'].append(event)
+ resource_rec['conflict'] = True
+
+ return resource_rec['conflict']
+
+
+def accept_reservation_request(itip_event, resource, imap):
+ """
+ Accepts the given iTip event by booking it into the resource's
+ calendar. Then set the attendee status of the given resource to
+ ACCEPTED and sends an iTip reply message to the organizer.
+ """
+
+ itip_event['xml'].set_attendee_participant_status(
+ itip_event['xml'].get_attendee_by_email(resource['mail']),
+ "ACCEPTED"
+ )
+
+ log.debug(
+ _("Adding event to %r") % (resource['kolabtargetfolder']),
+ level=9
+ )
+
+ # TODO: The Cyrus IMAP (or Dovecot) Administrator login
+ # name comes from configuration.
+ imap.imap.m.setacl(resource['kolabtargetfolder'], "cyrus-admin", "lrswipkxtecda")
+ imap.imap.m.append(
+ resource['kolabtargetfolder'],
+ None,
+ None,
+ itip_event['xml'].to_message().as_string()
+ )
+
+ send_response(resource['mail'], itip_event)
+
+
+def decline_reservation_request(itip_event, resource):
+ """
+ Set the attendee status of the given resource to
+ DECLINED and send an according iTip reply to the organizer.
+ """
+
+ itip_event['xml'].set_attendee_participant_status(
+ itip_event['xml'].get_attendee_by_email(resource['mail']),
+ "DECLINED"
+ )
+
+ send_response(resource, itip_event)
+
+
def itip_events_from_message(message):
"""
Obtain the iTip payload from email.message <message>
@@ -561,48 +552,39 @@ def reject(filepath):
filepath = new_filepath
exec('modules.cb_action_REJECT(%r, %r)' % ('resources',filepath))
+
def resource_record_from_email_address(email_address):
+ """
+ Resolves the given email address to a resource entity
+ """
+
auth = Auth()
auth.connect()
resource_records = []
log.debug(
- _("Checking if email address %r belongs to a resource (collection)") % (
- email_address
- ),
- level=8
- )
+ _("Checking if email address %r belongs to a resource (collection)") % (email_address),
+ level=8
+ )
resource_records = auth.find_resource(email_address)
if isinstance(resource_records, list):
- if len(resource_records) == 0:
- log.debug(
- _("No resource (collection) records found for %r") % (
- email_address
- ),
- level=9
- )
-
+ if len(resource_records) > 0:
+ log.debug(_("Resource record(s): %r") % (resource_records), level=8)
else:
- log.debug(
- _("Resource record(s): %r") % (resource_records),
- level=8
- )
+ log.debug(_("No resource (collection) records found for %r") % (email_address), level=9)
elif isinstance(resource_records, basestring):
- log.debug(
- _("Resource record: %r") % (resource_records),
- level=8
- )
-
resource_records = [ resource_records ]
+ log.debug(_("Resource record: %r") % (resource_records), level=8)
auth.disconnect()
return resource_records
-def resource_records_from_itip_events(itip_events):
+
+def resource_records_from_itip_events(itip_events, recipient_email=None):
"""
Given a list of itip_events, determine which resources have been
invited as attendees and/or resources.
@@ -641,103 +623,81 @@ def resource_records_from_itip_events(itip_events):
#
attendees = [x.split(':')[-1] for x in attendees_raw]
+ # Limit the attendee resources to the one that is actually invited
+ # with the current message. Considering all invited resources would result in
+ # duplicate responses from every iTip message sent to a resource.
+ if recipient_email is not None:
+ attendees = [a for a in attendees if a == recipient_email]
+
for attendee in attendees:
- log.debug(
- _("Checking if attendee %r is a resource (collection)") % (
- attendee
- ),
- level=8
- )
+ log.debug(_("Checking if attendee %r is a resource (collection)") % (attendee), level=8)
_resource_records = auth.find_resource(attendee)
if isinstance(_resource_records, list):
- if len(_resource_records) == 0:
- log.debug(
- _("No resource (collection) records found for %r") % (
- attendee
- ),
- level=9
- )
-
+ if len(_resource_records) > 0:
+ resource_records.extend(_resource_records)
+ log.debug(_("Resource record(s): %r") % (_resource_records), level=8)
else:
- log.debug(
- _("Resource record(s): %r") % (_resource_records),
- level=8
- )
+ log.debug(_("No resource (collection) records found for %r") % (attendee), level=9)
- resource_records.extend(_resource_records)
elif isinstance(_resource_records, basestring):
- log.debug(
- _("Resource record: %r") % (_resource_records),
- level=8
- )
-
resource_records.append(_resource_records)
+ log.debug(_("Resource record: %r") % (_resource_records), level=8)
+
else:
- log.warning(
- _("Resource reservation made but no resource records found")
- )
+ log.warning(_("Resource reservation made but no resource records found"))
- # TODO: Escape the non-implementation of the free-form, undefined RESOURCES
- # list(s) in iTip. We don't know how to handle this yet!
- resources_raw = []
+ # Escape the non-implementation of the free-form, undefined RESOURCES
+ # list(s) in iTip.
+ if len(resource_records) == 0:
- # TODO: We expect the format of an resource line to literally be:
- #
- # RESOURCES:MAILTO:resource-car at kolabsys.com
- #
- # which makes the resources_raw contain:
- #
- # MAILTO:resource-car at kolabsys.com
- #
- resources = [x.split(':')[-1] for x in resources_raw]
- for resource in resources:
- log.debug(
- _("Checking if resource %r is a resource (collection)") % (
- resource
- ),
- level=8
- )
+ # TODO: We don't know how to handle this yet!
+ # We expect the format of an resource line to literally be:
+ # RESOURCES:MAILTO:resource-car at kolabsys.com
+ resources_raw = []
- _resource_records = auth.find_resource(resource)
- if isinstance(_resource_records, list):
- if len(_resource_records) == 0:
- log.debug(
- _("No resource (collection) records found for %r") % (
- resource
- ),
- level=8
- )
+ resources = [x.split(':')[-1] for x in resources_raw]
+
+ # Limit the attendee resources to the one that is actually invited
+ # with the current message.
+ if recipient_email is not None:
+ resources = [a for a in resources if a == recipient_email]
+
+ for resource in resources:
+ log.debug(_("Checking if resource %r is a resource (collection)") % (resource), level=8)
+
+ _resource_records = auth.find_resource(resource)
+ if isinstance(_resource_records, list):
+ if len(_resource_records) > 0:
+ resource_records.extend(_resource_records)
+ log.debug(_("Resource record(s): %r") % (_resource_records), level=8)
+ else:
+ log.debug(_("No resource (collection) records found for %r") % (resource), level=8)
+
+ elif isinstance(_resource_records, basestring):
+ resource_records.append(_resource_records)
+ log.debug(_("Resource record: %r") % (_resource_records), level=8)
else:
- log.debug(
- _("Resource record(s): %r") % (_resource_records),
- level=8
- )
+ log.warning(_("Resource reservation made but no resource records found"))
- resource_records.extend(_resource_records)
- elif isinstance(_resource_records, basestring):
- resource_records.append(_resource_records)
- log.debug(_("Resource record: %r") % (_resource_records), level=8)
- else:
- log.warning(
- _("Resource reservation made but no resource records found")
- )
- log.debug(
- _("The following resources are being referred to in the " + \
- "iTip: %r") % (
- resource_records
- ),
- level=8
- )
+ log.debug(_("The following resources are being referred to in the " + \
+ "iTip: %r") % (resource_records), level=8)
auth.disconnect()
return resource_records
+
def send_response(from_address, itip_events):
+ """
+ Send the given iCal events as a valid iTip response to the organizer.
+ In case the invited resource coolection was delegated to a concrete
+ resource, this will send an additional DELEGATED response message.
+ """
+
import smtplib
smtp = smtplib.SMTP("localhost", 10027)
@@ -748,12 +708,12 @@ def send_response(from_address, itip_events):
itip_events = [ itip_events ]
for itip_event in itip_events:
- attendee = itip_event['xml'].get_attendee(from_address)
+ attendee = itip_event['xml'].get_attendee_by_email(from_address)
participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee)
if participant_status == "DELEGATED":
# Extra actions to take
- delegator = itip_event['xml'].get_attendee(from_address)
+ delegator = itip_event['xml'].get_attendee_by_email(from_address)
delegatee = [a for a in itip_event['xml'].get_attendees() if from_address in [b.email() for b in a.get_delegated_from()]][0]
message = itip_event['xml'].to_message_itip(delegatee.get_email(), method="REPLY", participant_status=itip_event['xml'].get_ical_attendee_participant_status(delegatee))
commit e6deec8a14428b9550c38ba09338c8222ed9035c
Author: Thomas Bruederli <bruederli at kolabsys.com>
Date: Wed Feb 19 16:26:00 2014 -0500
Add functional test to validate the resource invitation process run by wallace
diff --git a/tests/functional/test_wallace/test_005_resource_invitation.py b/tests/functional/test_wallace/test_005_resource_invitation.py
new file mode 100644
index 0000000..cf1cfc6
--- /dev/null
+++ b/tests/functional/test_wallace/test_005_resource_invitation.py
@@ -0,0 +1,136 @@
+import time
+import pykolab
+import smtplib
+import email
+
+from pykolab import wap_client
+from pykolab.auth import Auth
+from pykolab.imap import IMAP
+from wallace import module_resources
+
+from icalendar import Calendar
+from email import message_from_string
+from twisted.trial import unittest
+
+import tests.functional.resource_func as funcs
+
+conf = pykolab.getConf()
+
+itip_invitation = """MIME-Version: 1.0
+Content-Type: multipart/mixed;
+ boundary="=_c8894dbdb8baeedacae836230e3436fd"
+From: "Doe, John" <john.doe at example.org>
+Date: Tue, 25 Feb 2014 13:54:14 +0100
+Message-ID: <240fe7ae7e139129e9eb95213c1016d7 at example.org>
+User-Agent: Roundcube Webmail/0.9-0.3.el6.kolab_3.0
+To: %s
+Subject: "test" has been created
+
+--=_c8894dbdb8baeedacae836230e3436fd
+Content-Type: text/plain; charset=UTF-8; format=flowed
+Content-Transfer-Encoding: quoted-printable
+
+*test*
+
+--=_c8894dbdb8baeedacae836230e3436fd
+Content-Type: text/calendar; charset=UTF-8; method=REQUEST; name=event.ics
+Content-Disposition: attachment; filename=event.ics
+Content-Transfer-Encoding: 8bit
+
+BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN
+CALSCALE:GREGORIAN
+METHOD:REQUEST
+BEGIN:VEVENT
+UID:626421779C777FBE9C9B85A80D04DDFA-A4BF5BBB9FEAA271
+DTSTAMP:20120713T1254140
+DTSTART;TZID=Europe/London:20140713T100000
+DTEND;TZID=Europe/London:20140713T160000
+SUMMARY:test
+DESCRIPTION:test
+ORGANIZER;CN="Doe, John":mailto:john.doe at example.org
+ATTENDEE;ROLE=REQ-PARTICIPANT;PARTSTAT=NEEDS-ACTION;RSVP=TRUE:mailto:%s
+TRANSP:OPAQUE
+END:VEVENT
+END:VCALENDAR
+--=_c8894dbdb8baeedacae836230e3436fd--
+"""
+
+class TestResourceInvitation(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(self):
+ from tests.functional.purge_users import purge_users
+ purge_users()
+
+ self.john = {
+ 'displayname': 'John Doe',
+ 'mail': 'john.doe at example.org',
+ 'sender': 'John Doe <john.doe at example.org>'
+ }
+
+ from tests.functional.user_add import user_add
+ user_add("John", "Doe")
+
+ funcs.purge_resources()
+ self.audi = funcs.resource_add("car", "Audi A4")
+ self.passat = funcs.resource_add("car", "VW Passat")
+ self.boxter = funcs.resource_add("car", "Porsche Boxter S")
+ self.cars = funcs.resource_add("collection", "Company Cars", [ self.audi['dn'], self.passat['dn'], self.boxter['dn'] ])
+
+ time.sleep(1)
+ from tests.functional.synchronize import synchronize_once
+ synchronize_once()
+
+ def send_message(self, msg_source, to_addr, from_addr=None):
+ if from_addr is None:
+ from_addr = self.john['mail']
+
+ smtp = smtplib.SMTP('localhost', 10026)
+ smtp.sendmail(from_addr, to_addr, msg_source)
+
+ def send_itip_invitation(self, resource_email):
+ self.send_message(itip_invitation % (resource_email, resource_email), resource_email)
+
+ def check_message_received(self, subject):
+ imap = IMAP()
+ imap.connect()
+ imap.set_acl("user/john.doe at example.org", "cyrus-admin", "lrs")
+ imap.imap.m.select("user/john.doe at example.org")
+
+ found = None
+ max_tries = 20
+
+ while not found and max_tries > 0:
+ max_tries -= 1
+
+ typ, data = imap.imap.m.search(None, 'ALL')
+ for num in data[0].split():
+ typ, msg = imap.imap.m.fetch(num, '(RFC822)')
+ message = message_from_string(msg[0][1])
+ if message['Subject'] == subject:
+ found = message
+ break
+
+ time.sleep(1)
+
+ return found
+
+
+ def test_001_resource_from_email_address(self):
+ resource = module_resources.resource_record_from_email_address(self.audi['mail'])
+ self.assertEqual(len(resource), 1)
+ self.assertEqual(resource[0], self.audi['dn'])
+
+ collection = module_resources.resource_record_from_email_address(self.cars['mail'])
+ self.assertEqual(len(collection), 1)
+ self.assertEqual(collection[0], self.cars['dn'])
+
+
+ def test_002_invite_resource(self):
+ self.send_itip_invitation(self.audi['mail'])
+
+ response = self.check_message_received("Meeting Request ACCEPTED")
+ self.assertIsInstance(response, email.message.Message)
+
More information about the commits
mailing list