summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Merickel <michael@merickel.org>2013-10-05 03:48:52 -0500
committerMichael Merickel <michael@merickel.org>2013-10-05 03:48:52 -0500
commit4fade654a42b88ea1f042af974f76b97d326c455 (patch)
tree7dfafc390b39dae761cf7fe746863f56bd7a2765
parent3a6cbcce80b5292082b2f4e2f920d2df127e2774 (diff)
downloadpyramid-4fade654a42b88ea1f042af974f76b97d326c455.tar.gz
pyramid-4fade654a42b88ea1f042af974f76b97d326c455.tar.bz2
pyramid-4fade654a42b88ea1f042af974f76b97d326c455.zip
introduce SignedCookieSessionFactory
- Break apart UnencryptedCookieSessionFactoryConfig into a BaseCookieSessionFactory. - Add support for reissue_time in the base. Set the unencrypted class to use reissue_time=0 for bw-compat. - Add SignedCookieSessionFactory which wraps the base in a serializer that uses signing via a sha512+hmac with a secret derived using an 8-byte random salt.
-rw-r--r--pyramid/session.py400
-rw-r--r--pyramid/tests/test_session.py290
2 files changed, 508 insertions, 182 deletions
diff --git a/pyramid/session.py b/pyramid/session.py
index db86ca32b..e6635ca1b 100644
--- a/pyramid/session.py
+++ b/pyramid/session.py
@@ -21,20 +21,26 @@ from pyramid.interfaces import ISession
from pyramid.util import strings_differ
def manage_accessed(wrapped):
- """ Decorator which causes a cookie to be set when a wrapped
- method is called"""
+ """ Decorator which causes a cookie to be renewed when an accessor
+ method is called."""
def accessed(session, *arg, **kw):
- session.accessed = int(time.time())
- if not session._dirty:
- session._dirty = True
- def set_cookie_callback(request, response):
- session._set_cookie(response)
- session.request = None # explicitly break cycle for gc
- session.request.add_response_callback(set_cookie_callback)
+ session.accessed = now = int(time.time())
+ if now - session.renewed > session._reissue_time:
+ session.changed()
return wrapped(session, *arg, **kw)
accessed.__doc__ = wrapped.__doc__
return accessed
+def manage_changed(wrapped):
+ """ Decorator which causes a cookie to be set when a setter method
+ is called."""
+ def changed(session, *arg, **kw):
+ session.accessed = int(time.time())
+ session.changed()
+ return wrapped(session, *arg, **kw)
+ changed.__doc__ = wrapped.__doc__
+ return changed
+
def signed_serialize(data, secret):
""" Serialize any pickleable structure (``data``) and sign it
using the ``secret`` (must be a string). Return the
@@ -67,13 +73,13 @@ def signed_deserialize(serialized, secret, hmac=hmac):
"""
# hmac parameterized only for unit tests
try:
- input_sig, pickled = (serialized[:40],
+ input_sig, pickled = (bytes_(serialized[:40]),
base64.b64decode(bytes_(serialized[40:])))
except (binascii.Error, TypeError) as e:
# Badly formed data can make base64 die
raise ValueError('Badly formed base64 data: %s' % e)
- sig = hmac.new(bytes_(secret), pickled, sha1).hexdigest()
+ sig = bytes_(hmac.new(bytes_(secret), pickled, sha1).hexdigest())
# Avoid timing attacks (see
# http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf)
@@ -113,89 +119,108 @@ def check_csrf_token(request,
return False
return True
-def UnencryptedCookieSessionFactoryConfig(
- secret,
- timeout=1200,
+def BaseCookieSessionFactory(
+ serializer,
cookie_name='session',
- cookie_max_age=None,
- cookie_path='/',
- cookie_domain=None,
- cookie_secure=False,
- cookie_httponly=False,
- cookie_on_exception=True,
- signed_serialize=signed_serialize,
- signed_deserialize=signed_deserialize,
+ max_age=None,
+ path='/',
+ domain=None,
+ secure=False,
+ httponly=False,
+ timeout=1200,
+ reissue_time=0,
+ set_on_exception=True,
):
"""
- Configure a :term:`session factory` which will provide unencrypted
- (but signed) cookie-based sessions. The return value of this
- function is a :term:`session factory`, which may be provided as
- the ``session_factory`` argument of a
- :class:`pyramid.config.Configurator` constructor, or used
+ Configure a :term:`session factory` which will provide cookie-based
+ sessions. The return value of this function is a
+ :term:`session factory`, which may be provided as the ``session_factory``
+ argument of a :class:`pyramid.config.Configurator` constructor, or used
as the ``session_factory`` argument of the
- :meth:`pyramid.config.Configurator.set_session_factory`
- method.
+ :meth:`pyramid.config.Configurator.set_session_factory` method.
The session factory returned by this function will create sessions
which are limited to storing fewer than 4000 bytes of data (as the
payload must fit into a single cookie).
- Parameters:
+ .. warning:
- ``secret``
- A string which is used to sign the cookie.
+ This class provides no protection from tampering and is only intended
+ to be used by framework authors to create their own cookie-based
+ session factories.
- ``timeout``
- A number of seconds of inactivity before a session times out.
+ Parameters:
+
+ ``serializer``
+ An object with 2 methods, ``loads`` and ``dumps``, which will be used
+ to perform serialization and deserialization.
+ - ``dumps(value)`` should accept a Python object and return a
+ bytestring which can later be deserialized with ``loads``.
+ - ``loads(value)`` should expect to receive a bytestring, generated by
+ ``dumps`` and return a Python object.
``cookie_name``
- The name of the cookie used for sessioning.
+ The name of the cookie used for sessioning. Default: ``'session'``.
- ``cookie_max_age``
+ ``max_age``
The maximum age of the cookie used for sessioning (in seconds).
Default: ``None`` (browser scope).
- ``cookie_path``
- The path used for the session cookie.
+ ``path``
+ The path used for the session cookie. Default: ``'/'``.
- ``cookie_domain``
+ ``domain``
The domain used for the session cookie. Default: ``None`` (no domain).
- ``cookie_secure``
- The 'secure' flag of the session cookie.
+ ``secure``
+ The 'secure' flag of the session cookie. Default: ``False``.
- ``cookie_httponly``
- The 'httpOnly' flag of the session cookie.
+ ``httponly``
+ Hide the cookie from Javascript by setting the 'HttpOnly' flag of the
+ session cookie. Default: ``False``.
- ``cookie_on_exception``
+ ``timeout``
+ A number of seconds of inactivity before a session times out. If
+ ``None`` then the cookie never expires. Default: 1200.
+
+ ``reissue_time``
+ The number of seconds that must pass before the cookie is automatically
+ reissued as the result of a request which accesses the session. The
+ duration is measured as the number of seconds since the last session
+ cookie was issued and 'now'. If this value is ``0``, a new cookie
+ will be reissued on every request accesses the session. If ``None``
+ then the cookie's lifetime will never be extended.
+
+ A good rule of thumb: if you want auto-expired cookies based on
+ inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
+ ``reissue_time`` value to perhaps a tenth of the ``timeout`` value
+ (120 or 2 mins). It's nonsensical to set the ``timeout`` value lower
+ than the ``reissue_time`` value, as the ticket will never be reissued.
+ However, such a configuration is not explicitly prevented.
+
+ Default: ``0``.
+
+ ``set_on_exception``
If ``True``, set a session cookie even if an exception occurs
- while rendering a view.
-
- ``signed_serialize``
- A callable which takes more or less arbitrary Python data structure and
- a secret and returns a signed serialization in bytes.
- Default: ``signed_serialize`` (using pickle).
+ while rendering a view. Default: ``True``.
- ``signed_deserialize``
- A callable which takes a signed and serialized data structure in bytes
- and a secret and returns the original data structure if the signature
- is valid. Default: ``signed_deserialize`` (using pickle).
+ .. versionadded: 1.5a3
"""
@implementer(ISession)
- class SignedCookieSession(dict):
+ class CookieSession(dict):
""" Dictionary-like session object """
# configuration parameters
_cookie_name = cookie_name
- _cookie_max_age = cookie_max_age
- _cookie_path = cookie_path
- _cookie_domain = cookie_domain
- _cookie_secure = cookie_secure
- _cookie_httponly = cookie_httponly
- _cookie_on_exception = cookie_on_exception
- _secret = secret
+ _cookie_max_age = max_age
+ _cookie_path = path
+ _cookie_domain = domain
+ _cookie_secure = secure
+ _cookie_httponly = httponly
+ _cookie_on_exception = set_on_exception
_timeout = timeout
+ _reissue_time = reissue_time
# dirty flag
_dirty = False
@@ -203,33 +228,40 @@ def UnencryptedCookieSessionFactoryConfig(
def __init__(self, request):
self.request = request
now = time.time()
- created = accessed = now
+ created = renewed = now
new = True
value = None
state = {}
cookieval = request.cookies.get(self._cookie_name)
if cookieval is not None:
try:
- value = signed_deserialize(cookieval, self._secret)
+ value = serializer.loads(bytes_(cookieval))
except ValueError:
value = None
if value is not None:
- accessed, created, state = value
- new = False
- if now - accessed > self._timeout:
+ try:
+ renewed, created, state = value
+ new = False
+ if now - renewed > self._timeout:
+ state = {}
+ except TypeError:
state = {}
self.created = created
- self.accessed = accessed
+ self.accessed = renewed
+ self.renewed = renewed
self.new = new
dict.__init__(self, state)
# ISession methods
def changed(self):
- """ This is intentionally a noop; the session is
- serialized on every access, so unnecessary"""
- pass
+ if not self._dirty:
+ self._dirty = True
+ def set_cookie_callback(request, response):
+ self._set_cookie(response)
+ self.request = None # explicitly break cycle for gc
+ self.request.add_response_callback(set_cookie_callback)
def invalidate(self):
self.clear() # XXX probably needs to unset cookie
@@ -251,22 +283,22 @@ def UnencryptedCookieSessionFactoryConfig(
has_key = manage_accessed(dict.has_key)
# modifying dictionary methods
- clear = manage_accessed(dict.clear)
- update = manage_accessed(dict.update)
- setdefault = manage_accessed(dict.setdefault)
- pop = manage_accessed(dict.pop)
- popitem = manage_accessed(dict.popitem)
- __setitem__ = manage_accessed(dict.__setitem__)
- __delitem__ = manage_accessed(dict.__delitem__)
+ clear = manage_changed(dict.clear)
+ update = manage_changed(dict.update)
+ setdefault = manage_changed(dict.setdefault)
+ pop = manage_changed(dict.pop)
+ popitem = manage_changed(dict.popitem)
+ __setitem__ = manage_changed(dict.__setitem__)
+ __delitem__ = manage_changed(dict.__delitem__)
# flash API methods
- @manage_accessed
+ @manage_changed
def flash(self, msg, queue='', allow_duplicate=True):
storage = self.setdefault('_f_' + queue, [])
if allow_duplicate or (msg not in storage):
storage.append(msg)
- @manage_accessed
+ @manage_changed
def pop_flash(self, queue=''):
storage = self.pop('_f_' + queue, [])
return storage
@@ -277,7 +309,7 @@ def UnencryptedCookieSessionFactoryConfig(
return storage
# CSRF API methods
- @manage_accessed
+ @manage_changed
def new_csrf_token(self):
token = text_(binascii.hexlify(os.urandom(20)))
self['_csrft_'] = token
@@ -296,9 +328,9 @@ def UnencryptedCookieSessionFactoryConfig(
exception = getattr(self.request, 'exception', None)
if exception is not None: # dont set a cookie during exceptions
return False
- cookieval = signed_serialize(
- (self.accessed, self.created, dict(self)), self._secret
- )
+ cookieval = native_(serializer.dumps(
+ (self.accessed, self.created, dict(self))
+ ))
if len(cookieval) > 4064:
raise ValueError(
'Cookie value is too long to store (%s bytes)' %
@@ -315,11 +347,10 @@ def UnencryptedCookieSessionFactoryConfig(
)
return True
- return SignedCookieSession
+ return CookieSession
-def SignedCookieSessionFactory(
+def UnencryptedCookieSessionFactoryConfig(
secret,
- hashalg='sha512',
timeout=1200,
cookie_name='session',
cookie_max_age=None,
@@ -328,6 +359,97 @@ def SignedCookieSessionFactory(
cookie_secure=False,
cookie_httponly=False,
cookie_on_exception=True,
+ signed_serialize=signed_serialize,
+ signed_deserialize=signed_deserialize,
+ ):
+ """
+ Configure a :term:`session factory` which will provide unencrypted
+ (but signed) cookie-based sessions. The return value of this
+ function is a :term:`session factory`, which may be provided as
+ the ``session_factory`` argument of a
+ :class:`pyramid.config.Configurator` constructor, or used
+ as the ``session_factory`` argument of the
+ :meth:`pyramid.config.Configurator.set_session_factory`
+ method.
+
+ The session factory returned by this function will create sessions
+ which are limited to storing fewer than 4000 bytes of data (as the
+ payload must fit into a single cookie).
+
+ Parameters:
+
+ ``secret``
+ A string which is used to sign the cookie.
+
+ ``timeout``
+ A number of seconds of inactivity before a session times out.
+
+ ``cookie_name``
+ The name of the cookie used for sessioning.
+
+ ``cookie_max_age``
+ The maximum age of the cookie used for sessioning (in seconds).
+ Default: ``None`` (browser scope).
+
+ ``cookie_path``
+ The path used for the session cookie.
+
+ ``cookie_domain``
+ The domain used for the session cookie. Default: ``None`` (no domain).
+
+ ``cookie_secure``
+ The 'secure' flag of the session cookie.
+
+ ``cookie_httponly``
+ The 'httpOnly' flag of the session cookie.
+
+ ``cookie_on_exception``
+ If ``True``, set a session cookie even if an exception occurs
+ while rendering a view.
+
+ ``signed_serialize``
+ A callable which takes more or less arbitrary Python data structure and
+ a secret and returns a signed serialization in bytes.
+ Default: ``signed_serialize`` (using pickle).
+
+ ``signed_deserialize``
+ A callable which takes a signed and serialized data structure in bytes
+ and a secret and returns the original data structure if the signature
+ is valid. Default: ``signed_deserialize`` (using pickle).
+ """
+
+ class _Serializer(object):
+ def dumps(self, appstruct):
+ return signed_serialize(appstruct, secret)
+
+ def loads(self, bstruct):
+ return signed_deserialize(bstruct, secret)
+
+ return BaseCookieSessionFactory(
+ _Serializer(),
+ cookie_name=cookie_name,
+ max_age=cookie_max_age,
+ path=cookie_path,
+ domain=cookie_domain,
+ secure=cookie_secure,
+ httponly=cookie_httponly,
+ timeout=timeout,
+ reissue_time=0, # to keep session.accessed == session.renewed
+ set_on_exception=cookie_on_exception,
+ )
+
+def SignedCookieSessionFactory(
+ secret,
+ cookie_name='session',
+ max_age=None,
+ path='/',
+ domain=None,
+ secure=False,
+ httponly=False,
+ set_on_exception=True,
+ timeout=1200,
+ reissue_time=0,
+ hashalg='sha512',
serializer=None,
):
"""
@@ -353,86 +475,106 @@ def SignedCookieSessionFactory(
The HMAC digest algorithm to use for signing. The algorithm must be
supported by the :mod:`hashlib` library. Default: ``'sha512'``.
- ``timeout``
- A number of seconds of inactivity before a session times out.
- Default: 1200.
-
``cookie_name``
The name of the cookie used for sessioning. Default: ``'session'``.
- ``cookie_max_age``
+ ``max_age``
The maximum age of the cookie used for sessioning (in seconds).
Default: ``None`` (browser scope).
- ``cookie_path``
+ ``path``
The path used for the session cookie. Default: ``'/'``.
- ``cookie_domain``
+ ``domain``
The domain used for the session cookie. Default: ``None`` (no domain).
- ``cookie_secure``
+ ``secure``
The 'secure' flag of the session cookie. Default: ``False``.
- ``cookie_httponly``
- The 'httpOnly' flag of the session cookie. Default: ``False``.
+ ``httponly``
+ Hide the cookie from Javascript by setting the 'HttpOnly' flag of the
+ session cookie. Default: ``False``.
- ``cookie_on_exception``
+ ``timeout``
+ A number of seconds of inactivity before a session times out. If
+ ``None`` then the cookie never expires. Default: 1200.
+
+ ``reissue_time``
+ The number of seconds that must pass before the cookie is automatically
+ reissued as the result of a request which accesses the session. The
+ duration is measured as the number of seconds since the last session
+ cookie was issued and 'now'. If this value is ``0``, a new cookie
+ will be reissued on every request accesses the session. If ``None``
+ then the cookie's lifetime will never be extended.
+
+ A good rule of thumb: if you want auto-expired cookies based on
+ inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
+ ``reissue_time`` value to perhaps a tenth of the ``timeout`` value
+ (120 or 2 mins). It's nonsensical to set the ``timeout`` value lower
+ than the ``reissue_time`` value, as the ticket will never be reissued.
+ However, such a configuration is not explicitly prevented.
+
+ Default: ``0``.
+
+ ``set_on_exception``
If ``True``, set a session cookie even if an exception occurs
while rendering a view. Default: ``True``.
``serializer``
- An object which can convert data between Python objects and bytestrings.
- - ``loads`` should accept a bytestring and return a Python object.
- - ``dumps`` should accept a Python object and return a bytestring.
- The default implementation uses Python's pickle module.
+ An object with 2 methods, ``loads`` and ``dumps``, which will be used
+ to perform serialization and deserialization. The value generated from
+ serialization will be cryptographically signed to prevent tampering.
+ A ``ValueError`` should be raised if deserialization fails.
+
+ .. versionadded: 1.5a3
"""
if serializer is None:
serializer = _PickleSerializer()
- signer = _SignedSerializer(hashalg, serializer)
+ signed_serializer = _SignedSerializer(secret, hashalg, serializer)
- return UnencryptedCookieSessionFactoryConfig(
- secret,
- timeout=timeout,
+ return BaseCookieSessionFactory(
+ signed_serializer,
cookie_name=cookie_name,
- cookie_max_age=cookie_max_age,
- cookie_path=cookie_path,
- cookie_domain=cookie_domain,
- cookie_secure=cookie_secure,
- cookie_httponly=cookie_httponly,
- cookie_on_exception=cookie_on_exception,
- signed_serialize=signer.dumps,
- signed_deserialize=signer.loads,
+ max_age=max_age,
+ path=path,
+ domain=domain,
+ secure=secure,
+ httponly=httponly,
+ timeout=timeout,
+ reissue_time=reissue_time,
+ set_on_exception=set_on_exception,
)
class _PickleSerializer(object):
- def dumps(self, value):
- return pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
+ def dumps(self, appstruct):
+ return pickle.dumps(appstruct, pickle.HIGHEST_PROTOCOL)
- def loads(self, value):
- return pickle.loads(value)
+ def loads(self, bstruct):
+ return pickle.loads(bstruct)
class _SignedSerializer(object):
- def __init__(self, hashalg, serializer):
+ def __init__(self, secret, hashalg, serializer):
+ self.secret = secret
self.digestmod = lambda: hashlib.new(hashalg)
self.digest_size = self.digestmod().digest_size
self.salt_size = 8
self.serializer = serializer
- def derive_key(self, secret, salt):
- return hmac.new(secret, salt, self.digestmod).digest()
+ def derive_key(self, salt):
+ return hmac.new(self.secret, salt, self.digestmod).digest()
- def dumps(self, appstruct, secret):
+ def dumps(self, appstruct):
salt = os.urandom(self.salt_size)
- derived_secret = self.derive_key(secret, salt)
+ derived_secret = self.derive_key(salt)
cstruct = self.serializer.dumps(appstruct)
sig = hmac.new(derived_secret, cstruct, self.digestmod).digest()
- return native_(base64.b64encode(cstruct + salt + sig))
+ return base64.b64encode(cstruct + salt + sig)
- def loads(self, bstruct, secret):
+ def loads(self, bstruct):
try:
- fstruct = base64.b64decode(bytes_(bstruct))
+ fstruct = base64.b64decode(bstruct)
except (binascii.Error, TypeError) as e:
raise ValueError('Badly formed base64 data: %s' % e)
@@ -441,10 +583,10 @@ class _SignedSerializer(object):
raise ValueError('Input is too short.')
cstruct = fstruct[:cstruct_size]
- salt = fstruct[cstruct_size:self.salt_size]
- expected_sig = fstruct[:-self.digest_size]
+ salt = fstruct[cstruct_size:cstruct_size + self.salt_size]
+ expected_sig = fstruct[-self.digest_size:]
- derived_secret = self.derive_key(secret, salt)
+ derived_secret = self.derive_key(salt)
sig = hmac.new(derived_secret, cstruct, self.digestmod).digest()
if strings_differ(sig, expected_sig):
raise ValueError('Invalid signature')
diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py
index 35e2b5c27..0e1ed78a6 100644
--- a/pyramid/tests/test_session.py
+++ b/pyramid/tests/test_session.py
@@ -1,10 +1,8 @@
+import json
import unittest
from pyramid import testing
-class TestUnencryptedCookieSession(unittest.TestCase):
- def _makeOne(self, request, **kw):
- from pyramid.session import UnencryptedCookieSessionFactoryConfig
- return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request)
+class SharedCookieSessionTests(object):
def test_ctor_no_cookie(self):
request = testing.DummyRequest()
@@ -18,36 +16,47 @@ class TestUnencryptedCookieSession(unittest.TestCase):
session = self._makeOne(request)
verifyObject(ISession, session)
- def _serialize(self, accessed, state, secret='secret'):
- from pyramid.session import signed_serialize
- return signed_serialize((accessed, accessed, state), secret)
-
def test_ctor_with_cookie_still_valid(self):
import time
request = testing.DummyRequest()
- cookieval = self._serialize(time.time(), {'state':1})
+ cookieval = self._serialize((time.time(), 0, {'state': 1}))
request.cookies['session'] = cookieval
session = self._makeOne(request)
self.assertEqual(dict(session), {'state':1})
-
+
def test_ctor_with_cookie_expired(self):
request = testing.DummyRequest()
- cookieval = self._serialize(0, {'state':1})
+ cookieval = self._serialize((0, 0, {'state': 1}))
request.cookies['session'] = cookieval
session = self._makeOne(request)
self.assertEqual(dict(session), {})
- def test_ctor_with_bad_cookie(self):
+ def test_ctor_with_bad_cookie_cannot_deserialize(self):
+ request = testing.DummyRequest()
+ request.cookies['session'] = 'abc'
+ session = self._makeOne(request)
+ self.assertEqual(dict(session), {})
+
+ def test_ctor_with_bad_cookie_not_tuple(self):
request = testing.DummyRequest()
- cookieval = 'abc'
+ cookieval = self._serialize('abc')
request.cookies['session'] = cookieval
session = self._makeOne(request)
self.assertEqual(dict(session), {})
+ def test_timeout(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, timeout=1)
+ self.assertEqual(dict(session), {})
+
def test_changed(self):
request = testing.DummyRequest()
session = self._makeOne(request)
self.assertEqual(session.changed(), None)
+ self.assertTrue(session._dirty)
def test_invalidate(self):
request = testing.DummyRequest()
@@ -56,6 +65,15 @@ class TestUnencryptedCookieSession(unittest.TestCase):
self.assertEqual(session.invalidate(), None)
self.assertFalse('a' in session)
+ def test_reissue_triggered(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request)
+ self.assertEqual(session['state'], 1)
+ self.assertTrue(session._dirty)
+
def test__set_cookie_on_exception(self):
request = testing.DummyRequest()
request.exception = True
@@ -95,16 +113,16 @@ class TestUnencryptedCookieSession(unittest.TestCase):
request = testing.DummyRequest()
request.exception = None
session = self._makeOne(request,
- cookie_name = 'abc',
- cookie_path = '/foo',
- cookie_domain = 'localhost',
- cookie_secure = True,
- cookie_httponly = True,
+ cookie_name='abc',
+ path='/foo',
+ domain='localhost',
+ secure=True,
+ httponly=True,
)
session['abc'] = 'x'
response = Response()
self.assertEqual(session._set_cookie(response), True)
- cookieval= response.headerlist[-1][1]
+ cookieval = response.headerlist[-1][1]
val, domain, path, secure, httponly = [x.strip() for x in
cookieval.split(';')]
self.assertTrue(val.startswith('abc='))
@@ -205,6 +223,110 @@ class TestUnencryptedCookieSession(unittest.TestCase):
self.assertTrue(token)
self.assertTrue('_csrft_' in session)
+ def test_no_set_cookie_with_exception(self):
+ import webob
+ request = testing.DummyRequest()
+ request.exception = True
+ session = self._makeOne(request, set_on_exception=False)
+ session['a'] = 1
+ callbacks = request.response_callbacks
+ self.assertEqual(len(callbacks), 1)
+ response = webob.Response()
+ result = callbacks[0](request, response)
+ self.assertEqual(result, None)
+ self.assertFalse('Set-Cookie' in dict(response.headerlist))
+
+ def test_set_cookie_with_exception(self):
+ import webob
+ request = testing.DummyRequest()
+ request.exception = True
+ session = self._makeOne(request)
+ session['a'] = 1
+ callbacks = request.response_callbacks
+ self.assertEqual(len(callbacks), 1)
+ response = webob.Response()
+ result = callbacks[0](request, response)
+ self.assertEqual(result, None)
+ self.assertTrue('Set-Cookie' in dict(response.headerlist))
+
+ def test_cookie_is_set(self):
+ import webob
+ request = testing.DummyRequest()
+ session = self._makeOne(request)
+ session['a'] = 1
+ callbacks = request.response_callbacks
+ self.assertEqual(len(callbacks), 1)
+ response = webob.Response()
+ result = callbacks[0](request, response)
+ self.assertEqual(result, None)
+ self.assertTrue('Set-Cookie' in dict(response.headerlist))
+
+class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase):
+ def _makeOne(self, request, **kw):
+ from pyramid.session import BaseCookieSessionFactory
+ return BaseCookieSessionFactory(DummySerializer(), **kw)(request)
+
+ def _serialize(self, value):
+ return json.dumps(value)
+
+ def test_reissue_not_triggered(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time(), 0, {'state': 1}))
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, reissue_time=1)
+ self.assertEqual(session['state'], 1)
+ self.assertFalse(session._dirty)
+
+class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase):
+ def _makeOne(self, request, **kw):
+ from pyramid.session import SignedCookieSessionFactory
+ return SignedCookieSessionFactory('secret', **kw)(request)
+
+ def _serialize(self, value):
+ from pyramid.session import _SignedSerializer, _PickleSerializer
+ serializer = _PickleSerializer()
+ serializer = _SignedSerializer('secret', 'sha512', serializer)
+ return serializer.dumps(value)
+
+ def test_reissue_not_triggered(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time(), 0, {'state': 1}))
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, reissue_time=1)
+ self.assertEqual(session['state'], 1)
+ self.assertFalse(session._dirty)
+
+ def test_custom_serializer(self):
+ import time
+ from pyramid.session import _SignedSerializer
+ serializer = DummySerializer()
+ signer = _SignedSerializer('secret', 'sha512', serializer=serializer)
+ cookieval = signer.dumps((time.time(), 0, {'state': 1}))
+ request = testing.DummyRequest()
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, serializer=serializer)
+ self.assertEqual(session['state'], 1)
+
+class TestUnencryptedCookieSession(SharedCookieSessionTests, unittest.TestCase):
+ def _makeOne(self, request, **kw):
+ from pyramid.session import UnencryptedCookieSessionFactoryConfig
+ self._rename_cookie_var(kw, 'path', 'cookie_path')
+ self._rename_cookie_var(kw, 'domain', 'cookie_domain')
+ self._rename_cookie_var(kw, 'secure', 'cookie_secure')
+ self._rename_cookie_var(kw, 'httponly', 'cookie_httponly')
+ self._rename_cookie_var(kw, 'set_on_exception', 'cookie_on_exception')
+ return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request)
+
+ def _rename_cookie_var(self, kw, src, dest):
+ if src in kw:
+ kw.setdefault(dest, kw.pop(src))
+
+ def _serialize(self, value):
+ from pyramid.session import signed_serialize
+ return signed_serialize(value, 'secret')
+
def test_serialize_option(self):
from pyramid.response import Response
secret = 'secret'
@@ -255,54 +377,48 @@ class Test_manage_accessed(unittest.TestCase):
def test_accessed_set(self):
request = testing.DummyRequest()
session = DummySessionFactory(request)
- session.accessed = None
+ session.renewed = 0
wrapper = self._makeOne(session.__class__.get)
wrapper(session, 'a')
self.assertNotEqual(session.accessed, None)
-
- def test_already_dirty(self):
+ self.assertTrue(session._dirty)
+
+ def test_accessed_without_renew(self):
+ import time
request = testing.DummyRequest()
session = DummySessionFactory(request)
- session._dirty = True
- session['a'] = 1
+ session._reissue_time = 5
+ session.renewed = time.time()
wrapper = self._makeOne(session.__class__.get)
- self.assertEqual(wrapper.__doc__, session.get.__doc__)
- result = wrapper(session, 'a')
- self.assertEqual(result, 1)
- callbacks = request.response_callbacks
- self.assertEqual(len(callbacks), 0)
+ wrapper(session, 'a')
+ self.assertNotEqual(session.accessed, None)
+ self.assertFalse(session._dirty)
- def test_with_exception(self):
- import webob
+ def test_already_dirty(self):
request = testing.DummyRequest()
- request.exception = True
session = DummySessionFactory(request)
+ session.renewed = 0
+ session._dirty = True
session['a'] = 1
wrapper = self._makeOne(session.__class__.get)
self.assertEqual(wrapper.__doc__, session.get.__doc__)
result = wrapper(session, 'a')
self.assertEqual(result, 1)
callbacks = request.response_callbacks
- self.assertEqual(len(callbacks), 1)
- response = webob.Response()
- result = callbacks[0](request, response)
- self.assertEqual(result, None)
- self.assertFalse('Set-Cookie' in dict(response.headerlist))
+ self.assertEqual(len(callbacks), 0)
- def test_cookie_is_set(self):
+class Test_manage_changed(unittest.TestCase):
+ def _makeOne(self, wrapped):
+ from pyramid.session import manage_changed
+ return manage_changed(wrapped)
+
+ def test_it(self):
request = testing.DummyRequest()
session = DummySessionFactory(request)
- session['a'] = 1
- wrapper = self._makeOne(session.__class__.get)
- self.assertEqual(wrapper.__doc__, session.get.__doc__)
- result = wrapper(session, 'a')
- self.assertEqual(result, 1)
- callbacks = request.response_callbacks
- self.assertEqual(len(callbacks), 1)
- response = DummyResponse()
- result = callbacks[0](request, response)
- self.assertEqual(result, None)
- self.assertEqual(session.response, response)
+ wrapper = self._makeOne(session.__class__.__setitem__)
+ wrapper(session, 'a', 1)
+ self.assertNotEqual(session.accessed, None)
+ self.assertTrue(session._dirty)
def serialize(data, secret):
import hmac
@@ -354,7 +470,67 @@ class Test_signed_deserialize(unittest.TestCase):
def test_it_bad_encoding(self):
serialized = 'bad' + serialize('123', 'secret')
self.assertRaises(ValueError, self._callFUT, serialized, 'secret')
-
+
+class TestSignedSerializer(unittest.TestCase):
+ def _makeOne(self, secret='secret', hashalg='sha512'):
+ from pyramid.session import _SignedSerializer
+ serializer = DummySerializer()
+ return _SignedSerializer(secret, hashalg, serializer)
+
+ def test_it_same_serializer(self):
+ serializer = self._makeOne()
+ appstruct = {'state': 1}
+ cstruct = serializer.dumps(appstruct)
+ result = serializer.loads(cstruct)
+ self.assertEqual(result, appstruct)
+
+ def test_it_different_serializers(self):
+ serializer1 = self._makeOne()
+ appstruct = {'state': 1}
+ cstruct = serializer1.dumps(appstruct)
+
+ serializer2 = self._makeOne()
+ result = serializer2.loads(cstruct)
+ self.assertEqual(result, appstruct)
+
+ def test_invalid_signature_with_different_secret(self):
+ serializer1 = self._makeOne('secret1')
+ appstruct = {'state': 1}
+ cstruct = serializer1.dumps(appstruct)
+
+ serializer2 = self._makeOne('secret2')
+ try:
+ serializer2.loads(cstruct)
+ except ValueError as exc:
+ self.assertTrue('Invalid signature' in exc.args[0])
+ else: # pragma: no cover
+ self.fail()
+
+ def test_invalid_signature_after_tamper(self):
+ import base64
+ serializer = self._makeOne()
+ appstruct = {'state': 1}
+ cstruct = serializer.dumps(appstruct)
+ actual_val = base64.b64decode(cstruct)
+ test_val = base64.b64encode(actual_val[1:])
+ try:
+ serializer.loads(test_val)
+ except ValueError as exc:
+ self.assertTrue('Invalid signature' in exc.args[0])
+ else: # pragma: no cover
+ self.fail()
+
+ def test_invalid_data_size(self):
+ import base64
+ serializer = self._makeOne()
+ num_bytes = serializer.digest_size + serializer.salt_size - 1
+ try:
+ serializer.loads(base64.b64encode(b' ' * num_bytes))
+ except ValueError as exc:
+ self.assertTrue('Input is too short' in exc.args[0])
+ else: # pragma: no cover
+ self.fail()
+
class Test_check_csrf_token(unittest.TestCase):
def _callFUT(self, *args, **kwargs):
from ..session import check_csrf_token
@@ -390,6 +566,13 @@ class Test_check_csrf_token(unittest.TestCase):
result = self._callFUT(request, 'csrf_token', raises=False)
self.assertEqual(result, False)
+class DummySerializer(object):
+ def dumps(self, value):
+ return json.dumps(value).encode('utf-8')
+
+ def loads(self, value):
+ return json.loads(value.decode('utf-8'))
+
class DummySessionFactory(dict):
_dirty = False
_cookie_name = 'session'
@@ -399,13 +582,14 @@ class DummySessionFactory(dict):
_cookie_secure = False
_cookie_httponly = False
_timeout = 1200
- _secret = 'secret'
+ _reissue_time = 0
+
def __init__(self, request):
self.request = request
dict.__init__(self, {})
- def _set_cookie(self, response):
- self.response = response
+ def changed(self):
+ self._dirty = True
class DummyResponse(object):
def __init__(self):