summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.txt17
-rw-r--r--docs/api/authentication.rst4
-rw-r--r--repoze/bfg/authentication.py320
-rw-r--r--repoze/bfg/interfaces.py2
-rw-r--r--repoze/bfg/tests/test_authentication.py405
5 files changed, 708 insertions, 40 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index 2562d23ab..3c8445fa1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,20 @@
+Next release
+============
+
+Features
+--------
+
+- Add an AuthTktAuthenticationPolicy. This policy retrieves
+ credentials from an auth_tkt cookie managed by the application
+ itself (instead of relying on an upstream data source for
+ authentication data). See the Security API chapter of the
+ documentation for more info.
+
+- Allow RemoteUserAuthenticationPolicy and
+ RepozeWho1AuthenticationPolicy to accept various constructor
+ arguments. See the Security API chapter of the documentation for
+ more info.
+
0.9a5 (2009-05-28)
==================
diff --git a/docs/api/authentication.rst b/docs/api/authentication.rst
index f51eeb302..6514de510 100644
--- a/docs/api/authentication.rst
+++ b/docs/api/authentication.rst
@@ -8,7 +8,9 @@
Authentication Policies
~~~~~~~~~~~~~~~~~~~~~~~
-.. autoclass:: RepozeWho1AuthenticationPolicy
+.. autoclass:: AuthTktAuthenticationPolicy
.. autoclass:: RemoteUserAuthenticationPolicy
+.. autoclass:: RepozeWho1AuthenticationPolicy
+
diff --git a/repoze/bfg/authentication.py b/repoze/bfg/authentication.py
index 487a5e6a8..e7d4ef06e 100644
--- a/repoze/bfg/authentication.py
+++ b/repoze/bfg/authentication.py
@@ -1,13 +1,78 @@
+from codecs import utf_8_decode
+from codecs import utf_8_encode
+import crypt
+import os
+import stat
+import StringIO
+import time
+import traceback
+
+from paste.request import get_cookies
+from paste.auth import auth_tkt
+
from zope.interface import implements
+from zope.component import queryUtility
+
from repoze.bfg.interfaces import IAuthenticationPolicy
from repoze.bfg.security import Everyone
from repoze.bfg.security import Authenticated
-class RepozeWho1AuthenticationPolicy(object):
+class CallbackAuthenticationPolicy(object):
+ """ Abstract class """
+ def authenticated_userid(self, context, request):
+ userid = self._get_userid(request)
+ if userid is None:
+ return None
+ if self.callback is None:
+ return userid
+ if self.callback(userid) is not None: # is not None!
+ return userid
+
+ def effective_principals(self, context, request):
+ effective_principals = [Everyone]
+ userid = self._get_userid(request)
+ if userid is None:
+ return effective_principals
+ if self.callback is None:
+ groups = []
+ else:
+ groups = self.callback(userid)
+ if groups is None: # is None!
+ return effective_principals
+ effective_principals.append(Authenticated)
+ effective_principals.append(userid)
+ effective_principals.extend(groups)
+
+ return effective_principals
+
+
+class RepozeWho1AuthenticationPolicy(CallbackAuthenticationPolicy):
""" A BFG authentication policy which obtains data from the
- repoze.who 1.X WSGI API """
+ repoze.who 1.X WSGI 'API' (the ``repoze.who.identity`` key in the
+ WSGI environment).
+
+ Constructor Arguments
+ ---------------------
+
+ ``identifier_name``
+
+ Default: ``auth_tkt``. The who plugin name that performs
+ remember/forget. Optional.
+
+ ``callback``
+
+ Default: ``None``. A callback passed the repoze.who identity,
+ expected to return None if the user represented by the
+ identity doesn't exist or a sequence of group identifiers
+ (possibly empty) if the user does exist. If ``callback`` is
+ None, the userid will be assumed to exist with no groups.
+
+ """
implements(IAuthenticationPolicy)
- identifier_name = 'auth_tkt'
+
+ def __init__(self, identifier_name='auth_tkt', callback=None):
+ self.identifier_name = identifier_name
+ self.callback = callback
def _get_identity(self, request):
return request.environ.get('repoze.who.identity')
@@ -23,17 +88,24 @@ class RepozeWho1AuthenticationPolicy(object):
identity = self._get_identity(request)
if identity is None:
return None
- return identity['repoze.who.userid']
+ if self.callback is None:
+ return identity['repoze.who.userid']
+ if self.callback(identity) is not None: # is not None!
+ return identity['repoze.who.userid']
def effective_principals(self, context, request):
effective_principals = [Everyone]
identity = self._get_identity(request)
if identity is None:
return effective_principals
-
- effective_principals.append(Authenticated)
+ if self.callback is None:
+ groups = []
+ else:
+ groups = self.callback(identity)
+ if groups is None: # is None!
+ return effective_principals
userid = identity['repoze.who.userid']
- groups = identity.get('groups', [])
+ effective_principals.append(Authenticated)
effective_principals.append(userid)
effective_principals.extend(groups)
@@ -53,34 +125,230 @@ class RepozeWho1AuthenticationPolicy(object):
return []
identity = self._get_identity(request)
return identifier.forget(request.environ, identity)
-
-class RemoteUserAuthenticationPolicy(object):
+
+class RemoteUserAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A BFG authentication policy which obtains data from the
- REMOTE_USER WSGI envvar """
- implements(IAuthenticationPolicy)
+ REMOTE_USER WSGI envvar.
- def _get_identity(self, request):
- return request.environ.get('REMOTE_USER')
+ Constructor Arguments
+ ---------------------
- def authenticated_userid(self, context, request):
- identity = self._get_identity(request)
- if identity is None:
- return None
- return identity
+ ``environ_key``
- def effective_principals(self, context, request):
- effective_principals = [Everyone]
- identity = self._get_identity(request)
- if identity is None:
- return effective_principals
+ Default: ``REMOTE_USER``. The key in the WSGI environ which
+ provides the userid.
- effective_principals.append(Authenticated)
- effective_principals.append(identity)
+ ``callback``
- return effective_principals
+ Default: ``None``. A callback passed the userid, expected to return
+ None if the userid doesn't exist or a sequence of group identifiers
+ (possibly empty) if the user does exist. If ``callback`` is None,
+ the userid will be assumed to exist with no groups.
+ """
+ implements(IAuthenticationPolicy)
+
+ def __init__(self, environ_key='REMOTE_USER', callback=None):
+ self.environ_key = environ_key
+ self.callback = callback
+
+ def _get_userid(self, request):
+ return request.environ.get(self.environ_key)
def remember(self, context, request, principal, **kw):
return []
def forget(self, context, request):
return []
+
+class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
+ """ A BFG authentication policy which obtains data from an
+ auth_tkt cookie.
+
+ Constructor Arguments
+ ---------------------
+
+ ``secret``
+
+ The secret (a string) used for auth_tkt cookie encryption.
+ Required.
+
+ ``callback``
+
+ Default: ``None``. A callback passed the userid, expected to return
+ None if the userid doesn't exist or a sequence of group identifiers
+ (possibly empty) if the user does exist. If ``callback`` is None,
+ the userid will be assumed to exist with no groups.
+
+ ``cookie_name``
+
+ Default: ``repoze.bfg.auth_tkt``. The cookie name used
+ (string). Optional.
+
+ ``secure``
+
+ Default: ``False``. Only send the cookie back over a secure
+ conn. Optional.
+
+ ``include_ip``
+
+ Default: ``False``. Make the requesting IP address part of
+ the authentication data in the cookie. Optional.
+
+ """
+ implements(IAuthenticationPolicy)
+ def __init__(self,
+ secret,
+ callback=None,
+ cookie_name='repoze.bfg.auth_tkt',
+ secure=False,
+ include_ip=False):
+ self.cookie = AuthTktCookieHelper(
+ secret,
+ cookie_name=cookie_name,
+ secure=secure,
+ include_ip=include_ip,
+ )
+ self.callback = callback
+
+ def _get_userid(self, request):
+ result = self.cookie.identify(request)
+ if result:
+ return result['userid']
+
+ def remember(self, context, request, principal, **kw):
+ return self.cookie.remember(request, principal)
+
+ def forget(self, context, request):
+ return self.cookie.forget(request)
+
+class AuthTktCookieHelper(object):
+ userid_type_decoders = {
+ 'int':int,
+ 'unicode':lambda x: utf_8_decode(x)[0],
+ }
+
+ userid_type_encoders = {
+ int: ('int', str),
+ long: ('int', str),
+ unicode: ('unicode', lambda x: utf_8_encode(x)[0]),
+ }
+
+ def __init__(self, secret, cookie_name='auth_tkt', secure=False,
+ include_ip=False):
+ self.secret = secret
+ self.cookie_name = cookie_name
+ self.include_ip = include_ip
+ self.secure = secure
+
+ # IIdentifier
+ def identify(self, request):
+ environ = request.environ
+ cookies = get_cookies(environ)
+ cookie = cookies.get(self.cookie_name)
+
+ if cookie is None or not cookie.value:
+ return None
+
+ if self.include_ip:
+ remote_addr = environ['REMOTE_ADDR']
+ else:
+ remote_addr = '0.0.0.0'
+
+ try:
+ timestamp, userid, tokens, user_data = auth_tkt.parse_ticket(
+ self.secret, cookie.value, remote_addr)
+ except auth_tkt.BadTicket:
+ return None
+
+ userid_typename = 'userid_type:'
+ user_data_info = user_data.split('|')
+ for datum in filter(None, user_data_info):
+ if datum.startswith(userid_typename):
+ userid_type = datum[len(userid_typename):]
+ decoder = self.userid_type_decoders.get(userid_type)
+ if decoder:
+ userid = decoder(userid)
+
+ environ['REMOTE_USER_TOKENS'] = tokens
+ environ['REMOTE_USER_DATA'] = user_data
+ environ['AUTH_TYPE'] = 'cookie'
+
+ identity = {}
+ identity['timestamp'] = timestamp
+ identity['userid'] = userid
+ identity['tokens'] = tokens
+ identity['userdata'] = user_data
+ return identity
+
+ def _get_cookies(self, environ, value):
+ cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
+ wild_domain = '.' + cur_domain
+ cookies = [
+ ('Set-Cookie', '%s="%s"; Path=/' % (
+ self.cookie_name, value)),
+ ('Set-Cookie', '%s="%s"; Path=/; Domain=%s' % (
+ self.cookie_name, value, cur_domain)),
+ ('Set-Cookie', '%s="%s"; Path=/; Domain=%s' % (
+ self.cookie_name, value, wild_domain))
+ ]
+ return cookies
+
+ # IIdentifier
+ def forget(self, request):
+ # return a set of expires Set-Cookie headers
+ environ = request.environ
+ return self._get_cookies(environ, '""')
+
+ # IIdentifier
+ def remember(self, request, userid, tokens='', userdata=''):
+ environ = request.environ
+ if self.include_ip:
+ remote_addr = environ['REMOTE_ADDR']
+ else:
+ remote_addr = '0.0.0.0'
+
+ cookies = get_cookies(environ)
+ old_cookie = cookies.get(self.cookie_name)
+ existing = cookies.get(self.cookie_name)
+ old_cookie_value = getattr(existing, 'value', None)
+
+ timestamp, old_userid, old_tokens, old_userdata = None, '', '', ''
+
+ if old_cookie_value:
+ try:
+ (timestamp,old_userid,old_tokens,
+ old_userdata) = auth_tkt.parse_ticket(
+ self.secret, old_cookie_value, remote_addr)
+ except auth_tkt.BadTicket:
+ pass
+
+ encoding_data = self.userid_type_encoders.get(type(userid))
+ if encoding_data:
+ encoding, encoder = encoding_data
+ userid = encoder(userid)
+ userdata = 'userid_type:%s' % encoding
+
+ if not isinstance(tokens, basestring):
+ tokens = ','.join(tokens)
+ if not isinstance(old_tokens, basestring):
+ old_tokens = ','.join(old_tokens)
+ old_data = (old_userid, old_tokens, old_userdata)
+ new_data = (userid, tokens, userdata)
+
+ if old_data != new_data:
+ ticket = auth_tkt.AuthTicket(
+ self.secret,
+ userid,
+ remote_addr,
+ tokens=tokens,
+ user_data=userdata,
+ cookie_name=self.cookie_name,
+ secure=self.secure)
+ new_cookie_value = ticket.cookie_value()
+
+ cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
+ wild_domain = '.' + cur_domain
+ if old_cookie_value != new_cookie_value:
+ # return a set of Set-Cookie headers
+ return self._get_cookies(environ, new_cookie_value)
+
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py
index c4bafbe20..282e3756a 100644
--- a/repoze/bfg/interfaces.py
+++ b/repoze/bfg/interfaces.py
@@ -212,7 +212,7 @@ class IRoutesContextFactory(Interface):
"""
class IAuthenticationPolicy(Interface):
- """ A multi-adapter on context and request """
+ """ An object representing a BFG authentication policy. """
def authenticated_userid(context, request):
""" Return the authenticated userid or ``None`` if no
authenticated userid can be found. """
diff --git a/repoze/bfg/tests/test_authentication.py b/repoze/bfg/tests/test_authentication.py
index a23ffeac2..b0b1e084a 100644
--- a/repoze/bfg/tests/test_authentication.py
+++ b/repoze/bfg/tests/test_authentication.py
@@ -5,8 +5,8 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase):
from repoze.bfg.authentication import RepozeWho1AuthenticationPolicy
return RepozeWho1AuthenticationPolicy
- def _makeOne(self):
- return self._getTargetClass()()
+ def _makeOne(self, identifier_name='auth_tkt', callback=None):
+ return self._getTargetClass()(identifier_name, callback)
def test_class_implements_IAuthenticationPolicy(self):
from zope.interface.verify import verifyClass
@@ -31,6 +31,24 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase):
policy = self._makeOne()
self.assertEqual(policy.authenticated_userid(context, request), 'fred')
+ def test_authenticated_userid_with_callback_returns_None(self):
+ context = DummyContext()
+ request = DummyRequest(
+ {'repoze.who.identity':{'repoze.who.userid':'fred'}})
+ def callback(identity):
+ return None
+ policy = self._makeOne(callback=callback)
+ self.assertEqual(policy.authenticated_userid(context, request), None)
+
+ def test_authenticated_userid_with_callback_returns_something(self):
+ context = DummyContext()
+ request = DummyRequest(
+ {'repoze.who.identity':{'repoze.who.userid':'fred'}})
+ def callback(identity):
+ return ['agroup']
+ policy = self._makeOne(callback=callback)
+ self.assertEqual(policy.authenticated_userid(context, request), 'fred')
+
def test_effective_principals_None(self):
from repoze.bfg.security import Everyone
context = DummyContext()
@@ -56,13 +74,27 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase):
request = DummyRequest(
{'repoze.who.identity':{'repoze.who.userid':'fred',
'groups':['quux', 'biz']}})
- policy = self._makeOne()
+ def callback(identity):
+ return identity['groups']
+ policy = self._makeOne(callback=callback)
self.assertEqual(policy.effective_principals(context, request),
[Everyone, Authenticated, 'fred', 'quux', 'biz'])
+ def test_effective_principals_userid_callback_returns_None(self):
+ from repoze.bfg.security import Everyone
+ context = DummyContext()
+ request = DummyRequest(
+ {'repoze.who.identity':{'repoze.who.userid':'fred',
+ 'groups':['quux', 'biz']}})
+ def callback(identity):
+ return None
+ policy = self._makeOne(callback=callback)
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone])
+
def test_remember_no_plugins(self):
context = DummyContext()
- authtkt = DummyPlugin()
+ authtkt = DummyWhoPlugin()
request = DummyRequest({})
policy = self._makeOne()
result = policy.remember(context, request, 'fred')
@@ -70,7 +102,7 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase):
def test_remember(self):
context = DummyContext()
- authtkt = DummyPlugin()
+ authtkt = DummyWhoPlugin()
request = DummyRequest(
{'repoze.who.plugins':{'auth_tkt':authtkt}})
policy = self._makeOne()
@@ -80,7 +112,7 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase):
def test_forget_no_plugins(self):
context = DummyContext()
- authtkt = DummyPlugin()
+ authtkt = DummyWhoPlugin()
request = DummyRequest({})
policy = self._makeOne()
result = policy.forget(context, request)
@@ -88,7 +120,7 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase):
def test_forget(self):
context = DummyContext()
- authtkt = DummyPlugin()
+ authtkt = DummyWhoPlugin()
request = DummyRequest(
{'repoze.who.plugins':{'auth_tkt':authtkt},
'repoze.who.identity':{'repoze.who.userid':'fred'},
@@ -103,8 +135,8 @@ class TestRemoteUserAuthenticationPolicy(unittest.TestCase):
from repoze.bfg.authentication import RemoteUserAuthenticationPolicy
return RemoteUserAuthenticationPolicy
- def _makeOne(self):
- return self._getTargetClass()()
+ def _makeOne(self, environ_key='REMOTE_USER', callback=None):
+ return self._getTargetClass()(environ_key, callback)
def test_class_implements_IAuthenticationPolicy(self):
from zope.interface.verify import verifyClass
@@ -147,7 +179,7 @@ class TestRemoteUserAuthenticationPolicy(unittest.TestCase):
def test_remember(self):
context = DummyContext()
- authtkt = DummyPlugin()
+ authtkt = DummyWhoPlugin()
request = DummyRequest({'REMOTE_USER':'fred'})
policy = self._makeOne()
result = policy.remember(context, request, 'fred')
@@ -155,12 +187,347 @@ class TestRemoteUserAuthenticationPolicy(unittest.TestCase):
def test_forget(self):
context = DummyContext()
- authtkt = DummyPlugin()
+ authtkt = DummyWhoPlugin()
request = DummyRequest({'REMOTE_USER':'fred'})
policy = self._makeOne()
result = policy.forget(context, request)
self.assertEqual(result, [])
+class TestAutkTktAuthenticationPolicy(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.authentication import AuthTktAuthenticationPolicy
+ return AuthTktAuthenticationPolicy
+
+ def _makeOne(self, callback, cookieidentity):
+ inst = self._getTargetClass()('secret', callback)
+ inst.cookie = DummyCookieHelper(cookieidentity)
+ return inst
+
+ def test_class_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyClass(IAuthenticationPolicy, self._getTargetClass())
+
+ def test_instance_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyObject(IAuthenticationPolicy, self._makeOne(None, None))
+
+ def test_authenticated_userid_no_cookie_identity(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne(None, None)
+ self.assertEqual(policy.authenticated_userid(context, request), None)
+
+ def test_authenticated_userid_callback_returns_None(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ def callback(userid):
+ return None
+ policy = self._makeOne(callback, {'userid':'fred'})
+ self.assertEqual(policy.authenticated_userid(context, request), None)
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ def callback(userid):
+ return True
+ policy = self._makeOne(callback, {'userid':'fred'})
+ self.assertEqual(policy.authenticated_userid(context, request), 'fred')
+
+ def test_effective_principals_no_cookie_identity(self):
+ from repoze.bfg.security import Everyone
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne(None, None)
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone])
+
+ def test_effective_principals_callback_returns_None(self):
+ from repoze.bfg.security import Everyone
+ context = DummyContext()
+ request = DummyRequest({})
+ def callback(userid):
+ return None
+ policy = self._makeOne(callback, {'userid':'fred'})
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone])
+
+ def test_effective_principals(self):
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ context = DummyContext()
+ request = DummyRequest({})
+ def callback(userid):
+ return ['group.foo']
+ policy = self._makeOne(callback, {'userid':'fred'})
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone, Authenticated, 'fred', 'group.foo'])
+
+ def test_remember(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne(None, None)
+ result = policy.remember(context, request, 'fred')
+ self.assertEqual(result, [])
+
+ def test_forget(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne(None, None)
+ result = policy.forget(context, request)
+ self.assertEqual(result, [])
+
+class TestAuthTktCookieHelper(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.authentication import AuthTktCookieHelper
+ return AuthTktCookieHelper
+
+ def _makeOne(self, *arg, **kw):
+ plugin = self._getTargetClass()(*arg, **kw)
+ return plugin
+
+ def _makeRequest(self, kw=None):
+ environ = {'wsgi.version': (1,0)}
+ if kw is not None:
+ environ.update(kw)
+ environ['REMOTE_ADDR'] = '1.1.1.1'
+ environ['SERVER_NAME'] = 'localhost'
+ return DummyRequest(environ)
+
+ def _makeTicket(self, userid='userid', remote_addr='0.0.0.0',
+ tokens = [], userdata='userdata',
+ cookie_name='auth_tkt', secure=False):
+ from paste.auth import auth_tkt
+ ticket = auth_tkt.AuthTicket(
+ 'secret',
+ userid,
+ remote_addr,
+ tokens=tokens,
+ user_data=userdata,
+ cookie_name=cookie_name,
+ secure=secure)
+ return ticket.cookie_value()
+
+ def test_identify_nocookie(self):
+ plugin = self._makeOne('secret')
+ request = self._makeRequest()
+ result = plugin.identify(request)
+ self.assertEqual(result, None)
+
+ def test_identify_good_cookie_include_ip(self):
+ plugin = self._makeOne('secret', include_ip=True)
+ val = self._makeTicket(remote_addr='1.1.1.1')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ result = plugin.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], [''])
+ self.assertEqual(result['userid'], 'userid')
+ self.assertEqual(result['userdata'], 'userdata')
+ self.failUnless('timestamp' in result)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
+ self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
+ self.assertEqual(environ['AUTH_TYPE'],'cookie')
+
+ def test_identify_good_cookie_dont_include_ip(self):
+ plugin = self._makeOne('secret', include_ip=False)
+ val = self._makeTicket()
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ result = plugin.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], [''])
+ self.assertEqual(result['userid'], 'userid')
+ self.assertEqual(result['userdata'], 'userdata')
+ self.failUnless('timestamp' in result)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
+ self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
+ self.assertEqual(environ['AUTH_TYPE'],'cookie')
+
+ def test_identify_good_cookie_int_useridtype(self):
+ plugin = self._makeOne('secret', include_ip=False)
+ val = self._makeTicket(userid='1', userdata='userid_type:int')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ result = plugin.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], [''])
+ self.assertEqual(result['userid'], 1)
+ self.assertEqual(result['userdata'], 'userid_type:int')
+ self.failUnless('timestamp' in result)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
+ self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:int')
+ self.assertEqual(environ['AUTH_TYPE'],'cookie')
+
+ def test_identify_good_cookie_unknown_useridtype(self):
+ plugin = self._makeOne('secret', include_ip=False)
+ val = self._makeTicket(userid='userid', userdata='userid_type:unknown')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ result = plugin.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], [''])
+ self.assertEqual(result['userid'], 'userid')
+ self.assertEqual(result['userdata'], 'userid_type:unknown')
+ self.failUnless('timestamp' in result)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
+ self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:unknown')
+ self.assertEqual(environ['AUTH_TYPE'],'cookie')
+
+ def test_identify_bad_cookie(self):
+ plugin = self._makeOne('secret', include_ip=True)
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'})
+ result = plugin.identify(request)
+ self.assertEqual(result, None)
+
+ def test_remember_creds_same(self):
+ plugin = self._makeOne('secret')
+ val = self._makeTicket(userid='userid')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ result = plugin.remember(request, 'userid', userdata='userdata')
+ self.assertEqual(result, None)
+
+ def test_remember_creds_different(self):
+ plugin = self._makeOne('secret')
+ old_val = self._makeTicket(userid='userid')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
+ new_val = self._makeTicket(userid='other', userdata='userdata')
+ result = plugin.remember(request, 'other', userdata='userdata')
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/' % new_val))
+ self.assertEqual(result[1],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=localhost'
+ % new_val))
+ self.assertEqual(result[2],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=.localhost'
+ % new_val))
+
+ def test_remember_creds_different_include_ip(self):
+ plugin = self._makeOne('secret', include_ip=True)
+ old_val = self._makeTicket(userid='userid', remote_addr='1.1.1.1')
+ request = self._makeRequest({'HTTP_COOKIE': 'auth_tkt=%s' % old_val})
+ new_val = self._makeTicket(userid='other',
+ userdata='userdata',
+ remote_addr='1.1.1.1')
+ result = plugin.remember(request, 'other', userdata='userdata')
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/' % new_val))
+ self.assertEqual(result[1],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=localhost'
+ % new_val))
+ self.assertEqual(result[2],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=.localhost'
+ % new_val))
+
+ def test_remember_creds_different_bad_old_cookie(self):
+ plugin = self._makeOne('secret')
+ old_val = 'BOGUS'
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
+ new_val = self._makeTicket(userid='other', userdata='userdata')
+ result = plugin.remember(request, userid='other', userdata='userdata')
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/' % new_val))
+ self.assertEqual(result[1],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=localhost'
+ % new_val))
+ self.assertEqual(result[2],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=.localhost'
+ % new_val))
+
+ def test_remember_creds_different_with_nonstring_tokens(self):
+ plugin = self._makeOne('secret')
+ old_val = self._makeTicket(userid='userid')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
+ new_val = self._makeTicket(userid='other',
+ userdata='userdata',
+ tokens='foo,bar',
+ )
+ result = plugin.remember(request, 'other',
+ userdata='userdata',
+ tokens=['foo', 'bar'],
+ )
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/' % new_val))
+ self.assertEqual(result[1],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=localhost'
+ % new_val))
+ self.assertEqual(result[2],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/; Domain=.localhost'
+ % new_val))
+
+ def test_remember_creds_different_int_userid(self):
+ plugin = self._makeOne('secret')
+ old_val = self._makeTicket(userid='userid')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
+ new_val = self._makeTicket(userid='1', userdata='userid_type:int')
+ result = plugin.remember(request, 1)
+
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/' % new_val))
+
+ def test_remember_creds_different_long_userid(self):
+ plugin = self._makeOne('secret')
+ old_val = self._makeTicket(userid='userid')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
+ new_val = self._makeTicket(userid='1', userdata='userid_type:int')
+ result = plugin.remember(request, long(1))
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/' % new_val))
+
+ def test_remember_creds_different_unicode_userid(self):
+ plugin = self._makeOne('secret')
+ old_val = self._makeTicket(userid='userid')
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
+ userid = unicode('\xc2\xa9', 'utf-8')
+ new_val = self._makeTicket(userid=userid.encode('utf-8'),
+ userdata='userid_type:unicode')
+ result = plugin.remember(request, userid)
+ self.assertEqual(type(result[0][1]), str)
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0],
+ ('Set-Cookie',
+ 'auth_tkt="%s"; Path=/' % new_val))
+
+ def test_forget(self):
+ plugin = self._makeOne('secret')
+ request = self._makeRequest()
+ headers = plugin.forget(request)
+ self.assertEqual(len(headers), 3)
+ header = headers[0]
+ name, value = header
+ self.assertEqual(name, 'Set-Cookie')
+ self.assertEqual(value, 'auth_tkt=""""; Path=/')
+ header = headers[1]
+ name, value = header
+ self.assertEqual(name, 'Set-Cookie')
+ self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=localhost')
+ header = headers[2]
+ name, value = header
+ self.assertEqual(name, 'Set-Cookie')
+ self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=.localhost')
+
+
class DummyContext:
pass
@@ -168,9 +535,23 @@ class DummyRequest:
def __init__(self, environ):
self.environ = environ
-class DummyPlugin:
+class DummyWhoPlugin:
def remember(self, environ, identity):
return environ, identity
def forget(self, environ, identity):
return environ, identity
+
+class DummyCookieHelper:
+ def __init__(self, result):
+ self.result = result
+
+ def identify(self, *arg, **kw):
+ return self.result
+
+ def remember(self, *arg, **kw):
+ return []
+
+ def forget(self, *arg):
+ return []
+