[Kolab-devel] some python code for cyrusimap: annotation and sieve
Alain Spineux
aspineux at gmail.com
Fri Jan 19 02:09:45 CET 2007
Hello I use Kolab as a base for a new service I want to propose.
I'm writing the application in python 2.4 using turbogears framework.
Python's imaplib already supports ACLs with Cyrus very well, but
I had to extend the imaplib library to include annotations in order to
manage the "expire" option.
I also extended a sieve library I found on the web to manage STARTTLS
http://www.crazy-compilers.com/py-lib/managesieve.html
If someone wants to add an extension, I recommend making intensive use of tcpdump
to compare a working tool such as cyradm or sieveshell with your own code (as I did).
Here is my code: first imaplib, then managesieve.
Hope this helps.
============ cyrusimap.py =============
"""IMAP4 Client
Extend default python imaplib to include ANNOTATION as described in
http://www.potaroo.net/ietf/all-ids/draft-daboo-imap-annotatemore-00.txt
This extension is used by Cyrus IMAP, for example to manage the automatic
removal of expired mail based on the '/vendor/cmu/cyrus-imapd/expire' annotation.
"""
"""
Some usage
>>> i.getannotation(' user/alain.spineux at asxnet.loc', '"*"', '"value.shared"')
>>> i.getannotation('user/alain.spineux at asxnet.loc',
'/vendor/cmu/cyrus-imapd/expire', '" value.shared"')
('OK', ['"user/alain.spineux at asxnet.loc"
"/vendor/cmu/cyrus-imapd/expire" ("value.shared" "44")'])
>>> i.getannotation('user/alain.spineux at asxnet.loc',
'/vendor/cmu/cyrus-imapd/expire', '("value.shared")')
('OK', ['" user/alain.spineux at asxnet.loc"
"/vendor/cmu/cyrus-imapd/expire" ("value.shared" "44")'])
>>> i.getannotation(' user/alain.spineux at asxnet.loc',
'/vendor/cmu/cyrus-imapd/expire', '("*")')
('OK', ['"user/alain.spineux at asxnet.loc"
"/vendor/cmu/cyrus-imapd/expire" (" value.shared" "44"
"content-type.shared" "text/plain" "size.shared" "2"
"modifiedsince.shared" "1156264470")'])
>>> i.getannotation ('user/alain.spineux at asxnet.loc',
'/vendor/cmu/cyrus-imapd/expire', '("value.shared"
"content-type.shared")')
('OK', ['" user/alain.spineux at asxnet.loc"
"/vendor/cmu/cyrus-imapd/expire" ("value.shared" "44"
"content-type.shared" "text/plain")'])
>>> i.setannotation('user/alain.spineux at asxnet.loc',
'/vendor/cmu/cyrus-imapd/expire', '("value.shared" "44")')
('OK', [None])
>>> i.setannotation('user/alain.spineux at asxnet.loc',
'/vendor/cmu/cyrus-imapd/expire', '("value.shared" NIL)')
('OK', [None])
>>> i.getannotation('user/alain.spineux at asxnet.loc',
'/vendor/cmu/cyrus-imapd/expire', '("value.shared")')
('OK', [None])
some more
>>> i=cyrusimap.CYRUSIMAP4('localhost')
>>> i.login('manager', 'password')
('OK', ['User logged in'])
>>> i.getcyrusexpire("user/alain.spineux at asxnet.loc")
('OK', 0)
>>> i.setcyrusexpire("user/alain.spineux at asxnet.loc ", 88)
('OK', [None])
>>> i.getcyrusexpire("user/alain.spineux at asxnet.loc")
('OK', 88)
>>> i.setcyrusexpire ("user/alain.spineux at asxnet.loc", None)
('OK', [None])
>>> i.getcyrusexpire("user/alain.spineux at asxnet.loc ")
('OK', 0)
"""
import imaplib
# Register the two annotation commands with imaplib's command table so
# that IMAP4._simple_command() accepts them.  Both are allowed in the
# AUTH and SELECTED connection states.
imaplib.Commands['GETANNOTATION'] = ('AUTH', 'SELECTED')
imaplib.Commands['SETANNOTATION'] = ('AUTH', 'SELECTED')
class CYRUSIMAP4(imaplib.IMAP4):
    """imaplib.IMAP4 extended with GETANNOTATION / SETANNOTATION.

    Adds generic annotation accessors plus two convenience helpers for
    the '/vendor/cmu/cyrus-imapd/expire' annotation.
    """

    # Annotation entry read and written by getcyrusexpire()/setcyrusexpire().
    _EXPIRE_ENTRY = '/vendor/cmu/cyrus-imapd/expire'

    def getannotation(self, root, entry, attrib):
        """Get annotation.

        (typ, [data]) = <instance>.getannotation(root, entry, attrib)

        The three arguments are sent to the server as given, so the
        caller is responsible for any quoting/parenthesising.
        """
        typ, dat = self._simple_command('GETANNOTATION', root, entry, attrib)
        return self._untagged_response(typ, dat, 'ANNOTATION')

    def setannotation(self, root, entry, value):
        """Set annotation value.

        (typ, [data]) = <instance>.setannotation(root, entry, value)

        'value' is the already formatted attribute list, for example
        '("value.shared" "44")'.
        """
        typ, dat = self._simple_command('SETANNOTATION', root, entry, value)
        return self._untagged_response(typ, dat, 'ANNOTATION')

    def getcyrusexpire(self, root):
        """Get cyrus 'expire' annotation value.

        (typ, value) = <instance>.getcyrusexpire(root)

        When 'typ' is 'OK', 'value' is the annotation as an int, or 0
        when the annotation is unset.  When 'typ' is not 'OK' the raw
        response data is returned unchanged in place of 'value'.

        Raises ValueError if the server returns a non-numeric value.
        """
        typ, dat = self.getannotation(
            root, self._EXPIRE_ENTRY, '("value.shared")')
        if typ != 'OK':
            return typ, dat
        if not dat or dat[0] is None:
            return typ, 0
        raw = dat[0]
        if not isinstance(raw, str):
            # Python 3's imaplib hands back bytes; Python 2's str passes
            # through this test untouched.
            raw = raw.decode('utf-8')
        # Response looks like:
        #   "user/name" "/vendor/cmu/cyrus-imapd/expire" ("value.shared" "44")
        # Only one attribute is requested, so the value is the last token.
        # Splitting from the right keeps this correct when the quoted
        # mailbox name itself contains spaces.
        token = raw.rstrip().rstrip(')').rsplit(None, 1)[-1].strip('"')
        if token.upper() == 'NIL':
            # An explicit NIL means "no value", same as a missing annotation.
            return typ, 0
        return typ, int(token)

    def setcyrusexpire(self, root, value):
        """Set cyrus 'expire' annotation value.

        (typ, [data]) = <instance>.setcyrusexpire(root, value)

        A 'value' of None or 0 removes the annotation (NIL is sent).
        """
        if value is None or value == 0:
            v = 'NIL'
        else:
            v = '"%d"' % (value,)
        return self.setannotation(
            root, self._EXPIRE_ENTRY, '("value.shared" %s)' % (v,))
