diff options
| -rw-r--r-- | CHANGES.txt | 17 | ||||
| -rw-r--r-- | docs/api/authentication.rst | 4 | ||||
| -rw-r--r-- | repoze/bfg/authentication.py | 320 | ||||
| -rw-r--r-- | repoze/bfg/interfaces.py | 2 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_authentication.py | 405 |
5 files changed, 708 insertions, 40 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index 2562d23ab..3c8445fa1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,20 @@ +Next release +============ + +Features +-------- + +- Add an AuthTktAuthenticationPolicy. This policy retrieves + credentials from an auth_tkt cookie managed by the application + itself (instead of relying on an upstream data source for + authentication data). See the Security API chapter of the + documentation for more info. + +- Allow RemoteUserAuthenticationPolicy and + RepozeWho1AuthenticationPolicy to accept various constructor + arguments. See the Security API chapter of the documentation for + more info. + 0.9a5 (2009-05-28) ================== diff --git a/docs/api/authentication.rst b/docs/api/authentication.rst index f51eeb302..6514de510 100644 --- a/docs/api/authentication.rst +++ b/docs/api/authentication.rst @@ -8,7 +8,9 @@ Authentication Policies ~~~~~~~~~~~~~~~~~~~~~~~ -.. autoclass:: RepozeWho1AuthenticationPolicy +.. autoclass:: AuthTktAuthenticationPolicy .. autoclass:: RemoteUserAuthenticationPolicy +.. autoclass:: RepozeWho1AuthenticationPolicy + diff --git a/repoze/bfg/authentication.py b/repoze/bfg/authentication.py index 487a5e6a8..e7d4ef06e 100644 --- a/repoze/bfg/authentication.py +++ b/repoze/bfg/authentication.py @@ -1,13 +1,78 @@ +from codecs import utf_8_decode +from codecs import utf_8_encode +import crypt +import os +import stat +import StringIO +import time +import traceback + +from paste.request import get_cookies +from paste.auth import auth_tkt + from zope.interface import implements +from zope.component import queryUtility + from repoze.bfg.interfaces import IAuthenticationPolicy from repoze.bfg.security import Everyone from repoze.bfg.security import Authenticated -class RepozeWho1AuthenticationPolicy(object): +class CallbackAuthenticationPolicy(object): + """ Abstract class """ + def authenticated_userid(self, context, request): + userid = self._get_userid(request) + if userid is None: + return None + if self.callback is None: + return userid + if self.callback(userid) is not None: # is not None! + return userid + + def effective_principals(self, context, request): + effective_principals = [Everyone] + userid = self._get_userid(request) + if userid is None: + return effective_principals + if self.callback is None: + groups = [] + else: + groups = self.callback(userid) + if groups is None: # is None! + return effective_principals + effective_principals.append(Authenticated) + effective_principals.append(userid) + effective_principals.extend(groups) + + return effective_principals + + +class RepozeWho1AuthenticationPolicy(CallbackAuthenticationPolicy): """ A BFG authentication policy which obtains data from the - repoze.who 1.X WSGI API """ + repoze.who 1.X WSGI 'API' (the ``repoze.who.identity`` key in the + WSGI environment). + + Constructor Arguments + --------------------- + + ``identifier_name`` + + Default: ``auth_tkt``. The who plugin name that performs + remember/forget. Optional. + + ``callback`` + + Default: ``None``. A callback passed the repoze.who identity, + expected to return None if the user represented by the + identity doesn't exist or a sequence of group identifiers + (possibly empty) if the user does exist. If ``callback`` is + None, the userid will be assumed to exist with no groups. + + """ implements(IAuthenticationPolicy) - identifier_name = 'auth_tkt' + + def __init__(self, identifier_name='auth_tkt', callback=None): + self.identifier_name = identifier_name + self.callback = callback def _get_identity(self, request): return request.environ.get('repoze.who.identity') @@ -23,17 +88,24 @@ class RepozeWho1AuthenticationPolicy(object): identity = self._get_identity(request) if identity is None: return None - return identity['repoze.who.userid'] + if self.callback is None: + return identity['repoze.who.userid'] + if self.callback(identity) is not None: # is not None! + return identity['repoze.who.userid'] def effective_principals(self, context, request): effective_principals = [Everyone] identity = self._get_identity(request) if identity is None: return effective_principals - - effective_principals.append(Authenticated) + if self.callback is None: + groups = [] + else: + groups = self.callback(identity) + if groups is None: # is None! + return effective_principals userid = identity['repoze.who.userid'] - groups = identity.get('groups', []) + effective_principals.append(Authenticated) effective_principals.append(userid) effective_principals.extend(groups) @@ -53,34 +125,230 @@ class RepozeWho1AuthenticationPolicy(object): return [] identity = self._get_identity(request) return identifier.forget(request.environ, identity) - -class RemoteUserAuthenticationPolicy(object): + +class RemoteUserAuthenticationPolicy(CallbackAuthenticationPolicy): """ A BFG authentication policy which obtains data from the - REMOTE_USER WSGI envvar """ - implements(IAuthenticationPolicy) + REMOTE_USER WSGI envvar. - def _get_identity(self, request): - return request.environ.get('REMOTE_USER') + Constructor Arguments + --------------------- - def authenticated_userid(self, context, request): - identity = self._get_identity(request) - if identity is None: - return None - return identity + ``environ_key`` - def effective_principals(self, context, request): - effective_principals = [Everyone] - identity = self._get_identity(request) - if identity is None: - return effective_principals + Default: ``REMOTE_USER``. The key in the WSGI environ which + provides the userid. - effective_principals.append(Authenticated) - effective_principals.append(identity) + ``callback`` - return effective_principals + Default: ``None``. A callback passed the userid, expected to return + None if the userid doesn't exist or a sequence of group identifiers + (possibly empty) if the user does exist. If ``callback`` is None, + the userid will be assumed to exist with no groups. + """ + implements(IAuthenticationPolicy) + + def __init__(self, environ_key='REMOTE_USER', callback=None): + self.environ_key = environ_key + self.callback = callback + + def _get_userid(self, request): + return request.environ.get(self.environ_key) def remember(self, context, request, principal, **kw): return [] def forget(self, context, request): return [] + +class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy): + """ A BFG authentication policy which obtains data from an + auth_tkt cookie. + + Constructor Arguments + --------------------- + + ``secret`` + + The secret (a string) used for auth_tkt cookie encryption. + Required. + + ``callback`` + + Default: ``None``. A callback passed the userid, expected to return + None if the userid doesn't exist or a sequence of group identifiers + (possibly empty) if the user does exist. If ``callback`` is None, + the userid will be assumed to exist with no groups. + + ``cookie_name`` + + Default: ``repoze.bfg.auth_tkt``. The cookie name used + (string). Optional. + + ``secure`` + + Default: ``False``. Only send the cookie back over a secure + conn. Optional. + + ``include_ip`` + + Default: ``False``. Make the requesting IP address part of + the authentication data in the cookie. Optional. + + """ + implements(IAuthenticationPolicy) + def __init__(self, + secret, + callback=None, + cookie_name='repoze.bfg.auth_tkt', + secure=False, + include_ip=False): + self.cookie = AuthTktCookieHelper( + secret, + cookie_name=cookie_name, + secure=secure, + include_ip=include_ip, + ) + self.callback = callback + + def _get_userid(self, request): + result = self.cookie.identify(request) + if result: + return result['userid'] + + def remember(self, context, request, principal, **kw): + return self.cookie.remember(request, principal) + + def forget(self, context, request): + return self.cookie.forget(request) + +class AuthTktCookieHelper(object): + userid_type_decoders = { + 'int':int, + 'unicode':lambda x: utf_8_decode(x)[0], + } + + userid_type_encoders = { + int: ('int', str), + long: ('int', str), + unicode: ('unicode', lambda x: utf_8_encode(x)[0]), + } + + def __init__(self, secret, cookie_name='auth_tkt', secure=False, + include_ip=False): + self.secret = secret + self.cookie_name = cookie_name + self.include_ip = include_ip + self.secure = secure + + # IIdentifier + def identify(self, request): + environ = request.environ + cookies = get_cookies(environ) + cookie = cookies.get(self.cookie_name) + + if cookie is None or not cookie.value: + return None + + if self.include_ip: + remote_addr = environ['REMOTE_ADDR'] + else: + remote_addr = '0.0.0.0' + + try: + timestamp, userid, tokens, user_data = auth_tkt.parse_ticket( + self.secret, cookie.value, remote_addr) + except auth_tkt.BadTicket: + return None + + userid_typename = 'userid_type:' + user_data_info = user_data.split('|') + for datum in filter(None, user_data_info): + if datum.startswith(userid_typename): + userid_type = datum[len(userid_typename):] + decoder = self.userid_type_decoders.get(userid_type) + if decoder: + userid = decoder(userid) + + environ['REMOTE_USER_TOKENS'] = tokens + environ['REMOTE_USER_DATA'] = user_data + environ['AUTH_TYPE'] = 'cookie' + + identity = {} + identity['timestamp'] = timestamp + identity['userid'] = userid + identity['tokens'] = tokens + identity['userdata'] = user_data + return identity + + def _get_cookies(self, environ, value): + cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME')) + wild_domain = '.' + cur_domain + cookies = [ + ('Set-Cookie', '%s="%s"; Path=/' % ( + self.cookie_name, value)), + ('Set-Cookie', '%s="%s"; Path=/; Domain=%s' % ( + self.cookie_name, value, cur_domain)), + ('Set-Cookie', '%s="%s"; Path=/; Domain=%s' % ( + self.cookie_name, value, wild_domain)) + ] + return cookies + + # IIdentifier + def forget(self, request): + # return a set of expires Set-Cookie headers + environ = request.environ + return self._get_cookies(environ, '""') + + # IIdentifier + def remember(self, request, userid, tokens='', userdata=''): + environ = request.environ + if self.include_ip: + remote_addr = environ['REMOTE_ADDR'] + else: + remote_addr = '0.0.0.0' + + cookies = get_cookies(environ) + old_cookie = cookies.get(self.cookie_name) + existing = cookies.get(self.cookie_name) + old_cookie_value = getattr(existing, 'value', None) + + timestamp, old_userid, old_tokens, old_userdata = None, '', '', '' + + if old_cookie_value: + try: + (timestamp,old_userid,old_tokens, + old_userdata) = auth_tkt.parse_ticket( + self.secret, old_cookie_value, remote_addr) + except auth_tkt.BadTicket: + pass + + encoding_data = self.userid_type_encoders.get(type(userid)) + if encoding_data: + encoding, encoder = encoding_data + userid = encoder(userid) + userdata = 'userid_type:%s' % encoding + + if not isinstance(tokens, basestring): + tokens = ','.join(tokens) + if not isinstance(old_tokens, basestring): + old_tokens = ','.join(old_tokens) + old_data = (old_userid, old_tokens, old_userdata) + new_data = (userid, tokens, userdata) + + if old_data != new_data: + ticket = auth_tkt.AuthTicket( + self.secret, + userid, + remote_addr, + tokens=tokens, + user_data=userdata, + cookie_name=self.cookie_name, + secure=self.secure) + new_cookie_value = ticket.cookie_value() + + cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME')) + wild_domain = '.' + cur_domain + if old_cookie_value != new_cookie_value: + # return a set of Set-Cookie headers + return self._get_cookies(environ, new_cookie_value) + diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py index c4bafbe20..282e3756a 100644 --- a/repoze/bfg/interfaces.py +++ b/repoze/bfg/interfaces.py @@ -212,7 +212,7 @@ class IRoutesContextFactory(Interface): """ class IAuthenticationPolicy(Interface): - """ A multi-adapter on context and request """ + """ An object representing a BFG authentication policy. """ def authenticated_userid(context, request): """ Return the authenticated userid or ``None`` if no authenticated userid can be found. """ diff --git a/repoze/bfg/tests/test_authentication.py b/repoze/bfg/tests/test_authentication.py index a23ffeac2..b0b1e084a 100644 --- a/repoze/bfg/tests/test_authentication.py +++ b/repoze/bfg/tests/test_authentication.py @@ -5,8 +5,8 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): from repoze.bfg.authentication import RepozeWho1AuthenticationPolicy return RepozeWho1AuthenticationPolicy - def _makeOne(self): - return self._getTargetClass()() + def _makeOne(self, identifier_name='auth_tkt', callback=None): + return self._getTargetClass()(identifier_name, callback) def test_class_implements_IAuthenticationPolicy(self): from zope.interface.verify import verifyClass @@ -31,6 +31,24 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): policy = self._makeOne() self.assertEqual(policy.authenticated_userid(context, request), 'fred') + def test_authenticated_userid_with_callback_returns_None(self): + context = DummyContext() + request = DummyRequest( + {'repoze.who.identity':{'repoze.who.userid':'fred'}}) + def callback(identity): + return None + policy = self._makeOne(callback=callback) + self.assertEqual(policy.authenticated_userid(context, request), None) + + def test_authenticated_userid_with_callback_returns_something(self): + context = DummyContext() + request = DummyRequest( + {'repoze.who.identity':{'repoze.who.userid':'fred'}}) + def callback(identity): + return ['agroup'] + policy = self._makeOne(callback=callback) + self.assertEqual(policy.authenticated_userid(context, request), 'fred') + def test_effective_principals_None(self): from repoze.bfg.security import Everyone context = DummyContext() @@ -56,13 +74,27 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): request = DummyRequest( {'repoze.who.identity':{'repoze.who.userid':'fred', 'groups':['quux', 'biz']}}) - policy = self._makeOne() + def callback(identity): + return identity['groups'] + policy = self._makeOne(callback=callback) self.assertEqual(policy.effective_principals(context, request), [Everyone, Authenticated, 'fred', 'quux', 'biz']) + def test_effective_principals_userid_callback_returns_None(self): + from repoze.bfg.security import Everyone + context = DummyContext() + request = DummyRequest( + {'repoze.who.identity':{'repoze.who.userid':'fred', + 'groups':['quux', 'biz']}}) + def callback(identity): + return None + policy = self._makeOne(callback=callback) + self.assertEqual(policy.effective_principals(context, request), + [Everyone]) + def test_remember_no_plugins(self): context = DummyContext() - authtkt = DummyPlugin() + authtkt = DummyWhoPlugin() request = DummyRequest({}) policy = self._makeOne() result = policy.remember(context, request, 'fred') @@ -70,7 +102,7 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): def test_remember(self): context = DummyContext() - authtkt = DummyPlugin() + authtkt = DummyWhoPlugin() request = DummyRequest( {'repoze.who.plugins':{'auth_tkt':authtkt}}) policy = self._makeOne() @@ -80,7 +112,7 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): def test_forget_no_plugins(self): context = DummyContext() - authtkt = DummyPlugin() + authtkt = DummyWhoPlugin() request = DummyRequest({}) policy = self._makeOne() result = policy.forget(context, request) @@ -88,7 +120,7 @@ class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): def test_forget(self): context = DummyContext() - authtkt = DummyPlugin() + authtkt = DummyWhoPlugin() request = DummyRequest( {'repoze.who.plugins':{'auth_tkt':authtkt}, 'repoze.who.identity':{'repoze.who.userid':'fred'}, @@ -103,8 +135,8 @@ class TestRemoteUserAuthenticationPolicy(unittest.TestCase): from repoze.bfg.authentication import RemoteUserAuthenticationPolicy return RemoteUserAuthenticationPolicy - def _makeOne(self): - return self._getTargetClass()() + def _makeOne(self, environ_key='REMOTE_USER', callback=None): + return self._getTargetClass()(environ_key, callback) def test_class_implements_IAuthenticationPolicy(self): from zope.interface.verify import verifyClass @@ -147,7 +179,7 @@ class TestRemoteUserAuthenticationPolicy(unittest.TestCase): def test_remember(self): context = DummyContext() - authtkt = DummyPlugin() + authtkt = DummyWhoPlugin() request = DummyRequest({'REMOTE_USER':'fred'}) policy = self._makeOne() result = policy.remember(context, request, 'fred') @@ -155,12 +187,347 @@ class TestRemoteUserAuthenticationPolicy(unittest.TestCase): def test_forget(self): context = DummyContext() - authtkt = DummyPlugin() + authtkt = DummyWhoPlugin() request = DummyRequest({'REMOTE_USER':'fred'}) policy = self._makeOne() result = policy.forget(context, request) self.assertEqual(result, []) +class TestAutkTktAuthenticationPolicy(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.authentication import AuthTktAuthenticationPolicy + return AuthTktAuthenticationPolicy + + def _makeOne(self, callback, cookieidentity): + inst = self._getTargetClass()('secret', callback) + inst.cookie = DummyCookieHelper(cookieidentity) + return inst + + def test_class_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyClass(IAuthenticationPolicy, self._getTargetClass()) + + def test_instance_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyObject(IAuthenticationPolicy, self._makeOne(None, None)) + + def test_authenticated_userid_no_cookie_identity(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne(None, None) + self.assertEqual(policy.authenticated_userid(context, request), None) + + def test_authenticated_userid_callback_returns_None(self): + context = DummyContext() + request = DummyRequest({}) + def callback(userid): + return None + policy = self._makeOne(callback, {'userid':'fred'}) + self.assertEqual(policy.authenticated_userid(context, request), None) + + def test_authenticated_userid(self): + context = DummyContext() + request = DummyRequest({}) + def callback(userid): + return True + policy = self._makeOne(callback, {'userid':'fred'}) + self.assertEqual(policy.authenticated_userid(context, request), 'fred') + + def test_effective_principals_no_cookie_identity(self): + from repoze.bfg.security import Everyone + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne(None, None) + self.assertEqual(policy.effective_principals(context, request), + [Everyone]) + + def test_effective_principals_callback_returns_None(self): + from repoze.bfg.security import Everyone + context = DummyContext() + request = DummyRequest({}) + def callback(userid): + return None + policy = self._makeOne(callback, {'userid':'fred'}) + self.assertEqual(policy.effective_principals(context, request), + [Everyone]) + + def test_effective_principals(self): + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + context = DummyContext() + request = DummyRequest({}) + def callback(userid): + return ['group.foo'] + policy = self._makeOne(callback, {'userid':'fred'}) + self.assertEqual(policy.effective_principals(context, request), + [Everyone, Authenticated, 'fred', 'group.foo']) + + def test_remember(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne(None, None) + result = policy.remember(context, request, 'fred') + self.assertEqual(result, []) + + def test_forget(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne(None, None) + result = policy.forget(context, request) + self.assertEqual(result, []) + +class TestAuthTktCookieHelper(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.authentication import AuthTktCookieHelper + return AuthTktCookieHelper + + def _makeOne(self, *arg, **kw): + plugin = self._getTargetClass()(*arg, **kw) + return plugin + + def _makeRequest(self, kw=None): + environ = {'wsgi.version': (1,0)} + if kw is not None: + environ.update(kw) + environ['REMOTE_ADDR'] = '1.1.1.1' + environ['SERVER_NAME'] = 'localhost' + return DummyRequest(environ) + + def _makeTicket(self, userid='userid', remote_addr='0.0.0.0', + tokens = [], userdata='userdata', + cookie_name='auth_tkt', secure=False): + from paste.auth import auth_tkt + ticket = auth_tkt.AuthTicket( + 'secret', + userid, + remote_addr, + tokens=tokens, + user_data=userdata, + cookie_name=cookie_name, + secure=secure) + return ticket.cookie_value() + + def test_identify_nocookie(self): + plugin = self._makeOne('secret') + request = self._makeRequest() + result = plugin.identify(request) + self.assertEqual(result, None) + + def test_identify_good_cookie_include_ip(self): + plugin = self._makeOne('secret', include_ip=True) + val = self._makeTicket(remote_addr='1.1.1.1') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val}) + result = plugin.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ['']) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], 'userdata') + self.failUnless('timestamp' in result) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ['']) + self.assertEqual(environ['REMOTE_USER_DATA'],'userdata') + self.assertEqual(environ['AUTH_TYPE'],'cookie') + + def test_identify_good_cookie_dont_include_ip(self): + plugin = self._makeOne('secret', include_ip=False) + val = self._makeTicket() + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val}) + result = plugin.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ['']) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], 'userdata') + self.failUnless('timestamp' in result) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ['']) + self.assertEqual(environ['REMOTE_USER_DATA'],'userdata') + self.assertEqual(environ['AUTH_TYPE'],'cookie') + + def test_identify_good_cookie_int_useridtype(self): + plugin = self._makeOne('secret', include_ip=False) + val = self._makeTicket(userid='1', userdata='userid_type:int') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val}) + result = plugin.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ['']) + self.assertEqual(result['userid'], 1) + self.assertEqual(result['userdata'], 'userid_type:int') + self.failUnless('timestamp' in result) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ['']) + self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:int') + self.assertEqual(environ['AUTH_TYPE'],'cookie') + + def test_identify_good_cookie_unknown_useridtype(self): + plugin = self._makeOne('secret', include_ip=False) + val = self._makeTicket(userid='userid', userdata='userid_type:unknown') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val}) + result = plugin.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ['']) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], 'userid_type:unknown') + self.failUnless('timestamp' in result) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ['']) + self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:unknown') + self.assertEqual(environ['AUTH_TYPE'],'cookie') + + def test_identify_bad_cookie(self): + plugin = self._makeOne('secret', include_ip=True) + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'}) + result = plugin.identify(request) + self.assertEqual(result, None) + + def test_remember_creds_same(self): + plugin = self._makeOne('secret') + val = self._makeTicket(userid='userid') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val}) + result = plugin.remember(request, 'userid', userdata='userdata') + self.assertEqual(result, None) + + def test_remember_creds_different(self): + plugin = self._makeOne('secret') + old_val = self._makeTicket(userid='userid') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) + new_val = self._makeTicket(userid='other', userdata='userdata') + result = plugin.remember(request, 'other', userdata='userdata') + self.assertEqual(len(result), 3) + self.assertEqual(result[0], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/' % new_val)) + self.assertEqual(result[1], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=localhost' + % new_val)) + self.assertEqual(result[2], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=.localhost' + % new_val)) + + def test_remember_creds_different_include_ip(self): + plugin = self._makeOne('secret', include_ip=True) + old_val = self._makeTicket(userid='userid', remote_addr='1.1.1.1') + request = self._makeRequest({'HTTP_COOKIE': 'auth_tkt=%s' % old_val}) + new_val = self._makeTicket(userid='other', + userdata='userdata', + remote_addr='1.1.1.1') + result = plugin.remember(request, 'other', userdata='userdata') + self.assertEqual(len(result), 3) + self.assertEqual(result[0], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/' % new_val)) + self.assertEqual(result[1], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=localhost' + % new_val)) + self.assertEqual(result[2], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=.localhost' + % new_val)) + + def test_remember_creds_different_bad_old_cookie(self): + plugin = self._makeOne('secret') + old_val = 'BOGUS' + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) + new_val = self._makeTicket(userid='other', userdata='userdata') + result = plugin.remember(request, userid='other', userdata='userdata') + self.assertEqual(len(result), 3) + self.assertEqual(result[0], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/' % new_val)) + self.assertEqual(result[1], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=localhost' + % new_val)) + self.assertEqual(result[2], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=.localhost' + % new_val)) + + def test_remember_creds_different_with_nonstring_tokens(self): + plugin = self._makeOne('secret') + old_val = self._makeTicket(userid='userid') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) + new_val = self._makeTicket(userid='other', + userdata='userdata', + tokens='foo,bar', + ) + result = plugin.remember(request, 'other', + userdata='userdata', + tokens=['foo', 'bar'], + ) + self.assertEqual(len(result), 3) + self.assertEqual(result[0], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/' % new_val)) + self.assertEqual(result[1], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=localhost' + % new_val)) + self.assertEqual(result[2], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/; Domain=.localhost' + % new_val)) + + def test_remember_creds_different_int_userid(self): + plugin = self._makeOne('secret') + old_val = self._makeTicket(userid='userid') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) + new_val = self._makeTicket(userid='1', userdata='userid_type:int') + result = plugin.remember(request, 1) + + self.assertEqual(len(result), 3) + self.assertEqual(result[0], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/' % new_val)) + + def test_remember_creds_different_long_userid(self): + plugin = self._makeOne('secret') + old_val = self._makeTicket(userid='userid') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) + new_val = self._makeTicket(userid='1', userdata='userid_type:int') + result = plugin.remember(request, long(1)) + self.assertEqual(len(result), 3) + self.assertEqual(result[0], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/' % new_val)) + + def test_remember_creds_different_unicode_userid(self): + plugin = self._makeOne('secret') + old_val = self._makeTicket(userid='userid') + request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) + userid = unicode('\xc2\xa9', 'utf-8') + new_val = self._makeTicket(userid=userid.encode('utf-8'), + userdata='userid_type:unicode') + result = plugin.remember(request, userid) + self.assertEqual(type(result[0][1]), str) + self.assertEqual(len(result), 3) + self.assertEqual(result[0], + ('Set-Cookie', + 'auth_tkt="%s"; Path=/' % new_val)) + + def test_forget(self): + plugin = self._makeOne('secret') + request = self._makeRequest() + headers = plugin.forget(request) + self.assertEqual(len(headers), 3) + header = headers[0] + name, value = header + self.assertEqual(name, 'Set-Cookie') + self.assertEqual(value, 'auth_tkt=""""; Path=/') + header = headers[1] + name, value = header + self.assertEqual(name, 'Set-Cookie') + self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=localhost') + header = headers[2] + name, value = header + self.assertEqual(name, 'Set-Cookie') + self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=.localhost') + + class DummyContext: pass @@ -168,9 +535,23 @@ class DummyRequest: def __init__(self, environ): self.environ = environ -class DummyPlugin: +class DummyWhoPlugin: def remember(self, environ, identity): return environ, identity def forget(self, environ, identity): return environ, identity + +class DummyCookieHelper: + def __init__(self, result): + self.result = result + + def identify(self, *arg, **kw): + return self.result + + def remember(self, *arg, **kw): + return [] + + def forget(self, *arg): + return [] + |
