summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.txt57
-rw-r--r--docs/narr/security.rst25
-rw-r--r--repoze/bfg/authentication.py207
-rw-r--r--repoze/bfg/request.py5
-rw-r--r--repoze/bfg/router.py12
-rw-r--r--repoze/bfg/tests/test_authentication.py388
-rw-r--r--repoze/bfg/tests/test_request.py20
-rw-r--r--repoze/bfg/tests/test_router.py21
-rw-r--r--repoze/bfg/tests/test_zcml.py3
-rw-r--r--repoze/bfg/zcml.py7
10 files changed, 437 insertions, 308 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index cfe7da159..0f6818e3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,8 +4,7 @@ Next release
Features
--------
-- Add ``path_info``, ``accept``, and ``header`` view configuration
- predicate.
+- Add ``path_info`` view configuration predicate.
- ``paster bfgshell`` now supports IPython if it's available for
import. Thanks to Daniel Holth for the initial patch.
@@ -18,12 +17,19 @@ Features
- A new exception exists: ``repoze.bfg.exceptions.Respond``. This
exception can be raised during view execution return a response.
This is effectively a goto, useable by code that has no capability
- to otherwise return a response.
+ to otherwise return a response. It is documented in the
+ ``repoze.bfg.exceptions`` API documentation.
- The name ``root`` is available as an attribute of the request
slightly earlier now (before a NewRequest event is emitted).
``root`` is the result of the application "root factory".
+- Added ``max_age`` parameter to ``authtktauthenticationpolicy`` ZCML
+ directive. If this value is set, it must be an integer representing
+ the number of seconds which the auth tkt cookie will survive.
+ Mainly, its existence allows the auth_tkt cookie to survive across
+ browser sessions.
+
Bug Fixes
---------
@@ -36,6 +42,12 @@ Bug Fixes
used in ZCML) introduced in 1.1a7. Symptom: ``AttributeError:
object has no attribute __provides__`` raised at startup time.
+- The ``reissue_time`` argument to the ``authtktauthenticationpolicy``
+ ZCML directive now actually works. When it is set to an integer
+ value, an authticket set-cookie header is appended to the response
+ whenever a request requires authentication and 'now' minus the
+ authticket's timestamp is greater than ``reissue_time`` seconds.
+
Documentation
-------------
@@ -48,20 +60,39 @@ Documentation
- Fix route_url documentation (``_query`` argument documented as
``query`` and ``_anchor`` argument documented as ``anchor``).
+Backwards Incompatibilities
+---------------------------
+
+- The ``authtkt`` authentication policy ``remember`` method now no
+ longer honors ``token`` or ``userdata`` keyword arguments.
+
Internal
--------
- Change how ``bfg_view`` decorator works when used as a class method
- decorator. In 1.1a7, it actually tried to grope every class in
- scanned package at startup time looking for methods, which led to
- some strange symptoms (e.g. ``AttributeError: object has no
- attribute __provides__``). Now, instead of groping methods at
- startup time, we just cause the ``bfg_view`` decorator itself to
- populate its class' __dict__ when its used inside a class as a
- method decorator. This is essentially a reversion back to 1.1a6
- "grokking" behavior plus some special magic for using the
- ``bfg_view`` decorator as method decorator inside the ``bfg_view``
- class itself.
+ decorator. In 1.1a7, the``scan``directive actually tried to grope
+ every class in scanned package at startup time, calling ``dir``
+ against each found class, and subsequently invoking ``getattr``
+ against each thing found by ``dir`` to see if it was a method. This
+ led to some strange symptoms (e.g. ``AttributeError: object has no
+ attribute __provides__``), and was generally just a bad idea. Now,
+ instead of groping classes for methods at startup time, we just
+ cause the ``bfg_view`` decorator itself to populate the method's
+ class' ``__dict__`` when it is used as a method decorator. This
+ also requires a nasty _getframe thing but it's slightly less nasty
+ than the startup time groping behavior. This is essentially a
+ reversion back to 1.1a6 "grokking" behavior plus some special magic
+ for using the ``bfg_view`` decorator as method decorator inside the
+ ``bfg_view`` class itself.
+
+- The router now checks for a ``global_response_headers`` attribute of
+ the request object before returning a response. If this value
+ exists, it is presumed to be a sequence of two-tuples, representing
+ a set of headers to append to the 'normal' response headers. This
+ feature is internal, rather than exposed internally, because it's
+ unclear whether it will stay around in the long term. It was added
+ to support the ``reissue_time`` feature of the authtkt
+ authentication policy.
1.1a7 (2009-10-18)
==================
diff --git a/docs/narr/security.rst b/docs/narr/security.rst
index 36c247037..90ead339c 100644
--- a/docs/narr/security.rst
+++ b/docs/narr/security.rst
@@ -422,6 +422,7 @@ An example of its usage, with all attributes fully expanded:
include_ip="false"
timeout="86400"
reissue_time="600"
+ max_age="31536000"
/>
The ``secret`` is a string that will be used to encrypt the data
@@ -449,16 +450,30 @@ requesting user agent, the cookie is considered invalid. It defaults
to "false".
``timeout`` is an integer value. It represents the maximum age in
-seconds allowed for a cookie to live. If ``timeout`` is specified,
-you must also set ``reissue_time`` to a lower value. It defaults to
-``None``, meaning that the cookie will only live for the duration of
-the user's browser session.
+seconds which the auth_tkt ticket will be considered valid. If
+``timeout`` is specified, and ``reissue_time`` is also specified,
+``reissue_time`` must be a smaller value than ``timeout``. It
+defaults to ``None``, meaning that the ticket will be considered valid
+forever.
``reissue_time`` is an integer value. If ``reissue_time`` is
specified, when we encounter a cookie that is older than the reissue
time (in seconds), but younger that the ``timeout``, a new cookie will
be issued. It defaults to ``None``, meaning that authentication
-cookies are never reissued.
+cookies are never reissued. A value of ``0`` means reissue a cookie
+in the response to every request that requires authentication.
+
+``max_age`` is the maximum age of the auth_tkt *cookie*, in seconds.
+This differs from ``timeout`` inasmuch as ``timeout`` represents the
+lifetime of the ticket contained in the cookie, while this value
+represents the lifetime of the cookie itself. When this value is set,
+the cookie's ``Max-Age`` and ``Expires`` settings will be set,
+allowing the auth_tkt cookie to last between browser sessions. It is
+typically nonsenical to set this to a value that is lower than
+``timeout`` or ``reissue_time``, although it is not explicitly
+prevented. It defaults to ``None``, meaning (on all major browser
+platforms) that auth_tkt cookies will last for the lifetime of the
+user's browser session.
``remoteuserauthenticationpolicy``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/repoze/bfg/authentication.py b/repoze/bfg/authentication.py
index 5fa653a74..e88952f86 100644
--- a/repoze/bfg/authentication.py
+++ b/repoze/bfg/authentication.py
@@ -10,6 +10,7 @@ from zope.interface import implements
from repoze.bfg.interfaces import IAuthenticationPolicy
+from repoze.bfg.request import add_global_response_headers
from repoze.bfg.security import Authenticated
from repoze.bfg.security import Everyone
@@ -173,7 +174,7 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
expected to return None if the userid doesn't exist or a sequence
of group identifiers (possibly empty) if the user does exist. If
``callback`` is None, the userid will be assumed to exist with no
- groups.
+ groups. Optional.
``cookie_name``
@@ -192,16 +193,39 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
``timeout``
- Default: ``None``. Maximum age in seconds allowed for a cookie
- to live. If ``timeout`` is specified, you must also set
- ``reissue_time`` to a lower value.
+ Default: ``None``. Maximum number of seconds after which a
+ newly issued ticket will be considered valid. After this
+ amount of time, the ticket will expire (effectively logging the
+ user out). If this value is ``None``, the token never expires.
+ Optional.
``reissue_time``
- Default: ``None``. If ``reissue_time`` is specified, when we
- encounter a cookie that is older than the reissue time (in
- seconds), but younger that the ``timeout``, a new cookie will
- be issued.
+ Default: ``None``. If this parameter is set, it represents the
+ number of seconds that must pass before an authentication token
+ cookie is reissued. The duration is measured as the number of
+ seconds since the last auth_tkt cookie was issued and 'now'.
+ If the ``timeout`` value is ``None``, this parameter has no
+ effect. If this parameter is provided, and the value of
+ ``timeout`` is not ``None``, the value of ``reissue_time`` must
+ be smaller than value of ``timeout``. A good rule of thumb: if
+ you want auto-reissued cookies: set this to the ``timeout``
+ value divided by ten. If this value is ``0``, a new ticket
+ cookie will be reissued on every request which needs
+ authentication. Optional.
+
+ ``max_age``
+
+ Default: ``None``. The max age of the auth_tkt cookie, in
+ seconds. This differs from ``timeout`` inasmuch as ``timeout``
+ represents the lifetime of the ticket contained in the cookie,
+ while this value represents the lifetime of the cookie itself.
+ When this value is set, the cookie's ``Max-Age`` and ``Expires``
+ settings will be set, allowing the auth_tkt cookie to last
+ between browser sessions. It is typically nonsenical to set
+ this to a value that is lower than ``timeout`` or
+ ``reissue_time``, although it is not explicitly prevented.
+ Optional.
"""
implements(IAuthenticationPolicy)
def __init__(self,
@@ -211,7 +235,8 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
secure=False,
include_ip=False,
timeout=None,
- reissue_time=None):
+ reissue_time=None,
+ max_age=None):
self.cookie = AuthTktCookieHelper(
secret,
cookie_name=cookie_name,
@@ -219,6 +244,7 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
include_ip=include_ip,
timeout=timeout,
reissue_time=reissue_time,
+ max_age=max_age,
)
self.callback = callback
@@ -228,38 +254,77 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
return result['userid']
def remember(self, request, principal, **kw):
- """ Accepts the following kw args: ``tokens``, ``userdata``,
- ``max_age``."""
+ """ Accepts the following kw args: ``max_age``."""
return self.cookie.remember(request, principal, **kw)
def forget(self, request):
return self.cookie.forget(request)
-
+
+def b64encode(v):
+ return v.encode('base64').strip().replace('\n', '')
+
+def b64decode(v):
+ return v.decode('base64')
+
+EXPIRE = object()
+
class AuthTktCookieHelper(object):
+ auth_tkt = auth_tkt # for tests
+
userid_type_decoders = {
'int':int,
- 'unicode':lambda x: utf_8_decode(x)[0],
+ 'unicode':lambda x: utf_8_decode(x)[0], # bw compat for old cookies
+ 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
+ 'b64str': lambda x: b64decode(x),
}
userid_type_encoders = {
int: ('int', str),
long: ('int', str),
- unicode: ('unicode', lambda x: utf_8_encode(x)[0]),
+ unicode: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
+ str: ('b64str', lambda x: b64encode(x)),
}
def __init__(self, secret, cookie_name='auth_tkt', secure=False,
- include_ip=False, timeout=None, reissue_time=None):
+ include_ip=False, timeout=None, reissue_time=None,
+ max_age=None):
self.secret = secret
self.cookie_name = cookie_name
self.include_ip = include_ip
self.secure = secure
- if timeout and ( (not reissue_time) or (reissue_time > timeout) ):
- raise ValueError('When timeout is specified, reissue_time must '
- 'be set to a lower value')
self.timeout = timeout
+ if reissue_time is not None and timeout is not None:
+ if reissue_time > timeout:
+ raise ValueError('reissue_time must be lower than timeout')
self.reissue_time = reissue_time
+ self.max_age = max_age
+
+ def _get_cookies(self, environ, value, max_age=None):
+ if max_age is EXPIRE:
+ max_age = "; Max-Age=0; Expires=Wed, 31-Dec-97 23:59:59 GMT"
+ elif max_age is not None:
+ later = datetime.datetime.utcnow() + datetime.timedelta(
+ seconds=int(max_age))
+ # Wdy, DD-Mon-YY HH:MM:SS GMT
+ expires = later.strftime('%a, %d %b %Y %H:%M:%S GMT')
+ # the Expires header is *required* at least for IE7 (IE7 does
+ # not respect Max-Age)
+ max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
+ else:
+ max_age = ''
+
+ cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
+ wild_domain = '.' + cur_domain
+ cookies = [
+ ('Set-Cookie', '%s="%s"; Path=/%s' % (
+ self.cookie_name, value, max_age)),
+ ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s' % (
+ self.cookie_name, value, cur_domain, max_age)),
+ ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s' % (
+ self.cookie_name, value, wild_domain, max_age))
+ ]
+ return cookies
- # IIdentifier
def identify(self, request):
environ = request.environ
cookies = get_cookies(environ)
@@ -274,12 +339,14 @@ class AuthTktCookieHelper(object):
remote_addr = '0.0.0.0'
try:
- timestamp, userid, tokens, user_data = auth_tkt.parse_ticket(
+ timestamp, userid, tokens, user_data = self.auth_tkt.parse_ticket(
self.secret, cookie.value, remote_addr)
- except auth_tkt.BadTicket:
+ except self.auth_tkt.BadTicket:
return None
- if self.timeout and ( (timestamp + self.timeout) < time.time() ):
+ now = time.time()
+
+ if self.timeout and ( (timestamp + self.timeout) < now ):
return None
userid_typename = 'userid_type:'
@@ -290,7 +357,15 @@ class AuthTktCookieHelper(object):
decoder = self.userid_type_decoders.get(userid_type)
if decoder:
userid = decoder(userid)
+
+ reissue = self.reissue_time is not None
+ if not hasattr(request, '_authtkt_reissued'):
+ if reissue and ( (now - timestamp) > self.reissue_time):
+ headers = self.remember(request, userid, max_age=self.max_age)
+ add_global_response_headers(request, headers)
+ request._authtkt_reissued = True
+
environ['REMOTE_USER_TOKENS'] = tokens
environ['REMOTE_USER_DATA'] = user_data
environ['AUTH_TYPE'] = 'cookie'
@@ -302,90 +377,38 @@ class AuthTktCookieHelper(object):
identity['userdata'] = user_data
return identity
- def _get_cookies(self, environ, value, max_age=None):
- if max_age is not None:
- later = datetime.datetime.now() + datetime.timedelta(
- seconds=int(max_age))
- # Wdy, DD-Mon-YY HH:MM:SS GMT
- expires = later.strftime('%a, %d %b %Y %H:%M:%S')
- # the Expires header is *required* at least for IE7 (IE7 does
- # not respect Max-Age)
- max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
- else:
- max_age = ''
-
- cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
- wild_domain = '.' + cur_domain
- cookies = [
- ('Set-Cookie', '%s="%s"; Path=/%s' % (
- self.cookie_name, value, max_age)),
- ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s' % (
- self.cookie_name, value, cur_domain, max_age)),
- ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s' % (
- self.cookie_name, value, wild_domain, max_age))
- ]
- return cookies
-
- # IIdentifier
def forget(self, request):
# return a set of expires Set-Cookie headers
environ = request.environ
- return self._get_cookies(environ, '""')
+ return self._get_cookies(environ, '', max_age=EXPIRE)
- # IIdentifier
- def remember(self, request, userid, tokens='', userdata='', max_age=None):
+ def remember(self, request, userid, max_age=None):
+ max_age = max_age or self.max_age
environ = request.environ
+
if self.include_ip:
remote_addr = environ['REMOTE_ADDR']
else:
remote_addr = '0.0.0.0'
- cookies = get_cookies(environ)
- old_cookie = cookies.get(self.cookie_name)
- existing = cookies.get(self.cookie_name)
- old_cookie_value = getattr(existing, 'value', None)
-
- timestamp, old_userid, old_tokens, old_userdata = None, '', '', ''
-
- expired = False
-
- if old_cookie_value:
- try:
- (timestamp,old_userid,old_tokens,
- old_userdata) = auth_tkt.parse_ticket(
- self.secret, old_cookie_value, remote_addr)
- now = time.time()
- expired = self.timeout and ((timestamp + self.timeout) < now)
- except auth_tkt.BadTicket:
- expired = False
+ user_data = ''
encoding_data = self.userid_type_encoders.get(type(userid))
if encoding_data:
encoding, encoder = encoding_data
userid = encoder(userid)
- userdata = 'userid_type:%s' % encoding
+ user_data = 'userid_type:%s' % encoding
- if not isinstance(tokens, basestring):
- tokens = ','.join(tokens)
- if not isinstance(old_tokens, basestring):
- old_tokens = ','.join(old_tokens)
- old_data = (old_userid, old_tokens, old_userdata)
- new_data = (userid, tokens, userdata)
-
- if old_data != new_data or expired:
- ticket = auth_tkt.AuthTicket(
- self.secret,
- userid,
- remote_addr,
- tokens=tokens,
- user_data=userdata,
- cookie_name=self.cookie_name,
- secure=self.secure)
- new_cookie_value = ticket.cookie_value()
-
- cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
- wild_domain = '.' + cur_domain
- if old_cookie_value != new_cookie_value:
- # return a set of Set-Cookie headers
- return self._get_cookies(environ, new_cookie_value, max_age)
+ ticket = self.auth_tkt.AuthTicket(
+ self.secret,
+ userid,
+ remote_addr,
+ user_data=user_data,
+ cookie_name=self.cookie_name,
+ secure=self.secure)
+
+ cookie_value = ticket.cookie_value()
+ cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
+ wild_domain = '.' + cur_domain
+ return self._get_cookies(environ, cookie_value, max_age)
diff --git a/repoze/bfg/request.py b/repoze/bfg/request.py
index 1c6bae1da..416413c33 100644
--- a/repoze/bfg/request.py
+++ b/repoze/bfg/request.py
@@ -36,6 +36,11 @@ def create_route_request_factory(name):
return RouteRequest
+def add_global_response_headers(request, headerlist):
+ attrs = request.environ.setdefault('webob.adhoc_attrs', {})
+ response_headers = attrs.setdefault('global_response_headers', [])
+ response_headers.extend(headerlist)
+
from repoze.bfg.threadlocal import get_current_request as get_request # b/c
deprecated('get_request',
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index 4e5722be2..03f93857c 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -121,13 +121,21 @@ class Router(object):
registry.has_listeners and registry.notify(NewResponse(response))
try:
- start_response(response.status, response.headerlist)
- return response.app_iter
+ headers = response.headerlist
+ app_iter = response.app_iter
+ status = response.status
except AttributeError:
raise ValueError(
'Non-response object returned from view named %s '
'(and no renderer): %r' % (view_name, response))
+ if 'global_response_headers' in attrs:
+ headers = list(headers)
+ headers.extend(attrs['global_response_headers'])
+
+ start_response(response.status, headers)
+ return response.app_iter
+
finally:
manager.pop()
diff --git a/repoze/bfg/tests/test_authentication.py b/repoze/bfg/tests/test_authentication.py
index 9420df1a1..b12a0ea19 100644
--- a/repoze/bfg/tests/test_authentication.py
+++ b/repoze/bfg/tests/test_authentication.py
@@ -269,6 +269,7 @@ class TestAuthTktCookieHelper(unittest.TestCase):
def _makeOne(self, *arg, **kw):
plugin = self._getTargetClass()(*arg, **kw)
+ plugin.auth_tkt = DummyAuthTktModule()
return plugin
def _makeRequest(self, kw=None):
@@ -279,21 +280,21 @@ class TestAuthTktCookieHelper(unittest.TestCase):
environ['SERVER_NAME'] = 'localhost'
return DummyRequest(environ)
- def _makeTicket(self, userid='userid', remote_addr='0.0.0.0',
- tokens = [], userdata='userdata',
- cookie_name='auth_tkt', secure=False,
- time=None):
- from paste.auth import auth_tkt
- ticket = auth_tkt.AuthTicket(
- 'secret',
- userid,
- remote_addr,
- tokens=tokens,
- user_data=userdata,
- time=time,
- cookie_name=cookie_name,
- secure=secure)
- return ticket.cookie_value()
+ def _cookieValue(self, cookie):
+ return eval(cookie.value)
+
+ def _parseHeaders(self, headers):
+ return [ self._parseHeader(header) for header in headers ]
+
+ def _parseHeader(self, header):
+ cookie = self._parseCookie(header[1])
+ return cookie
+
+ def _parseCookie(self, cookie):
+ from Cookie import SimpleCookie
+ cookies = SimpleCookie()
+ cookies.load(cookie)
+ return cookies.get('auth_tkt')
def test_identify_nocookie(self):
plugin = self._makeOne('secret')
@@ -303,272 +304,240 @@ class TestAuthTktCookieHelper(unittest.TestCase):
def test_identify_good_cookie_include_ip(self):
plugin = self._makeOne('secret', include_ip=True)
- val = self._makeTicket(remote_addr='1.1.1.1')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'})
result = plugin.identify(request)
self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], [''])
+ self.assertEqual(result['tokens'], ())
self.assertEqual(result['userid'], 'userid')
- self.assertEqual(result['userdata'], 'userdata')
- self.failUnless('timestamp' in result)
+ self.assertEqual(result['userdata'], '')
+ self.assertEqual(result['timestamp'], 0)
+ self.assertEqual(plugin.auth_tkt.value, 'ticket')
+ self.assertEqual(plugin.auth_tkt.remote_addr, '1.1.1.1')
+ self.assertEqual(plugin.auth_tkt.secret, 'secret')
environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
- self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'],'')
self.assertEqual(environ['AUTH_TYPE'],'cookie')
def test_identify_good_cookie_dont_include_ip(self):
plugin = self._makeOne('secret', include_ip=False)
- val = self._makeTicket()
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'})
result = plugin.identify(request)
self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], [''])
+ self.assertEqual(result['tokens'], ())
self.assertEqual(result['userid'], 'userid')
- self.assertEqual(result['userdata'], 'userdata')
- self.failUnless('timestamp' in result)
+ self.assertEqual(result['userdata'], '')
+ self.assertEqual(result['timestamp'], 0)
+ self.assertEqual(plugin.auth_tkt.value, 'ticket')
+ self.assertEqual(plugin.auth_tkt.remote_addr, '0.0.0.0')
+ self.assertEqual(plugin.auth_tkt.secret, 'secret')
environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
- self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'],'')
self.assertEqual(environ['AUTH_TYPE'],'cookie')
def test_identify_good_cookie_int_useridtype(self):
plugin = self._makeOne('secret', include_ip=False)
- val = self._makeTicket(userid='1', userdata='userid_type:int')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ plugin.auth_tkt.userid = '1'
+ plugin.auth_tkt.user_data = 'userid_type:int'
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'})
result = plugin.identify(request)
self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], [''])
+ self.assertEqual(result['tokens'], ())
self.assertEqual(result['userid'], 1)
self.assertEqual(result['userdata'], 'userid_type:int')
- self.failUnless('timestamp' in result)
+ self.assertEqual(result['timestamp'], 0)
environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:int')
self.assertEqual(environ['AUTH_TYPE'],'cookie')
def test_identify_good_cookie_unknown_useridtype(self):
plugin = self._makeOne('secret', include_ip=False)
- val = self._makeTicket(userid='userid', userdata='userid_type:unknown')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
+ plugin.auth_tkt.userid = 'abc'
+ plugin.auth_tkt.user_data = 'userid_type:unknown'
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'})
result = plugin.identify(request)
self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], [''])
- self.assertEqual(result['userid'], 'userid')
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], 'abc')
self.assertEqual(result['userdata'], 'userid_type:unknown')
- self.failUnless('timestamp' in result)
+ self.assertEqual(result['timestamp'], 0)
environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:unknown')
self.assertEqual(environ['AUTH_TYPE'],'cookie')
+ def test_identify_good_cookie_b64str_useridtype(self):
+ plugin = self._makeOne('secret', include_ip=False)
+ plugin.auth_tkt.userid = 'encoded'.encode('base64').strip()
+ plugin.auth_tkt.user_data = 'userid_type:b64str'
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'})
+ result = plugin.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], 'encoded')
+ self.assertEqual(result['userdata'], 'userid_type:b64str')
+ self.assertEqual(result['timestamp'], 0)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:b64str')
+ self.assertEqual(environ['AUTH_TYPE'],'cookie')
+
+ def test_identify_good_cookie_b64unicode_useridtype(self):
+ plugin = self._makeOne('secret', include_ip=False)
+ plugin.auth_tkt.userid = '\xc3\xa9ncoded'.encode('base64').strip()
+ plugin.auth_tkt.user_data = 'userid_type:b64unicode'
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'})
+ result = plugin.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], unicode('\xc3\xa9ncoded', 'utf-8'))
+ self.assertEqual(result['userdata'], 'userid_type:b64unicode')
+ self.assertEqual(result['timestamp'], 0)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:b64unicode')
+ self.assertEqual(environ['AUTH_TYPE'],'cookie')
+
def test_identify_bad_cookie(self):
plugin = self._makeOne('secret', include_ip=True)
+ plugin.auth_tkt.parse_raise = True
request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'})
result = plugin.identify(request)
self.assertEqual(result, None)
- def test_remember_creds_same(self):
- plugin = self._makeOne('secret')
- val = self._makeTicket(userid='userid')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
- result = plugin.remember(request, 'userid', userdata='userdata')
+ def test_identify_cookie_timed_out(self):
+ plugin = self._makeOne('secret', timeout=1)
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'})
+ result = plugin.identify(request)
self.assertEqual(result, None)
- def test_remember_creds_different(self):
+ def test_identify_cookie_reissue(self):
+ import time
+ plugin = self._makeOne('secret', timeout=5, reissue_time=0)
+ plugin.auth_tkt.timestamp = time.time()
+ request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'})
+ result = plugin.identify(request)
+ self.failUnless(result)
+ attrs = request.environ['webob.adhoc_attrs']
+ response_headers = attrs['global_response_headers']
+ self.assertEqual(len(response_headers), 3)
+ self.assertEqual(response_headers[0][0], 'Set-Cookie')
+
+ def test_remember(self):
plugin = self._makeOne('secret')
- old_val = self._makeTicket(userid='userid')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
- result = plugin.remember(request, 'other', userdata='userdata')
+ request = self._makeRequest()
+ result = plugin.remember(request, 'userid')
self.assertEqual(len(result), 3)
self.assertEqual(result[0][0], 'Set-Cookie')
self.failUnless(result[0][1].endswith('; Path=/'))
self.failUnless(result[0][1].startswith('auth_tkt='))
- self.failIf(result[0][1].startswith('auth_tkt="%s"' % old_val))
self.assertEqual(result[1][0], 'Set-Cookie')
self.failUnless(result[1][1].endswith('; Path=/; Domain=localhost'))
self.failUnless(result[1][1].startswith('auth_tkt='))
- self.failIf(result[1][1].startswith('auth_tkt="%s"' % old_val))
self.assertEqual(result[2][0], 'Set-Cookie')
self.failUnless(result[2][1].endswith('; Path=/; Domain=.localhost'))
self.failUnless(result[2][1].startswith('auth_tkt='))
- self.failIf(result[2][1].startswith('auth_tkt="%s"' % old_val))
- def test_remember_creds_different_include_ip(self):
+ def test_remember_include_ip(self):
plugin = self._makeOne('secret', include_ip=True)
- old_val = self._makeTicket(userid='userid', remote_addr='1.1.1.1')
- request = self._makeRequest({'HTTP_COOKIE': 'auth_tkt=%s' % old_val})
- new_val = self._makeTicket(userid='other',
- userdata='userdata',
- remote_addr='1.1.1.1')
- result = plugin.remember(request, 'other', userdata='userdata')
- self.assertEqual(len(result), 3)
- self.assertEqual(result[0],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/' % new_val))
- self.assertEqual(result[1],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/; Domain=localhost'
- % new_val))
- self.assertEqual(result[2],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/; Domain=.localhost'
- % new_val))
-
- def test_remember_creds_different_bad_old_cookie(self):
- plugin = self._makeOne('secret')
- old_val = 'BOGUS'
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
- new_val = self._makeTicket(userid='other', userdata='userdata')
- result = plugin.remember(request, userid='other', userdata='userdata')
+ request = self._makeRequest()
+ result = plugin.remember(request, 'other')
self.assertEqual(len(result), 3)
- self.assertEqual(result[0],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/' % new_val))
- self.assertEqual(result[1],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/; Domain=localhost'
- % new_val))
- self.assertEqual(result[2],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/; Domain=.localhost'
- % new_val))
-
- def test_remember_creds_different_with_nonstring_tokens(self):
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.failUnless(result[0][1].endswith('; Path=/'))
+ self.failUnless(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.failUnless(result[1][1].endswith('; Path=/; Domain=localhost'))
+ self.failUnless(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.failUnless(result[2][1].endswith('; Path=/; Domain=.localhost'))
+ self.failUnless(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_string_userid(self):
plugin = self._makeOne('secret')
- old_val = self._makeTicket(userid='userid')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
- new_val = self._makeTicket(userid='other',
- userdata='userdata',
- tokens='foo,bar',
- )
- result = plugin.remember(request, 'other',
- userdata='userdata',
- tokens=['foo', 'bar'],
- )
+ request = self._makeRequest()
+ result = plugin.remember(request, 'userid')
+ values = self._parseHeaders(result)
self.assertEqual(len(result), 3)
- self.assertEqual(result[0],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/' % new_val))
- self.assertEqual(result[1],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/; Domain=localhost'
- % new_val))
- self.assertEqual(result[2],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/; Domain=.localhost'
- % new_val))
-
- def test_remember_creds_different_int_userid(self):
+ val = self._cookieValue(values[0])
+ self.assertEqual(val['userid'], 'userid'.encode('base64').strip())
+ self.assertEqual(val['user_data'], 'userid_type:b64str')
+
+ def test_remember_int_userid(self):
plugin = self._makeOne('secret')
- old_val = self._makeTicket(userid='userid')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
- new_val = self._makeTicket(userid='1', userdata='userid_type:int')
+ request = self._makeRequest()
result = plugin.remember(request, 1)
-
+ values = self._parseHeaders(result)
self.assertEqual(len(result), 3)
- self.assertEqual(result[0],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/' % new_val))
+ val = self._cookieValue(values[0])
+ self.assertEqual(val['userid'], '1')
+ self.assertEqual(val['user_data'], 'userid_type:int')
- def test_remember_creds_different_long_userid(self):
+ def test_remember_long_userid(self):
plugin = self._makeOne('secret')
- old_val = self._makeTicket(userid='userid')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
- new_val = self._makeTicket(userid='1', userdata='userid_type:int')
+ request = self._makeRequest()
result = plugin.remember(request, long(1))
+ values = self._parseHeaders(result)
self.assertEqual(len(result), 3)
- self.assertEqual(result[0],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/' % new_val))
+ val = self._cookieValue(values[0])
+ self.assertEqual(val['userid'], '1')
+ self.assertEqual(val['user_data'], 'userid_type:int')
- def test_remember_creds_different_unicode_userid(self):
+ def test_remember_unicode_userid(self):
plugin = self._makeOne('secret')
- old_val = self._makeTicket(userid='userid')
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
+ request = self._makeRequest()
userid = unicode('\xc2\xa9', 'utf-8')
- new_val = self._makeTicket(userid=userid.encode('utf-8'),
- userdata='userid_type:unicode')
result = plugin.remember(request, userid)
- self.assertEqual(type(result[0][1]), str)
+ values = self._parseHeaders(result)
self.assertEqual(len(result), 3)
- self.assertEqual(result[0],
- ('Set-Cookie',
- 'auth_tkt="%s"; Path=/' % new_val))
+ val = self._cookieValue(values[0])
+ self.assertEqual(val['userid'],
+ userid.encode('utf-8').encode('base64').strip())
+ self.assertEqual(val['user_data'], 'userid_type:b64unicode')
def test_remember_max_age(self):
plugin = self._makeOne('secret')
- environ = {'HTTP_HOST':'example.com'}
- tkt = self._makeTicket(userid='chris', userdata='')
- request = self._makeRequest(environ)
- result = plugin.remember(request, 'chris', max_age='500')
-
- name,value = result.pop(0)
- self.assertEqual('Set-Cookie', name)
- self.failUnless(
- value.startswith('auth_tkt="%s"; Path=/; Max-Age=500' % tkt),
- value)
- self.failUnless('; Expires=' in value)
-
- name,value = result.pop(0)
- self.assertEqual('Set-Cookie', name)
- self.failUnless(
- value.startswith(
- 'auth_tkt="%s"; Path=/; Domain=example.com; Max-Age=500'
- % tkt), value)
- self.failUnless('; Expires=' in value)
-
- name,value = result.pop(0)
- self.assertEqual('Set-Cookie', name)
- self.failUnless(
- value.startswith(
- 'auth_tkt="%s"; Path=/; Domain=.example.com; Max-Age=500' % tkt),
- value)
- self.failUnless('; Expires=' in value)
-
- def test_remember_reissue_expired_cookie(self):
- import time
- plugin = self._makeOne('secret', timeout=2, reissue_time=1)
- old_val = self._makeTicket(userid='userid', time=time.time()-3)
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
- result = plugin.remember(request, 'userid', userdata='userdata')
- self.failIf(result is None, 'not re-issued?')
+ request = self._makeRequest()
+ userid = unicode('\xc2\xa9', 'utf-8')
+ result = plugin.remember(request, 'userid', max_age='500')
+ values = self._parseHeaders(result)
+ self.assertEqual(len(result), 3)
+ self.assertEqual(values[0]['max-age'], '500')
+ self.assertEqual(values[0]['expires'], 'Fri,') # SimpleCookie problem
+
def test_forget(self):
plugin = self._makeOne('secret')
request = self._makeRequest()
headers = plugin.forget(request)
self.assertEqual(len(headers), 3)
- header = headers[0]
- name, value = header
+ name, value = headers[0]
self.assertEqual(name, 'Set-Cookie')
- self.assertEqual(value, 'auth_tkt=""""; Path=/')
- header = headers[1]
- name, value = header
+ self.assertEqual(value,
+ 'auth_tkt=""; Path=/; Max-Age=0; Expires=Wed, 31-Dec-97 23:59:59 GMT')
+ name, value = headers[1]
self.assertEqual(name, 'Set-Cookie')
- self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=localhost')
- header = headers[2]
- name, value = header
+ self.assertEqual(value,
+ 'auth_tkt=""; Path=/; Domain=localhost; Max-Age=0; '
+ 'Expires=Wed, 31-Dec-97 23:59:59 GMT')
+ name, value = headers[2]
self.assertEqual(name, 'Set-Cookie')
- self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=.localhost')
-
- def test_timeout_no_reissue(self):
- self.assertRaises(ValueError, self._makeOne, 'userid', timeout=1)
+ self.assertEqual(value,
+ 'auth_tkt=""; Path=/; Domain=.localhost; Max-Age=0; '
+ 'Expires=Wed, 31-Dec-97 23:59:59 GMT')
def test_timeout_lower_than_reissue(self):
self.assertRaises(ValueError, self._makeOne, 'userid', timeout=1,
reissue_time=2)
- def test_identify_bad_cookie_expired(self):
- import time
- helper = self._makeOne('secret', timeout=2, reissue_time=1)
- val = self._makeTicket(userid='userid', time=time.time()-3)
- request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % val})
- result = helper.identify(request)
- self.assertEqual(result, None)
-
class DummyContext:
pass
@@ -597,3 +566,38 @@ class DummyCookieHelper:
def forget(self, *arg):
return []
+class DummyAuthTktModule(object):
+ def __init__(self, timestamp=0, userid='userid', tokens=(), user_data='',
+ parse_raise=False):
+ self.timestamp = timestamp
+ self.userid = userid
+ self.tokens = tokens
+ self.user_data = user_data
+ self.parse_raise = parse_raise
+ def parse_ticket(secret, value, remote_addr):
+ self.secret = secret
+ self.value = value
+ self.remote_addr = remote_addr
+ if self.parse_raise:
+ raise self.BadTicket()
+ return self.timestamp, self.userid, self.tokens, self.user_data
+ self.parse_ticket = parse_ticket
+
+ class AuthTicket(object):
+ def __init__(self, secret, userid, remote_addr, **kw):
+ self.secret = secret
+ self.userid = userid
+ self.remote_addr = remote_addr
+ self.kw = kw
+
+ def cookie_value(self):
+ result = {'secret':self.secret, 'userid':self.userid,
+ 'remote_addr':self.remote_addr}
+ result.update(self.kw)
+ result = repr(result)
+ return result
+ self.AuthTicket = AuthTicket
+
+ class BadTicket(Exception):
+ pass
+
diff --git a/repoze/bfg/tests/test_request.py b/repoze/bfg/tests/test_request.py
index 8aeeff42d..78564e57e 100644
--- a/repoze/bfg/tests/test_request.py
+++ b/repoze/bfg/tests/test_request.py
@@ -99,6 +99,26 @@ class Test_create_route_request_factory(unittest.TestCase):
self.failUnless(IRouteRequest.implementedBy(factory))
self.failUnless(IRequest.implementedBy(factory))
+class Test_add_global_response_headers(unittest.TestCase):
+ def _callFUT(self, request, headerlist):
+ from repoze.bfg.request import add_global_response_headers
+ return add_global_response_headers(request, headerlist)
+
+ def test_no_adhoc_attrs(self):
+ request = DummyRequest()
+ headers = [('a', 1), ('b', 2)]
+ self._callFUT(request, headers)
+ attrs = request.environ['webob.adhoc_attrs']
+ self.assertEqual(attrs['global_response_headers'], headers)
+
+ def test_with_adhoc_attrs(self):
+ request = DummyRequest()
+ headers = [('a', 1), ('b', 2)]
+ attrs = request.environ['webob.adhoc_attrs'] = {}
+ attrs['global_response_headers'] = headers[:]
+ self._callFUT(request, [('c', 1)])
+ self.assertEqual(attrs['global_response_headers'], headers + [('c', 1)])
+
class DummyRoute:
def __init__(self, name):
self.name = name
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index 76b33d204..9306640b4 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -391,6 +391,27 @@ class TestRouter(unittest.TestCase):
response = router(environ, start_response)
self.assertEqual(start_response.status, '201 Created')
+ def test_call_request_has_global_response_headers(self):
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ self._registerTraverserFactory(context, subpath=[''])
+ response = DummyResponse('200 OK')
+ response.headerlist = [('a', 1)]
+ view = DummyView(response)
+ environ = self._makeEnviron()
+ environ['webob.adhoc_attrs'] = {'global_response_headers':[('b', 2)]}
+ self._registerView(view, '', IContext, IRequest)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ response = router(environ, start_response)
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(start_response.headers, [('a', 1), ('b', 2)])
+
def test_call_eventsends(self):
context = DummyContext()
self._registerTraverserFactory(context)
diff --git a/repoze/bfg/tests/test_zcml.py b/repoze/bfg/tests/test_zcml.py
index 8bd50842c..49b8d8cd1 100644
--- a/repoze/bfg/tests/test_zcml.py
+++ b/repoze/bfg/tests/test_zcml.py
@@ -1036,8 +1036,7 @@ class TestViewDirective(unittest.TestCase):
context = DummyContext()
view = lambda *arg: None
sm = getSiteManager()
- def view(context, request):
- return '123'
+ def view(context, request): pass
self._callFUT(context, None, IFoo, view=view, path_info='/boo')
actions = context.actions
self.assertEqual(len(actions), 1)
diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py
index 11331ab9e..e2ce7a030 100644
--- a/repoze/bfg/zcml.py
+++ b/repoze/bfg/zcml.py
@@ -499,6 +499,7 @@ class IAuthTktAuthenticationPolicyDirective(Interface):
include_ip = Bool(title=u"include_ip", required=False, default=False)
timeout = Int(title=u"timeout", required=False, default=None)
reissue_time = Int(title=u"reissue_time", required=False, default=None)
+ max_age = Int(title=u"max_age", required=False, default=None)
def authtktauthenticationpolicy(_context,
secret,
@@ -507,7 +508,8 @@ def authtktauthenticationpolicy(_context,
secure=False,
include_ip=False,
timeout=None,
- reissue_time=None):
+ reissue_time=None,
+ max_age=None):
try:
policy = AuthTktAuthenticationPolicy(secret,
callback=callback,
@@ -515,7 +517,8 @@ def authtktauthenticationpolicy(_context,
secure=secure,
include_ip = include_ip,
timeout = timeout,
- reissue_time = reissue_time)
+ reissue_time = reissue_time,
+ max_age=max_age)
except ValueError, why:
raise ConfigurationError(str(why))
# authentication policies must be registered eagerly so they can