==========================================================================
====================== managesieve.py =======================
"""Sieve management client.
A Protocol for Remotely Managing Sieve Scripts
Based on <draft-martin-managesieve-04.txt>
"""
# Module version; printed by MANAGESIEVE.__init__ when debugging is enabled.
__version__ = "0.4"
# Contributors, one per line.
__author__ = """
Alain Spineux <alain_dot_spineux_at_gmail_com> August 2006 : STARTTLS
Hartmut Goebel < h.goebel at crazy-compilers.com>
Ulrich Eck <ueck at net-labs.de> April 2001
"""
import binascii, re, socket, time, random, sys
# Public names exported by ``from managesieve import *``.
__all__ = [ 'MANAGESIEVE', 'SIEVE_PORT', 'OK', 'NO', 'BYE', 'Debug']
# The imaplib helpers are not imported; local _mesg/_log are defined below.
#from imaplib import _log, _mesg
def _mesg(s, secs=None):
    """Write the debug message *s* to stderr behind a MM:SS.hh timestamp.

    *secs* is the time to stamp the message with, in seconds since the
    epoch; the current time is used when it is omitted.
    """
    if secs is None:
        secs = time.time()
    stamp = time.strftime('%M:%S', time.localtime(secs))
    hundredths = (secs * 100) % 100
    sys.stderr.write(' %s.%02d %s\n' % (stamp, hundredths, s))
    sys.stderr.flush()
