diff options
| author | Michael Merickel <michael@merickel.org> | 2013-10-05 03:48:52 -0500 |
|---|---|---|
| committer | Michael Merickel <michael@merickel.org> | 2013-10-05 03:48:52 -0500 |
| commit | 4fade654a42b88ea1f042af974f76b97d326c455 (patch) | |
| tree | 7dfafc390b39dae761cf7fe746863f56bd7a2765 | |
| parent | 3a6cbcce80b5292082b2f4e2f920d2df127e2774 (diff) | |
| download | pyramid-4fade654a42b88ea1f042af974f76b97d326c455.tar.gz pyramid-4fade654a42b88ea1f042af974f76b97d326c455.tar.bz2 pyramid-4fade654a42b88ea1f042af974f76b97d326c455.zip | |
introduce SignedCookieSessionFactory
- Break apart UnencryptedCookieSessionFactoryConfig into a
BaseCookieSessionFactory.
- Add support for reissue_time in the base. Set the unencrypted class to
use reissue_time=0 for bw-compat.
- Add SignedCookieSessionFactory which wraps the base in a serializer
that uses signing via a sha512+hmac with a secret derived using an
8-byte random salt.
| -rw-r--r-- | pyramid/session.py | 400 | ||||
| -rw-r--r-- | pyramid/tests/test_session.py | 290 |
2 files changed, 508 insertions, 182 deletions
diff --git a/pyramid/session.py b/pyramid/session.py index db86ca32b..e6635ca1b 100644 --- a/pyramid/session.py +++ b/pyramid/session.py @@ -21,20 +21,26 @@ from pyramid.interfaces import ISession from pyramid.util import strings_differ def manage_accessed(wrapped): - """ Decorator which causes a cookie to be set when a wrapped - method is called""" + """ Decorator which causes a cookie to be renewed when an accessor + method is called.""" def accessed(session, *arg, **kw): - session.accessed = int(time.time()) - if not session._dirty: - session._dirty = True - def set_cookie_callback(request, response): - session._set_cookie(response) - session.request = None # explicitly break cycle for gc - session.request.add_response_callback(set_cookie_callback) + session.accessed = now = int(time.time()) + if now - session.renewed > session._reissue_time: + session.changed() return wrapped(session, *arg, **kw) accessed.__doc__ = wrapped.__doc__ return accessed +def manage_changed(wrapped): + """ Decorator which causes a cookie to be set when a setter method + is called.""" + def changed(session, *arg, **kw): + session.accessed = int(time.time()) + session.changed() + return wrapped(session, *arg, **kw) + changed.__doc__ = wrapped.__doc__ + return changed + def signed_serialize(data, secret): """ Serialize any pickleable structure (``data``) and sign it using the ``secret`` (must be a string). Return the @@ -67,13 +73,13 @@ def signed_deserialize(serialized, secret, hmac=hmac): """ # hmac parameterized only for unit tests try: - input_sig, pickled = (serialized[:40], + input_sig, pickled = (bytes_(serialized[:40]), base64.b64decode(bytes_(serialized[40:]))) except (binascii.Error, TypeError) as e: # Badly formed data can make base64 die raise ValueError('Badly formed base64 data: %s' % e) - sig = hmac.new(bytes_(secret), pickled, sha1).hexdigest() + sig = bytes_(hmac.new(bytes_(secret), pickled, sha1).hexdigest()) # Avoid timing attacks (see # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf) @@ -113,89 +119,108 @@ def check_csrf_token(request, return False return True -def UnencryptedCookieSessionFactoryConfig( - secret, - timeout=1200, +def BaseCookieSessionFactory( + serializer, cookie_name='session', - cookie_max_age=None, - cookie_path='/', - cookie_domain=None, - cookie_secure=False, - cookie_httponly=False, - cookie_on_exception=True, - signed_serialize=signed_serialize, - signed_deserialize=signed_deserialize, + max_age=None, + path='/', + domain=None, + secure=False, + httponly=False, + timeout=1200, + reissue_time=0, + set_on_exception=True, ): """ - Configure a :term:`session factory` which will provide unencrypted - (but signed) cookie-based sessions. The return value of this - function is a :term:`session factory`, which may be provided as - the ``session_factory`` argument of a - :class:`pyramid.config.Configurator` constructor, or used + Configure a :term:`session factory` which will provide cookie-based + sessions. The return value of this function is a + :term:`session factory`, which may be provided as the ``session_factory`` + argument of a :class:`pyramid.config.Configurator` constructor, or used as the ``session_factory`` argument of the - :meth:`pyramid.config.Configurator.set_session_factory` - method. + :meth:`pyramid.config.Configurator.set_session_factory` method. The session factory returned by this function will create sessions which are limited to storing fewer than 4000 bytes of data (as the payload must fit into a single cookie). - Parameters: + .. warning: - ``secret`` - A string which is used to sign the cookie. + This class provides no protection from tampering and is only intended + to be used by framework authors to create their own cookie-based + session factories. - ``timeout`` - A number of seconds of inactivity before a session times out. + Parameters: + + ``serializer`` + An object with 2 methods, ``loads`` and ``dumps``, which will be used + to perform serialization and deserialization. + - ``dumps(value)`` should accept a Python object and return a + bytestring which can later be deserialized with ``loads``. + - ``loads(value)`` should expect to receive a bytestring, generated by + ``dumps`` and return a Python object. ``cookie_name`` - The name of the cookie used for sessioning. + The name of the cookie used for sessioning. Default: ``'session'``. - ``cookie_max_age`` + ``max_age`` The maximum age of the cookie used for sessioning (in seconds). Default: ``None`` (browser scope). - ``cookie_path`` - The path used for the session cookie. + ``path`` + The path used for the session cookie. Default: ``'/'``. - ``cookie_domain`` + ``domain`` The domain used for the session cookie. Default: ``None`` (no domain). - ``cookie_secure`` - The 'secure' flag of the session cookie. + ``secure`` + The 'secure' flag of the session cookie. Default: ``False``. - ``cookie_httponly`` - The 'httpOnly' flag of the session cookie. + ``httponly`` + Hide the cookie from Javascript by setting the 'HttpOnly' flag of the + session cookie. Default: ``False``. - ``cookie_on_exception`` + ``timeout`` + A number of seconds of inactivity before a session times out. If + ``None`` then the cookie never expires. Default: 1200. + + ``reissue_time`` + The number of seconds that must pass before the cookie is automatically + reissued as the result of a request which accesses the session. The + duration is measured as the number of seconds since the last session + cookie was issued and 'now'. If this value is ``0``, a new cookie + will be reissued on every request accesses the session. If ``None`` + then the cookie's lifetime will never be extended. + + A good rule of thumb: if you want auto-expired cookies based on + inactivity: set the ``timeout`` value to 1200 (20 mins) and set the + ``reissue_time`` value to perhaps a tenth of the ``timeout`` value + (120 or 2 mins). It's nonsensical to set the ``timeout`` value lower + than the ``reissue_time`` value, as the ticket will never be reissued. + However, such a configuration is not explicitly prevented. + + Default: ``0``. + + ``set_on_exception`` If ``True``, set a session cookie even if an exception occurs - while rendering a view. - - ``signed_serialize`` - A callable which takes more or less arbitrary Python data structure and - a secret and returns a signed serialization in bytes. - Default: ``signed_serialize`` (using pickle). + while rendering a view. Default: ``True``. - ``signed_deserialize`` - A callable which takes a signed and serialized data structure in bytes - and a secret and returns the original data structure if the signature - is valid. Default: ``signed_deserialize`` (using pickle). + .. versionadded: 1.5a3 """ @implementer(ISession) - class SignedCookieSession(dict): + class CookieSession(dict): """ Dictionary-like session object """ # configuration parameters _cookie_name = cookie_name - _cookie_max_age = cookie_max_age - _cookie_path = cookie_path - _cookie_domain = cookie_domain - _cookie_secure = cookie_secure - _cookie_httponly = cookie_httponly - _cookie_on_exception = cookie_on_exception - _secret = secret + _cookie_max_age = max_age + _cookie_path = path + _cookie_domain = domain + _cookie_secure = secure + _cookie_httponly = httponly + _cookie_on_exception = set_on_exception _timeout = timeout + _reissue_time = reissue_time # dirty flag _dirty = False @@ -203,33 +228,40 @@ def UnencryptedCookieSessionFactoryConfig( def __init__(self, request): self.request = request now = time.time() - created = accessed = now + created = renewed = now new = True value = None state = {} cookieval = request.cookies.get(self._cookie_name) if cookieval is not None: try: - value = signed_deserialize(cookieval, self._secret) + value = serializer.loads(bytes_(cookieval)) except ValueError: value = None if value is not None: - accessed, created, state = value - new = False - if now - accessed > self._timeout: + try: + renewed, created, state = value + new = False + if now - renewed > self._timeout: + state = {} + except TypeError: state = {} self.created = created - self.accessed = accessed + self.accessed = renewed + self.renewed = renewed self.new = new dict.__init__(self, state) # ISession methods def changed(self): - """ This is intentionally a noop; the session is - serialized on every access, so unnecessary""" - pass + if not self._dirty: + self._dirty = True + def set_cookie_callback(request, response): + self._set_cookie(response) + self.request = None # explicitly break cycle for gc + self.request.add_response_callback(set_cookie_callback) def invalidate(self): self.clear() # XXX probably needs to unset cookie @@ -251,22 +283,22 @@ def UnencryptedCookieSessionFactoryConfig( has_key = manage_accessed(dict.has_key) # modifying dictionary methods - clear = manage_accessed(dict.clear) - update = manage_accessed(dict.update) - setdefault = manage_accessed(dict.setdefault) - pop = manage_accessed(dict.pop) - popitem = manage_accessed(dict.popitem) - __setitem__ = manage_accessed(dict.__setitem__) - __delitem__ = manage_accessed(dict.__delitem__) + clear = manage_changed(dict.clear) + update = manage_changed(dict.update) + setdefault = manage_changed(dict.setdefault) + pop = manage_changed(dict.pop) + popitem = manage_changed(dict.popitem) + __setitem__ = manage_changed(dict.__setitem__) + __delitem__ = manage_changed(dict.__delitem__) # flash API methods - @manage_accessed + @manage_changed def flash(self, msg, queue='', allow_duplicate=True): storage = self.setdefault('_f_' + queue, []) if allow_duplicate or (msg not in storage): storage.append(msg) - @manage_accessed + @manage_changed def pop_flash(self, queue=''): storage = self.pop('_f_' + queue, []) return storage @@ -277,7 +309,7 @@ def UnencryptedCookieSessionFactoryConfig( return storage # CSRF API methods - @manage_accessed + @manage_changed def new_csrf_token(self): token = text_(binascii.hexlify(os.urandom(20))) self['_csrft_'] = token @@ -296,9 +328,9 @@ def UnencryptedCookieSessionFactoryConfig( exception = getattr(self.request, 'exception', None) if exception is not None: # dont set a cookie during exceptions return False - cookieval = signed_serialize( - (self.accessed, self.created, dict(self)), self._secret - ) + cookieval = native_(serializer.dumps( + (self.accessed, self.created, dict(self)) + )) if len(cookieval) > 4064: raise ValueError( 'Cookie value is too long to store (%s bytes)' % @@ -315,11 +347,10 @@ def UnencryptedCookieSessionFactoryConfig( ) return True - return SignedCookieSession + return CookieSession -def SignedCookieSessionFactory( +def UnencryptedCookieSessionFactoryConfig( secret, - hashalg='sha512', timeout=1200, cookie_name='session', cookie_max_age=None, @@ -328,6 +359,97 @@ def SignedCookieSessionFactory( cookie_secure=False, cookie_httponly=False, cookie_on_exception=True, + signed_serialize=signed_serialize, + signed_deserialize=signed_deserialize, + ): + """ + Configure a :term:`session factory` which will provide unencrypted + (but signed) cookie-based sessions. The return value of this + function is a :term:`session factory`, which may be provided as + the ``session_factory`` argument of a + :class:`pyramid.config.Configurator` constructor, or used + as the ``session_factory`` argument of the + :meth:`pyramid.config.Configurator.set_session_factory` + method. + + The session factory returned by this function will create sessions + which are limited to storing fewer than 4000 bytes of data (as the + payload must fit into a single cookie). + + Parameters: + + ``secret`` + A string which is used to sign the cookie. + + ``timeout`` + A number of seconds of inactivity before a session times out. + + ``cookie_name`` + The name of the cookie used for sessioning. + + ``cookie_max_age`` + The maximum age of the cookie used for sessioning (in seconds). + Default: ``None`` (browser scope). + + ``cookie_path`` + The path used for the session cookie. + + ``cookie_domain`` + The domain used for the session cookie. Default: ``None`` (no domain). + + ``cookie_secure`` + The 'secure' flag of the session cookie. + + ``cookie_httponly`` + The 'httpOnly' flag of the session cookie. + + ``cookie_on_exception`` + If ``True``, set a session cookie even if an exception occurs + while rendering a view. + + ``signed_serialize`` + A callable which takes more or less arbitrary Python data structure and + a secret and returns a signed serialization in bytes. + Default: ``signed_serialize`` (using pickle). + + ``signed_deserialize`` + A callable which takes a signed and serialized data structure in bytes + and a secret and returns the original data structure if the signature + is valid. Default: ``signed_deserialize`` (using pickle). + """ + + class _Serializer(object): + def dumps(self, appstruct): + return signed_serialize(appstruct, secret) + + def loads(self, bstruct): + return signed_deserialize(bstruct, secret) + + return BaseCookieSessionFactory( + _Serializer(), + cookie_name=cookie_name, + max_age=cookie_max_age, + path=cookie_path, + domain=cookie_domain, + secure=cookie_secure, + httponly=cookie_httponly, + timeout=timeout, + reissue_time=0, # to keep session.accessed == session.renewed + set_on_exception=cookie_on_exception, + ) + +def SignedCookieSessionFactory( + secret, + cookie_name='session', + max_age=None, + path='/', + domain=None, + secure=False, + httponly=False, + set_on_exception=True, + timeout=1200, + reissue_time=0, + hashalg='sha512', serializer=None, ): """ @@ -353,86 +475,106 @@ def SignedCookieSessionFactory( The HMAC digest algorithm to use for signing. The algorithm must be supported by the :mod:`hashlib` library. Default: ``'sha512'``. - ``timeout`` - A number of seconds of inactivity before a session times out. - Default: 1200. - ``cookie_name`` The name of the cookie used for sessioning. Default: ``'session'``. - ``cookie_max_age`` + ``max_age`` The maximum age of the cookie used for sessioning (in seconds). Default: ``None`` (browser scope). - ``cookie_path`` + ``path`` The path used for the session cookie. Default: ``'/'``. - ``cookie_domain`` + ``domain`` The domain used for the session cookie. Default: ``None`` (no domain). - ``cookie_secure`` + ``secure`` The 'secure' flag of the session cookie. Default: ``False``. - ``cookie_httponly`` - The 'httpOnly' flag of the session cookie. Default: ``False``. + ``httponly`` + Hide the cookie from Javascript by setting the 'HttpOnly' flag of the + session cookie. Default: ``False``. - ``cookie_on_exception`` + ``timeout`` + A number of seconds of inactivity before a session times out. If + ``None`` then the cookie never expires. Default: 1200. + + ``reissue_time`` + The number of seconds that must pass before the cookie is automatically + reissued as the result of a request which accesses the session. The + duration is measured as the number of seconds since the last session + cookie was issued and 'now'. If this value is ``0``, a new cookie + will be reissued on every request accesses the session. If ``None`` + then the cookie's lifetime will never be extended. + + A good rule of thumb: if you want auto-expired cookies based on + inactivity: set the ``timeout`` value to 1200 (20 mins) and set the + ``reissue_time`` value to perhaps a tenth of the ``timeout`` value + (120 or 2 mins). It's nonsensical to set the ``timeout`` value lower + than the ``reissue_time`` value, as the ticket will never be reissued. + However, such a configuration is not explicitly prevented. + + Default: ``0``. + + ``set_on_exception`` If ``True``, set a session cookie even if an exception occurs while rendering a view. Default: ``True``. ``serializer`` - An object which can convert data between Python objects and bytestrings. - - ``loads`` should accept a bytestring and return a Python object. - - ``dumps`` should accept a Python object and return a bytestring. - The default implementation uses Python's pickle module. + An object with 2 methods, ``loads`` and ``dumps``, which will be used + to perform serialization and deserialization. The value generated from + serialization will be cryptographically signed to prevent tampering. + A ``ValueError`` should be raised if deserialization fails. + + .. versionadded: 1.5a3 """ if serializer is None: serializer = _PickleSerializer() - signer = _SignedSerializer(hashalg, serializer) + signed_serializer = _SignedSerializer(secret, hashalg, serializer) - return UnencryptedCookieSessionFactoryConfig( - secret, - timeout=timeout, + return BaseCookieSessionFactory( + signed_serializer, cookie_name=cookie_name, - cookie_max_age=cookie_max_age, - cookie_path=cookie_path, - cookie_domain=cookie_domain, - cookie_secure=cookie_secure, - cookie_httponly=cookie_httponly, - cookie_on_exception=cookie_on_exception, - signed_serialize=signer.dumps, - signed_deserialize=signer.loads, + max_age=max_age, + path=path, + domain=domain, + secure=secure, + httponly=httponly, + timeout=timeout, + reissue_time=reissue_time, + set_on_exception=set_on_exception, ) class _PickleSerializer(object): - def dumps(self, value): - return pickle.dumps(value, pickle.HIGHEST_PROTOCOL) + def dumps(self, appstruct): + return pickle.dumps(appstruct, pickle.HIGHEST_PROTOCOL) - def loads(self, value): - return pickle.loads(value) + def loads(self, bstruct): + return pickle.loads(bstruct) class _SignedSerializer(object): - def __init__(self, hashalg, serializer): + def __init__(self, secret, hashalg, serializer): + self.secret = secret self.digestmod = lambda: hashlib.new(hashalg) self.digest_size = self.digestmod().digest_size self.salt_size = 8 self.serializer = serializer - def derive_key(self, secret, salt): - return hmac.new(secret, salt, self.digestmod).digest() + def derive_key(self, salt): + return hmac.new(self.secret, salt, self.digestmod).digest() - def dumps(self, appstruct, secret): + def dumps(self, appstruct): salt = os.urandom(self.salt_size) - derived_secret = self.derive_key(secret, salt) + derived_secret = self.derive_key(salt) cstruct = self.serializer.dumps(appstruct) sig = hmac.new(derived_secret, cstruct, self.digestmod).digest() - return native_(base64.b64encode(cstruct + salt + sig)) + return base64.b64encode(cstruct + salt + sig) - def loads(self, bstruct, secret): + def loads(self, bstruct): try: - fstruct = base64.b64decode(bytes_(bstruct)) + fstruct = base64.b64decode(bstruct) except (binascii.Error, TypeError) as e: raise ValueError('Badly formed base64 data: %s' % e) @@ -441,10 +583,10 @@ class _SignedSerializer(object): raise ValueError('Input is too short.') cstruct = fstruct[:cstruct_size] - salt = fstruct[cstruct_size:self.salt_size] - expected_sig = fstruct[:-self.digest_size] + salt = fstruct[cstruct_size:cstruct_size + self.salt_size] + expected_sig = fstruct[-self.digest_size:] - derived_secret = self.derive_key(secret, salt) + derived_secret = self.derive_key(salt) sig = hmac.new(derived_secret, cstruct, self.digestmod).digest() if strings_differ(sig, expected_sig): raise ValueError('Invalid signature') diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py index 35e2b5c27..0e1ed78a6 100644 --- a/pyramid/tests/test_session.py +++ b/pyramid/tests/test_session.py @@ -1,10 +1,8 @@ +import json import unittest from pyramid import testing -class TestUnencryptedCookieSession(unittest.TestCase): - def _makeOne(self, request, **kw): - from pyramid.session import UnencryptedCookieSessionFactoryConfig - return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request) +class SharedCookieSessionTests(object): def test_ctor_no_cookie(self): request = testing.DummyRequest() @@ -18,36 +16,47 @@ class TestUnencryptedCookieSession(unittest.TestCase): session = self._makeOne(request) verifyObject(ISession, session) - def _serialize(self, accessed, state, secret='secret'): - from pyramid.session import signed_serialize - return signed_serialize((accessed, accessed, state), secret) - def test_ctor_with_cookie_still_valid(self): import time request = testing.DummyRequest() - cookieval = self._serialize(time.time(), {'state':1}) + cookieval = self._serialize((time.time(), 0, {'state': 1})) request.cookies['session'] = cookieval session = self._makeOne(request) self.assertEqual(dict(session), {'state':1}) - + def test_ctor_with_cookie_expired(self): request = testing.DummyRequest() - cookieval = self._serialize(0, {'state':1}) + cookieval = self._serialize((0, 0, {'state': 1})) request.cookies['session'] = cookieval session = self._makeOne(request) self.assertEqual(dict(session), {}) - def test_ctor_with_bad_cookie(self): + def test_ctor_with_bad_cookie_cannot_deserialize(self): + request = testing.DummyRequest() + request.cookies['session'] = 'abc' + session = self._makeOne(request) + self.assertEqual(dict(session), {}) + + def test_ctor_with_bad_cookie_not_tuple(self): request = testing.DummyRequest() - cookieval = 'abc' + cookieval = self._serialize('abc') request.cookies['session'] = cookieval session = self._makeOne(request) self.assertEqual(dict(session), {}) + def test_timeout(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time() - 5, 0, {'state': 1})) + request.cookies['session'] = cookieval + session = self._makeOne(request, timeout=1) + self.assertEqual(dict(session), {}) + def test_changed(self): request = testing.DummyRequest() session = self._makeOne(request) self.assertEqual(session.changed(), None) + self.assertTrue(session._dirty) def test_invalidate(self): request = testing.DummyRequest() @@ -56,6 +65,15 @@ class TestUnencryptedCookieSession(unittest.TestCase): self.assertEqual(session.invalidate(), None) self.assertFalse('a' in session) + def test_reissue_triggered(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time() - 2, 0, {'state': 1})) + request.cookies['session'] = cookieval + session = self._makeOne(request) + self.assertEqual(session['state'], 1) + self.assertTrue(session._dirty) + def test__set_cookie_on_exception(self): request = testing.DummyRequest() request.exception = True @@ -95,16 +113,16 @@ class TestUnencryptedCookieSession(unittest.TestCase): request = testing.DummyRequest() request.exception = None session = self._makeOne(request, - cookie_name = 'abc', - cookie_path = '/foo', - cookie_domain = 'localhost', - cookie_secure = True, - cookie_httponly = True, + cookie_name='abc', + path='/foo', + domain='localhost', + secure=True, + httponly=True, ) session['abc'] = 'x' response = Response() self.assertEqual(session._set_cookie(response), True) - cookieval= response.headerlist[-1][1] + cookieval = response.headerlist[-1][1] val, domain, path, secure, httponly = [x.strip() for x in cookieval.split(';')] self.assertTrue(val.startswith('abc=')) @@ -205,6 +223,110 @@ class TestUnencryptedCookieSession(unittest.TestCase): self.assertTrue(token) self.assertTrue('_csrft_' in session) + def test_no_set_cookie_with_exception(self): + import webob + request = testing.DummyRequest() + request.exception = True + session = self._makeOne(request, set_on_exception=False) + session['a'] = 1 + callbacks = request.response_callbacks + self.assertEqual(len(callbacks), 1) + response = webob.Response() + result = callbacks[0](request, response) + self.assertEqual(result, None) + self.assertFalse('Set-Cookie' in dict(response.headerlist)) + + def test_set_cookie_with_exception(self): + import webob + request = testing.DummyRequest() + request.exception = True + session = self._makeOne(request) + session['a'] = 1 + callbacks = request.response_callbacks + self.assertEqual(len(callbacks), 1) + response = webob.Response() + result = callbacks[0](request, response) + self.assertEqual(result, None) + self.assertTrue('Set-Cookie' in dict(response.headerlist)) + + def test_cookie_is_set(self): + import webob + request = testing.DummyRequest() + session = self._makeOne(request) + session['a'] = 1 + callbacks = request.response_callbacks + self.assertEqual(len(callbacks), 1) + response = webob.Response() + result = callbacks[0](request, response) + self.assertEqual(result, None) + self.assertTrue('Set-Cookie' in dict(response.headerlist)) + +class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase): + def _makeOne(self, request, **kw): + from pyramid.session import BaseCookieSessionFactory + return BaseCookieSessionFactory(DummySerializer(), **kw)(request) + + def _serialize(self, value): + return json.dumps(value) + + def test_reissue_not_triggered(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time(), 0, {'state': 1})) + request.cookies['session'] = cookieval + session = self._makeOne(request, reissue_time=1) + self.assertEqual(session['state'], 1) + self.assertFalse(session._dirty) + +class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase): + def _makeOne(self, request, **kw): + from pyramid.session import SignedCookieSessionFactory + return SignedCookieSessionFactory('secret', **kw)(request) + + def _serialize(self, value): + from pyramid.session import _SignedSerializer, _PickleSerializer + serializer = _PickleSerializer() + serializer = _SignedSerializer('secret', 'sha512', serializer) + return serializer.dumps(value) + + def test_reissue_not_triggered(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time(), 0, {'state': 1})) + request.cookies['session'] = cookieval + session = self._makeOne(request, reissue_time=1) + self.assertEqual(session['state'], 1) + self.assertFalse(session._dirty) + + def test_custom_serializer(self): + import time + from pyramid.session import _SignedSerializer + serializer = DummySerializer() + signer = _SignedSerializer('secret', 'sha512', serializer=serializer) + cookieval = signer.dumps((time.time(), 0, {'state': 1})) + request = testing.DummyRequest() + request.cookies['session'] = cookieval + session = self._makeOne(request, serializer=serializer) + self.assertEqual(session['state'], 1) + +class TestUnencryptedCookieSession(SharedCookieSessionTests, unittest.TestCase): + def _makeOne(self, request, **kw): + from pyramid.session import UnencryptedCookieSessionFactoryConfig + self._rename_cookie_var(kw, 'path', 'cookie_path') + self._rename_cookie_var(kw, 'domain', 'cookie_domain') + self._rename_cookie_var(kw, 'secure', 'cookie_secure') + self._rename_cookie_var(kw, 'httponly', 'cookie_httponly') + self._rename_cookie_var(kw, 'set_on_exception', 'cookie_on_exception') + return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request) + + def _rename_cookie_var(self, kw, src, dest): + if src in kw: + kw.setdefault(dest, kw.pop(src)) + + def _serialize(self, value): + from pyramid.session import signed_serialize + return signed_serialize(value, 'secret') + def test_serialize_option(self): from pyramid.response import Response secret = 'secret' @@ -255,54 +377,48 @@ class Test_manage_accessed(unittest.TestCase): def test_accessed_set(self): request = testing.DummyRequest() session = DummySessionFactory(request) - session.accessed = None + session.renewed = 0 wrapper = self._makeOne(session.__class__.get) wrapper(session, 'a') self.assertNotEqual(session.accessed, None) - - def test_already_dirty(self): + self.assertTrue(session._dirty) + + def test_accessed_without_renew(self): + import time request = testing.DummyRequest() session = DummySessionFactory(request) - session._dirty = True - session['a'] = 1 + session._reissue_time = 5 + session.renewed = time.time() wrapper = self._makeOne(session.__class__.get) - self.assertEqual(wrapper.__doc__, session.get.__doc__) - result = wrapper(session, 'a') - self.assertEqual(result, 1) - callbacks = request.response_callbacks - self.assertEqual(len(callbacks), 0) + wrapper(session, 'a') + self.assertNotEqual(session.accessed, None) + self.assertFalse(session._dirty) - def test_with_exception(self): - import webob + def test_already_dirty(self): request = testing.DummyRequest() - request.exception = True session = DummySessionFactory(request) + session.renewed = 0 + session._dirty = True session['a'] = 1 wrapper = self._makeOne(session.__class__.get) self.assertEqual(wrapper.__doc__, session.get.__doc__) result = wrapper(session, 'a') self.assertEqual(result, 1) callbacks = request.response_callbacks - self.assertEqual(len(callbacks), 1) - response = webob.Response() - result = callbacks[0](request, response) - self.assertEqual(result, None) - self.assertFalse('Set-Cookie' in dict(response.headerlist)) + self.assertEqual(len(callbacks), 0) - def test_cookie_is_set(self): +class Test_manage_changed(unittest.TestCase): + def _makeOne(self, wrapped): + from pyramid.session import manage_changed + return manage_changed(wrapped) + + def test_it(self): request = testing.DummyRequest() session = DummySessionFactory(request) - session['a'] = 1 - wrapper = self._makeOne(session.__class__.get) - self.assertEqual(wrapper.__doc__, session.get.__doc__) - result = wrapper(session, 'a') - self.assertEqual(result, 1) - callbacks = request.response_callbacks - self.assertEqual(len(callbacks), 1) - response = DummyResponse() - result = callbacks[0](request, response) - self.assertEqual(result, None) - self.assertEqual(session.response, response) + wrapper = self._makeOne(session.__class__.__setitem__) + wrapper(session, 'a', 1) + self.assertNotEqual(session.accessed, None) + self.assertTrue(session._dirty) def serialize(data, secret): import hmac @@ -354,7 +470,67 @@ class Test_signed_deserialize(unittest.TestCase): def test_it_bad_encoding(self): serialized = 'bad' + serialize('123', 'secret') self.assertRaises(ValueError, self._callFUT, serialized, 'secret') - + +class TestSignedSerializer(unittest.TestCase): + def _makeOne(self, secret='secret', hashalg='sha512'): + from pyramid.session import _SignedSerializer + serializer = DummySerializer() + return _SignedSerializer(secret, hashalg, serializer) + + def test_it_same_serializer(self): + serializer = self._makeOne() + appstruct = {'state': 1} + cstruct = serializer.dumps(appstruct) + result = serializer.loads(cstruct) + self.assertEqual(result, appstruct) + + def test_it_different_serializers(self): + serializer1 = self._makeOne() + appstruct = {'state': 1} + cstruct = serializer1.dumps(appstruct) + + serializer2 = self._makeOne() + result = serializer2.loads(cstruct) + self.assertEqual(result, appstruct) + + def test_invalid_signature_with_different_secret(self): + serializer1 = self._makeOne('secret1') + appstruct = {'state': 1} + cstruct = serializer1.dumps(appstruct) + + serializer2 = self._makeOne('secret2') + try: + serializer2.loads(cstruct) + except ValueError as exc: + self.assertTrue('Invalid signature' in exc.args[0]) + else: # pragma: no cover + self.fail() + + def test_invalid_signature_after_tamper(self): + import base64 + serializer = self._makeOne() + appstruct = {'state': 1} + cstruct = serializer.dumps(appstruct) + actual_val = base64.b64decode(cstruct) + test_val = base64.b64encode(actual_val[1:]) + try: + serializer.loads(test_val) + except ValueError as exc: + self.assertTrue('Invalid signature' in exc.args[0]) + else: # pragma: no cover + self.fail() + + def test_invalid_data_size(self): + import base64 + serializer = self._makeOne() + num_bytes = serializer.digest_size + serializer.salt_size - 1 + try: + serializer.loads(base64.b64encode(b' ' * num_bytes)) + except ValueError as exc: + self.assertTrue('Input is too short' in exc.args[0]) + else: # pragma: no cover + self.fail() + class Test_check_csrf_token(unittest.TestCase): def _callFUT(self, *args, **kwargs): from ..session import check_csrf_token @@ -390,6 +566,13 @@ class Test_check_csrf_token(unittest.TestCase): result = self._callFUT(request, 'csrf_token', raises=False) self.assertEqual(result, False) +class DummySerializer(object): + def dumps(self, value): + return json.dumps(value).encode('utf-8') + + def loads(self, value): + return json.loads(value.decode('utf-8')) + class DummySessionFactory(dict): _dirty = False _cookie_name = 'session' @@ -399,13 +582,14 @@ class DummySessionFactory(dict): _cookie_secure = False _cookie_httponly = False _timeout = 1200 - _secret = 'secret' + _reissue_time = 0 + def __init__(self, request): self.request = request dict.__init__(self, {}) - def _set_cookie(self, response): - self.response = response + def changed(self): + self._dirty = True class DummyResponse(object): def __init__(self): |