def _log(line):
    """Discard *line*; placeholder called where debug output is below the print level."""
    return None
# Default debug level copied into every new MANAGESIEVE instance.
Debug = 0
CRLF = '\r\n'
SIEVE_PORT = 2000

# Response types.
OK = 'OK'
NO = 'NO'
BYE = 'BYE'

# todo: return results or raise exceptions?
# todo: on result 'BYE' quit immediately
# todo: raise exception on 'BYE'?

# Commands
commands = {
    # name            valid states
    'AUTHENTICATE': ('NONAUTH',),
    'LOGOUT':       ('NONAUTH', 'AUTH', 'LOGOUT'),
    # Was misspelled 'CABABILTY', which made MANAGESIEVE.capability()
    # fail with KeyError when looking up 'CAPABILITY'.
    'CAPABILITY':   ('NONAUTH', 'AUTH'),
    'GETSCRIPT':    ('AUTH', ),
    'PUTSCRIPT':    ('AUTH', ),
    'SETACTIVE':    ('AUTH', ),
    'DELETESCRIPT': ('AUTH', ),
    'LISTSCRIPTS':  ('AUTH', ),
    'HAVESPACE':    ('AUTH', ),
    }

### needed
# Final status line: type, optional "(code)", optional trailing string.
Oknobye = re.compile(r'(?P<type>(OK|NO|BYE))'
                     r'( \((?P<code>.*)\))?'
                     r'( (?P<data>.*))?')
# draft-martin-managesieve-04.txt defines the size tag of literals to
# contain a '+' (plus sign) behind the digits, but timsieved does not
# send one. Thus we are less strict here:
Literal = re.compile(r'.*{(?P<size>\d+)\+?}$')
# A double-quoted string at the start of the text; group 1 is its content.
re_dquote = re.compile(r'"(([^"\\]|\\.)*)"')
# A backslash-escaped backslash or double quote inside a quoted string.
re_esc_quote = re.compile(r'\\([\\"])')
def sieve_name(name):
    """Return *name* as a double-quoted protocol string.

    Backslashes and double quotes inside *name* are backslash-escaped so
    that the result stays a single well-formed quoted string.
    """
    # Escape backslashes first so the ones added for quotes are not doubled.
    escaped = name.replace('\\', '\\\\').replace('"', '\\"')
    return '"%s"' % escaped
def sieve_string(string):
    """Return *string* encoded as a literal: '{<len>+}' CRLF followed by the text."""
    size_tag = '{%d+}' % len(string)
    return '%s%s%s' % (size_tag, CRLF, string)
class MANAGESIEVE:
    """Sieve client class.

    Instantiate with: MANAGESIEVE(host [, port])

        host - host's name (default: '')
        port - port number (default: SIEVE_PORT).

    Sieve commands are available as methods of the same name in
    lower-case; the UPPERCASE spelling of any command listed in the
    module-level 'commands' table is accepted as an alias.

    Commands that produce data return a tuple (type, data); the others
    return only the type, which is usually 'OK' or 'NO'.  The response
    code and text of the last final response are kept in
    <instance>.response_code and <instance>.response_text.

    Errors raise the exception class <instance>.error("<reason>").
    Connection failures raise <instance>.abort("<reason>"), which is a
    sub-class of 'error'.

    "error" exceptions imply a program error.
    "abort" exceptions imply the connection should be reset, and
    the command re-tried.

    Note: protocol text is sent and read as native 'str' objects, as the
    original Python 2 code did.
    """

    class error(Exception):
        """Logical errors - debug required"""

    class abort(error):
        """Service errors - close and retry"""

    def __init__(self, host='', port=SIEVE_PORT):
        """Connect to host:port and parse the capability greeting."""
        self.host = host
        self.port = port
        self.debug = Debug
        self.state = 'NONAUTH'
        self.response_text = self.response_code = None
        self.capabilities = []
        self.loginmechs = []
        self.implementation = ''
        self.supports_tls = 0

        # Open socket to server.
        self._open(host, port)

        if __debug__:
            if self.debug >= 1:
                _mesg('managesieve version %s' % __version__)

        # Get server welcome message and store the capabilities it
        # carries.  Do not leak the socket if the greeting is unreadable.
        try:
            typ, data = self._get_response()
        except Exception:
            self._close()
            raise
        if typ == 'OK':
            self._parse_capabilities(data)

    def _parse_capabilities(self, lines):
        """Store the capability rows in *lines* on the instance.

        Each row is a list holding a name and, usually, a value.
        """
        for line in lines:
            try:
                typ, data = line
            except ValueError:
                # Rows such as "STARTTLS" carry a name only.
                typ, data = line[0], ""
            if __debug__:
                if self.debug >= 3:
                    _mesg('%s: %r' % (typ, data))
            if typ == "IMPLEMENTATION":
                self.implementation = data
            elif typ == "SASL":
                self.loginmechs = data.split()
            elif typ == "SIEVE":
                self.capabilities = data.split()
            elif typ == "STARTTLS":
                self.supports_tls = 1
            # A client implementation MUST ignore any other
            # capabilities given that it does not understand.

    def __getattr__(self, attr):
        """Allow UPPERCASE variants of MANAGESIEVE command methods."""
        if attr in commands:
            return getattr(self, attr.lower())
        raise AttributeError("Unknown MANAGESIEVE command: '%s'" % attr)

    #### Private methods ###

    def _open(self, host, port):
        """Setup 'self.sock' and 'self.file'."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((host, port))
            self.file = self.sock.makefile('r')
        except Exception:
            # Do not leave a half-open socket behind on failure.
            self.sock.close()
            raise

    def _close(self):
        """Close the file wrapper and the socket."""
        try:
            self.file.close()
        finally:
            self.sock.close()

    def _read(self, size):
        """Read 'size' bytes from remote."""
        return self.file.read(size)

    def _readline(self):
        """Read line from remote."""
        return self.file.readline()

    def _send(self, data):
        """Send all of 'data' to remote."""
        # send() may transmit only part of the buffer; sendall() keeps
        # going until everything is written or an error is raised.
        return self.sock.sendall(data)

    def _get_line(self):
        """Read one response line and return it without its line ending.

        Raises abort on end of file.
        """
        line = self._readline()
        if not line:
            raise self.abort('socket error: EOF')

        # Protocol mandates all lines terminated by CRLF, but only strip
        # what is really there so no payload character is ever lost.
        if line.endswith(CRLF):
            line = line[:-2]
        elif line.endswith('\n'):
            line = line[:-1]
        if __debug__:
            if self.debug >= 4:
                _mesg('< %s' % line)
            else:
                _log('< %s' % line)
        return line

    def _simple_command(self, *args):
        """Execute a command which does only return status.

        Returns (typ) with
           typ = response type

        The response code and text may be found in <instance>.response_code
        and <instance>.response_text, respectively.
        """
        return self._command(*args)[0]  # only return typ, ignore data

    def _command(self, name, arg1=None, arg2=None, *options):
        """Send command 'name' and read its response.

        'arg1' and 'arg2' are appended to the command line when set;
        every entry of 'options' is sent on a line of its own after it.

        Returns (typ, data) with
           typ  = response type
           data = list of lists of strings read (only meaningful if OK)

        The response code and text may be found in <instance>.response_code
        and <instance>.response_text, respectively.

        Raises error if the command is not valid in the current state
        and abort on socket errors.
        """
        if self.state not in commands[name]:
            raise self.error(
                'Command %s illegal in state %s' % (name, self.state))
        # concatenate command and arguments (if any)
        data = " ".join(filter(None, (name, arg1, arg2)))
        if __debug__:
            if self.debug >= 4:
                _mesg('> %r' % data)
            else:
                _log('> %s' % data)
        try:
            self._send('%s%s' % (data, CRLF))
            for o in options:
                if __debug__:
                    if self.debug >= 4:
                        _mesg('> %r' % (o,))
                    else:
                        _log('> %r' % (o,))
                self._send('%s%s' % (o, CRLF))
        except (socket.error, OSError):
            # sys.exc_info() instead of "except ... as" keeps this valid
            # on both old Python 2 releases and Python 3.
            val = sys.exc_info()[1]
            raise self.abort('socket error: %s' % val)
        return self._get_response()

    def _readstring(self, data):
        """Split the first string off 'data'.

        Returns (string, rest).  For a literal the string is read from
        the connection and 'rest' is the line that follows it.
        """
        if data.startswith(' '):  # space -> error
            raise self.error('Unexpected space: %r' % data)
        if data.startswith('"'):  # handle double quote:
            if not self._match(re_dquote, data):
                raise self.error('Unmatched quote: %r' % data)
            snippet = self.mo.group(1)
            return re_esc_quote.sub(r'\1', snippet), data[self.mo.end():]
        if self._match(Literal, data):
            # read a 'literal' string
            size = int(self.mo.group('size'))
            if __debug__:
                if self.debug >= 4:
                    _mesg('read literal size %s' % size)
            return self._read(size), self._get_line()
        # Bare word: everything up to the first space (or the whole text).
        idx = data.find(' ')
        if idx < 0:
            return data, ''
        return data[:idx], data[idx + 1:]

    def _get_response(self):
        """Read response lines up to and including the final status line.

        Returns (typ, data) with
           typ  = response type
           data = list of lists of strings read (only meaningful if OK)

        The response code and text may be found in <instance>.response_code
        and <instance>.response_text, respectively.
        """
        # response-deletescript = response-oknobye
        # response-authenticate = *(string CRLF) (response-oknobye)
        # response-capability   = *(string [SP string] CRLF) response-oknobye
        # response-listscripts  = *(string [SP "ACTIVE"] CRLF) response-oknobye
        # response-oknobye      = ("OK" / "NO" / "BYE") [SP "(" resp-code ")"]
        #                         [SP string] CRLF
        # string                = quoted / literal
        # quoted                = <"> *QUOTED-CHAR <">
        # literal               = "{" number "+}" CRLF *OCTET
        #
        # --> a response line either starts with a quote character, a
        #     left brace or OK, NO, BYE.
        data = []
        resp = self._get_line()
        while True:
            if self._match(Oknobye, resp):
                typ, code, text = self.mo.group('type', 'code', 'data')
                if __debug__:
                    if self.debug >= 1:
                        _mesg('%s response: %s %s' % (typ, code, text))
                self.response_code = code
                self.response_text = None
                if text:
                    self.response_text = self._readstring(text)[0]
                return typ, data

            # Data line: collect its space separated strings into one row.
            row = []
            while True:
                item, resp = self._readstring(resp)
                if __debug__:
                    if self.debug >= 4:
                        _mesg('read: %r' % (item,))
                    if self.debug >= 5:
                        _mesg('rest: %r' % (resp,))
                row.append(item)
                if not resp.startswith(' '):
                    break
                resp = resp[1:]
            data.append(row)
            resp = self._get_line()

    def _match(self, cre, s):
        """Match compiled regular expression 'cre' at the start of 's'.

        The match object is saved in self.mo; returns success.
        """
        self.mo = cre.match(s)
        if __debug__:
            if self.mo is not None and self.debug >= 5:
                _mesg("\tmatched r'%s' => %s" % (cre.pattern,
                                                  repr(self.mo.groups())))
        return self.mo is not None

    ### Public methods ###

    def authenticate(self, mechanism, *authobjects):
        """Authenticate command - requires response processing.

        For PLAIN the authobjects are joined with NUL bytes and sent
        base64 encoded as one literal; for any other mechanism each
        authobject is base64 encoded and sent as a quoted string.

        Returns the response type and moves to state 'AUTH' on 'OK'.
        Raises error if the server did not announce 'mechanism'.
        """
        # command-authenticate  = "AUTHENTICATE" SP auth-type [SP string]
        #                         *(CRLF string)
        # response-authenticate = *(string CRLF) (response-oknobye)
        mech = mechanism.upper()
        if mech not in self.loginmechs:
            raise self.error("Server doesn't allow %s authentication." % mech)

        if mech == 'PLAIN':
            st = binascii.b2a_base64('\0'.join(authobjects))[:-1]
            typ, data = self._command('AUTHENTICATE',
                                      sieve_name(mech), '{%d+}' % len(st), st)
        else:  # LOGIN
            authobjects = [sieve_name(binascii.b2a_base64(ao)[:-1])
                           for ao in authobjects]
            typ, data = self._command('AUTHENTICATE',
                                      sieve_name(mech), *authobjects)
        if typ == 'OK':
            self.state = 'AUTH'
        return typ

    def login(self, auth, user, password):
        """Login to the Sieve server, using PLAIN when offered, else LOGIN.

        With PLAIN the message is built as auth NUL user NUL password;
        with LOGIN only 'user' and 'password' are sent.
        """
        if 'PLAIN' in self.loginmechs:
            return self.authenticate('PLAIN', auth, user, password)
        return self.authenticate('LOGIN', user, password)

    def logout(self):
        """Terminate connection to server."""
        # command-logout        = "LOGOUT" CRLF
        # response-logout       = response-oknobye
        try:
            typ = self._simple_command('LOGOUT')
        finally:
            # Release the socket even when the LOGOUT exchange fails.
            self.state = 'LOGOUT'
            self._close()
        return typ

    def listscripts(self):
        """Get a list of scripts on the server.

        (typ, [data]) = <instance>.listscripts()

        if 'typ' is 'OK', 'data' is list of (scriptname, active) tuples.
        """
        # command-listscripts   = "LISTSCRIPTS" CRLF
        # response-listscripts  = *(sieve-name [SP "ACTIVE"] CRLF)
        #                         response-oknobye
        typ, data = self._command('LISTSCRIPTS')
        if typ != 'OK':
            return typ, data
        scripts = []
        for dat in data:
            if len(dat) not in (1, 2):
                raise self.error(
                    "Unexpected result from LISTSCRIPTS: %r" % (dat,))
            scripts.append((dat[0], len(dat) == 2))
        return typ, scripts

    def getscript(self, scriptname):
        """Get a script from the server.

        (typ, scriptdata) = <instance>.getscript(scriptname)

        'scriptdata' is the script data.  Raises error if the server
        did not return exactly one string.
        """
        # command-getscript     = "GETSCRIPT" SP sieve-name CRLF
        # response-getscript    = [string CRLF] response-oknobye
        typ, data = self._command('GETSCRIPT', sieve_name(scriptname))
        if typ != 'OK':
            return typ, data
        if len(data) != 1:
            raise self.error(
                'GETSCRIPT returned %d strings/scripts, expected exactly one'
                % len(data))
        # todo: decode data?
        return typ, data[0][0]

    def putscript(self, scriptname, scriptdata):
        """Put a script onto the server."""
        # command-putscript     = "PUTSCRIPT" SP sieve-name SP string CRLF
        # response-putscript    = response-oknobye
        return self._simple_command('PUTSCRIPT',
                                    sieve_name(scriptname),
                                    sieve_string(scriptdata))

    def deletescript(self, scriptname):
        """Delete a script at the server."""
        # command-deletescript  = "DELETESCRIPT" SP sieve-name CRLF
        # response-deletescript = response-oknobye
        return self._simple_command('DELETESCRIPT', sieve_name(scriptname))

    def setactive(self, scriptname):
        """Mark a script as the 'active' one."""
        # command-setactive     = "SETACTIVE" SP sieve-name CRLF
        # response-setactive    = response-oknobye
        return self._simple_command('SETACTIVE', sieve_name(scriptname))

    def havespace(self, scriptname, size):
        """Issue HAVESPACE for 'scriptname' and 'size'; return the response type."""
        # command-havespace     = "HAVESPACE" SP sieve-name SP number CRLF
        # response-havespace    = response-oknobye
        return self._simple_command('HAVESPACE',
                                    sieve_name(scriptname),
                                    str(size))

    def capability(self):
        """Issue a CAPABILITY command and return the result.

        As a side-effect, on success these attributes are (re)set:
            self.implementation
            self.loginmechs
            self.capabilities
            self.supports_tls
        """
        # command-capability    = "CAPABILITY" CRLF
        # response-capability   = *(string [SP string] CRLF) response-oknobye
        typ, data = self._command('CAPABILITY')
        if typ == 'OK':
            self._parse_capabilities(data)
        return typ, data

    ### not yet implemented: ###
    # command-starttls      = "STARTTLS" CRLF
    # response-starttls     = response-oknobye
===============================================
And finally, some usage examples:
# -----------------------------------------------------------------
@staticmethod
def SieveLogin(user_name):
    """Return a connected sieve session logged in on behalf of user_name.

    The session authenticates with the imap_login / imap_password
    credential while user_name is passed as the first argument of
    MANAGESIEVE.login().

    Raises EgSieveError when the login does not return 'OK'; the
    session is logged out before the exception is raised.
    """
    auth_name, password = imap_login, imap_password
    # connect to sieve server
    sieve = managesieve.MANAGESIEVE(imap_server)
    # NOTE(review): a failed connection presumably raises from the
    # constructor instead of yielding a false object, so this branch may
    # never run - confirm before relying on it.
    if not sieve:
        msg = 'Cannot connect to sieve server "%s"' % imap_server
        log.default.error('SieveLogin: %s', msg)
        raise EgSieveError(msg)
    # login to sieve server like user_name but using auth_name credential
    typ = sieve.login(user_name, auth_name, password)
    if typ != 'OK':
        # The two names used to be swapped with respect to the wording
        # of the message ("like <user> using <auth> credential").
        msg = ('Cannot login to sieve server like "%s" using "%s" '
               'credential (%s, %s, %s)'
               % (user_name, auth_name, typ,
                  sieve.response_text, sieve.response_code))
        log.default.error('SieveLogin: %s', msg)
        sieve.logout()
        raise EgSieveError(msg)
    return sieve
# -----------------------------------------------------------------
@staticmethod
def SieveInstallActiveScript(user_name, script_name, sieve_script):
    """Install and activate a sieve script for user_name.

    Example of sieve_script:

        require "fileinto";
        redirect "first.last at mydomain.com"; keep;

    Raises EgSieveError when uploading or activating the script does not
    return 'OK'.  The session is always logged out before returning.
    """
    sieve = Kolab.SieveLogin(user_name)
    # try/finally so the session is released on success too (it used to
    # stay open) and when one of the commands raises.
    try:
        typ = sieve.putscript(script_name, sieve_script)
        if typ != 'OK':
            msg = ('Cannot put script "%s" for user %s (%s, %s, %s)'
                   % (script_name, user_name, typ,
                      sieve.response_text, sieve.response_code))
            log.default.error('SieveInstallActiveScript: %s', msg)
            raise EgSieveError(msg)
        typ = sieve.setactive(script_name)
        if typ != 'OK':
            msg = ('Cannot activate sieve script "%s" for user %s '
                   '(%s, %s, %s)'
                   % (script_name, user_name, typ,
                      sieve.response_text, sieve.response_code))
            log.default.error('SieveInstallActiveScript: %s', msg)
            raise EgSieveError(msg)
    finally:
        sieve.logout()
# -----------------------------------------------------------------
@staticmethod
def SieveGetForwardInfo(user_name):
    """Retrieve forward info from the installed 'kolab-forward.siv' script.

    Returns (enable, forward_addr, keep):
        enable       - whether the forward script is the active one
        forward_addr - address found in the script's redirect command
        keep         - whether a 'keep' follows the redirect

    (False, '', True) is returned when the user has no such script.

    Raises EgSieveError when listing or fetching scripts does not return
    'OK', or when the script does not have the expected layout.  The
    session is always logged out before returning.
    """
    sieve = Kolab.SieveLogin(user_name)
    # try/finally releases the session on every path, including when a
    # sieve call itself raises.
    try:
        # get scripts list
        typ, scripts = sieve.listscripts()
        if typ != 'OK':
            msg = ('Cannot get scripts list for user %s (%s, %s, %s)'
                   % (user_name, typ,
                      sieve.response_text, sieve.response_code))
            log.default.error('SieveGetForwardInfo: %s', msg)
            raise EgSieveError(msg)
        # retrieve info
        enable, forward_addr, keep = False, '', True
        for scriptname, activ in scripts:
            if scriptname != 'kolab-forward.siv':
                continue
            typ, script = sieve.getscript(scriptname)
            if typ != 'OK':
                msg = ('Cannot get scripts "%s" for user %s (%s %s %s)'
                       % (scriptname, user_name, typ,
                          sieve.response_text, sieve.response_code))
                log.default.error('SieveGetForwardInfo: %s', msg)
                raise EgSieveError(msg)
            try:
                # extract data from script; match() returns None (hence
                # AttributeError) when the layout is not the expected one
                forward_addr, keep = re.match(
                    r'.*\nredirect\s+"(.*)"\s*;\s*(keep\s*.*;)?',
                    script).groups()
            except (AttributeError, TypeError):
                msg = ('Forwad scripts "%s", invalid format : "%s"'
                       % (scriptname, script))
                log.default.error('SieveGetForwardInfo: %s', msg)
                raise EgSieveError(msg)
            keep = keep is not None
            enable = activ
            break
    finally:
        sieve.logout()
    return enable, forward_addr, keep
# -----------------------------------------------------------------
@staticmethod
def SieveSetForwardInfo(user_name, enable, forward_addr, keep):
    """Set forward info into the 'kolab-forward.siv' sieve script.

    When enable is true the following script is uploaded and activated
    (the trailing keep only when keep is true):

        require "fileinto";
        redirect "first.last at mydomain.com"; keep;

    When enable is false, SETACTIVE is issued with an empty script name
    and its result is ignored.

    Raises EgSieveError when uploading or activating the script does not
    return 'OK'.  The session is always logged out before returning.
    """
    sieve = Kolab.SieveLogin(user_name)
    # try/finally releases the session even when a sieve call raises.
    try:
        scriptname = 'kolab-forward.siv'
        if enable:
            script = 'require "fileinto";\nredirect "%s";' % forward_addr
            if keep:
                script += ' keep;'
            typ = sieve.putscript(scriptname, script)
            if typ != 'OK':
                msg = ('Cannot put script "%s" for user %s (%s %s %s)'
                       % (scriptname, user_name, typ,
                          sieve.response_text, sieve.response_code))
                log.default.error('SieveSetForwardInfo: %s', msg)
                raise EgSieveError(msg)
        else:
            scriptname = ''
        typ = sieve.setactive(scriptname)
        if typ != 'OK' and scriptname != '':
            msg = ('Cannot activate sieve script "%s" for user %s '
                   '(%s %s %s)'
                   % (scriptname, user_name, typ,
                      sieve.response_text, sieve.response_code))
            log.default.error('SieveSetForwardInfo: %s', msg)
            raise EgSieveError(msg)
    finally:
        sieve.logout()
--
Alain Spineux
aspineux gmail com
May the sources be with you
More information about the devel
mailing list