From b0b09c9bfb4924ce22627f4da94d3216829d5ec8 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Sat, 19 Oct 2013 01:00:47 -0500 Subject: update session to use a static salt and separate serialize funcs --- pyramid/session.py | 140 +++++++++++++++++++------------------- pyramid/tests/test_session.py | 154 +++++++++++++++++++++--------------------- 2 files changed, 145 insertions(+), 149 deletions(-) diff --git a/pyramid/session.py b/pyramid/session.py index 3c493a561..800400223 100644 --- a/pyramid/session.py +++ b/pyramid/session.py @@ -120,7 +120,8 @@ def check_csrf_token(request, return True def BaseCookieSessionFactory( - serializer, + serialize, + deserialize, cookie_name='session', max_age=None, path='/', @@ -151,13 +152,13 @@ def BaseCookieSessionFactory( Parameters: - ``serializer`` - An object with 2 methods, ``loads`` and ``dumps``, which will be used - to perform serialization and deserialization. - - ``dumps(value)`` should accept a Python object and return a - bytestring which can later be deserialized with ``loads``. - - ``loads(value)`` should expect to receive a bytestring, generated by - ``dumps`` and return a Python object. + ``serialize`` + A callable accepting a Python object and returning a bytestring. A + ``ValueError`` should be raised for malformed inputs. + + ``deserialize`` + A callable accepting a bytestring and returning a Python object. A + ``ValueError`` should be raised for malformed inputs. ``cookie_name`` The name of the cookie used for sessioning. Default: ``'session'``. @@ -235,8 +236,9 @@ def BaseCookieSessionFactory( cookieval = request.cookies.get(self._cookie_name) if cookieval is not None: try: - value = serializer.loads(bytes_(cookieval)) + value = deserialize(bytes_(cookieval)) except ValueError: + # the cookie failed to deserialize, dropped value = None if value is not None: @@ -244,8 +246,12 @@ def BaseCookieSessionFactory( renewed, created, state = value new = False if now - renewed > self._timeout: + # expire the session because it was not renewed + # before the timeout threshold state = {} except TypeError: + # value failed to unpack properly or renewed was not + # a numeric type so we'll fail deserialization here state = {} self.created = created @@ -328,7 +334,7 @@ def BaseCookieSessionFactory( exception = getattr(self.request, 'exception', None) if exception is not None: # dont set a cookie during exceptions return False - cookieval = native_(serializer.dumps( + cookieval = native_(serialize( (self.accessed, self.created, dict(self)) )) if len(cookieval) > 4064: @@ -418,15 +424,9 @@ def UnencryptedCookieSessionFactoryConfig( is valid. Default: ``signed_deserialize`` (using pickle). """ - class _Serializer(object): - def dumps(self, appstruct): - return signed_serialize(appstruct, secret) - - def loads(self, bstruct): - return signed_deserialize(bstruct, secret) - return BaseCookieSessionFactory( - _Serializer(), + lambda v: signed_serialize(v, secret), + lambda v: signed_deserialize(v, secret), cookie_name=cookie_name, max_age=cookie_max_age, path=cookie_path, @@ -450,7 +450,9 @@ def SignedCookieSessionFactory( timeout=1200, reissue_time=0, hashalg='sha512', - serializer=None, + salt='pyramid.session.', + serialize=None, + deserialize=None, ): """ Configure a :term:`session factory` which will provide signed @@ -469,12 +471,19 @@ def SignedCookieSessionFactory( Parameters: ``secret`` - A string which is used to sign the cookie. + A string which is used to sign the cookie. The secret should be at + least as long as the block size of the selected hash algorithm. For + ``sha512`` this would mean a 128 bit (64 character) secret. ``hashalg`` The HMAC digest algorithm to use for signing. The algorithm must be supported by the :mod:`hashlib` library. Default: ``'sha512'``. + ``salt`` + A namespace to avoid collisions between different uses of a shared + secret. Reusing a secret for different parts of an application is + strongly discouraged. + ``cookie_name`` The name of the cookie used for sessioning. Default: ``'session'``. @@ -520,77 +529,64 @@ def SignedCookieSessionFactory( If ``True``, set a session cookie even if an exception occurs while rendering a view. Default: ``True``. - ``serializer`` - An object with 2 methods, ``loads`` and ``dumps``, which will be used - to perform serialization and deserialization. - - ``dumps(value)`` should accept a Python object and return a - bytestring which can later be deserialized with ``loads``. - - ``loads(value)`` should expect to receive a bytestring, generated by - ``dumps`` and return a Python object. + ``serialize`` + A callable accepting a Python object and returning a bytestring. A + ``ValueError`` should be raised for malformed inputs. + Default: :func:`pickle.dumps`. + + ``deserialize`` + A callable accepting a bytestring and returning a Python object. A + ``ValueError`` should be raised for malformed inputs. + Default: :func:`pickle.loads`. .. versionadded: 1.5a3 """ - if serializer is None: - serializer = _PickleSerializer() - - signed_serializer = _SignedSerializer(secret, hashalg, serializer) - - return BaseCookieSessionFactory( - signed_serializer, - cookie_name=cookie_name, - max_age=max_age, - path=path, - domain=domain, - secure=secure, - httponly=httponly, - timeout=timeout, - reissue_time=reissue_time, - set_on_exception=set_on_exception, - ) - -class _PickleSerializer(object): - def dumps(self, appstruct): - return pickle.dumps(appstruct, pickle.HIGHEST_PROTOCOL) + if serialize is None: + serialize = lambda v: pickle.dumps(v, pickle.HIGHEST_PROTOCOL) - def loads(self, bstruct): - return pickle.loads(bstruct) + if deserialize is None: + deserialize = pickle.loads -class _SignedSerializer(object): - def __init__(self, secret, hashalg, serializer): - self.secret = secret - self.digestmod = lambda: hashlib.new(hashalg) - self.digest_size = self.digestmod().digest_size - self.salt_size = 8 - self.serializer = serializer + digestmod = lambda: hashlib.new(hashalg) + digest_size = digestmod().digest_size - def derive_key(self, salt): - return hmac.new(bytes_(self.secret), salt, self.digestmod).digest() + salted_secret = bytes_(salt or '') + bytes_(secret) - def dumps(self, appstruct): - salt = os.urandom(self.salt_size) - derived_secret = self.derive_key(salt) - cstruct = self.serializer.dumps(appstruct) - sig = hmac.new(derived_secret, cstruct, self.digestmod).digest() - return base64.b64encode(cstruct + salt + sig) + def signed_serialize(appstruct): + cstruct = serialize(appstruct) + sig = hmac.new(salted_secret, cstruct, digestmod).digest() + return base64.b64encode(cstruct + sig) - def loads(self, bstruct): + def signed_deserialize(bstruct): try: fstruct = base64.b64decode(bstruct) except (binascii.Error, TypeError) as e: raise ValueError('Badly formed base64 data: %s' % e) - cstruct_size = len(fstruct) - self.salt_size - self.digest_size + cstruct_size = len(fstruct) - digest_size if cstruct_size < 0: raise ValueError('Input is too short.') cstruct = fstruct[:cstruct_size] - salt = fstruct[cstruct_size:cstruct_size + self.salt_size] - expected_sig = fstruct[-self.digest_size:] + expected_sig = fstruct[-digest_size:] - derived_secret = self.derive_key(salt) - sig = hmac.new(derived_secret, cstruct, self.digestmod).digest() + sig = hmac.new(salted_secret, cstruct, digestmod).digest() if strings_differ(sig, expected_sig): raise ValueError('Invalid signature') - return self.serializer.loads(cstruct) + return deserialize(cstruct) + + return BaseCookieSessionFactory( + signed_serialize, + signed_deserialize, + cookie_name=cookie_name, + max_age=max_age, + path=path, + domain=domain, + secure=secure, + httponly=httponly, + timeout=timeout, + reissue_time=reissue_time, + set_on_exception=set_on_exception, + ) diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py index 0e1ed78a6..04740b0cd 100644 --- a/pyramid/tests/test_session.py +++ b/pyramid/tests/test_session.py @@ -264,7 +264,8 @@ class SharedCookieSessionTests(object): class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase): def _makeOne(self, request, **kw): from pyramid.session import BaseCookieSessionFactory - return BaseCookieSessionFactory(DummySerializer(), **kw)(request) + return BaseCookieSessionFactory( + dummy_serialize, dummy_deserialize, **kw)(request) def _serialize(self, value): return json.dumps(value) @@ -281,13 +282,19 @@ class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase): class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase): def _makeOne(self, request, **kw): from pyramid.session import SignedCookieSessionFactory - return SignedCookieSessionFactory('secret', **kw)(request) + kw.setdefault('secret', 'secret') + return SignedCookieSessionFactory(**kw)(request) - def _serialize(self, value): - from pyramid.session import _SignedSerializer, _PickleSerializer - serializer = _PickleSerializer() - serializer = _SignedSerializer('secret', 'sha512', serializer) - return serializer.dumps(value) + def _serialize(self, value, salt='pyramid.session.', hashalg='sha512'): + import base64 + import hashlib + import hmac + import pickle + + digestmod = lambda: hashlib.new(hashalg) + cstruct = pickle.dumps(value, pickle.HIGHEST_PROTOCOL) + sig = hmac.new(salt + 'secret', cstruct, digestmod).digest() + return base64.b64encode(cstruct + sig) def test_reissue_not_triggered(self): import time @@ -298,17 +305,71 @@ class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase): self.assertEqual(session['state'], 1) self.assertFalse(session._dirty) + def test_custom_salt(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time(), 0, {'state': 1}), salt='f.') + request.cookies['session'] = cookieval + session = self._makeOne(request, salt='f.') + self.assertEqual(session['state'], 1) + + def test_salt_mismatch(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time(), 0, {'state': 1}), salt='f.') + request.cookies['session'] = cookieval + session = self._makeOne(request, salt='g.') + self.assertEqual(session, {}) + + def test_custom_hashalg(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time(), 0, {'state': 1}), + hashalg='sha1') + request.cookies['session'] = cookieval + session = self._makeOne(request, hashalg='sha1') + self.assertEqual(session['state'], 1) + + def test_hashalg_mismatch(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time(), 0, {'state': 1}), + hashalg='sha1') + request.cookies['session'] = cookieval + session = self._makeOne(request, hashalg='sha256') + self.assertEqual(session, {}) + + def test_secret_mismatch(self): + import time + request = testing.DummyRequest() + cookieval = self._serialize((time.time(), 0, {'state': 1})) + request.cookies['session'] = cookieval + session = self._makeOne(request, secret='evilsecret') + self.assertEqual(session, {}) + def test_custom_serializer(self): + import base64 + from hashlib import sha512 + import hmac import time - from pyramid.session import _SignedSerializer - serializer = DummySerializer() - signer = _SignedSerializer('secret', 'sha512', serializer=serializer) - cookieval = signer.dumps((time.time(), 0, {'state': 1})) request = testing.DummyRequest() + cstruct = dummy_serialize((time.time(), 0, {'state': 1})) + sig = hmac.new('pyramid.session.secret', cstruct, sha512).digest() + cookieval = base64.b64encode(cstruct + sig) request.cookies['session'] = cookieval - session = self._makeOne(request, serializer=serializer) + session = self._makeOne(request, deserialize=dummy_deserialize) self.assertEqual(session['state'], 1) + def test_invalid_data_size(self): + from hashlib import sha512 + import base64 + request = testing.DummyRequest() + num_bytes = sha512().digest_size - 1 + cookieval = base64.b64encode(b' ' * num_bytes) + request.cookies['session'] = cookieval + session = self._makeOne(request) + self.assertEqual(session, {}) + class TestUnencryptedCookieSession(SharedCookieSessionTests, unittest.TestCase): def _makeOne(self, request, **kw): from pyramid.session import UnencryptedCookieSessionFactoryConfig @@ -471,66 +532,6 @@ class Test_signed_deserialize(unittest.TestCase): serialized = 'bad' + serialize('123', 'secret') self.assertRaises(ValueError, self._callFUT, serialized, 'secret') -class TestSignedSerializer(unittest.TestCase): - def _makeOne(self, secret='secret', hashalg='sha512'): - from pyramid.session import _SignedSerializer - serializer = DummySerializer() - return _SignedSerializer(secret, hashalg, serializer) - - def test_it_same_serializer(self): - serializer = self._makeOne() - appstruct = {'state': 1} - cstruct = serializer.dumps(appstruct) - result = serializer.loads(cstruct) - self.assertEqual(result, appstruct) - - def test_it_different_serializers(self): - serializer1 = self._makeOne() - appstruct = {'state': 1} - cstruct = serializer1.dumps(appstruct) - - serializer2 = self._makeOne() - result = serializer2.loads(cstruct) - self.assertEqual(result, appstruct) - - def test_invalid_signature_with_different_secret(self): - serializer1 = self._makeOne('secret1') - appstruct = {'state': 1} - cstruct = serializer1.dumps(appstruct) - - serializer2 = self._makeOne('secret2') - try: - serializer2.loads(cstruct) - except ValueError as exc: - self.assertTrue('Invalid signature' in exc.args[0]) - else: # pragma: no cover - self.fail() - - def test_invalid_signature_after_tamper(self): - import base64 - serializer = self._makeOne() - appstruct = {'state': 1} - cstruct = serializer.dumps(appstruct) - actual_val = base64.b64decode(cstruct) - test_val = base64.b64encode(actual_val[1:]) - try: - serializer.loads(test_val) - except ValueError as exc: - self.assertTrue('Invalid signature' in exc.args[0]) - else: # pragma: no cover - self.fail() - - def test_invalid_data_size(self): - import base64 - serializer = self._makeOne() - num_bytes = serializer.digest_size + serializer.salt_size - 1 - try: - serializer.loads(base64.b64encode(b' ' * num_bytes)) - except ValueError as exc: - self.assertTrue('Input is too short' in exc.args[0]) - else: # pragma: no cover - self.fail() - class Test_check_csrf_token(unittest.TestCase): def _callFUT(self, *args, **kwargs): from ..session import check_csrf_token @@ -566,12 +567,11 @@ class Test_check_csrf_token(unittest.TestCase): result = self._callFUT(request, 'csrf_token', raises=False) self.assertEqual(result, False) -class DummySerializer(object): - def dumps(self, value): - return json.dumps(value).encode('utf-8') +def dummy_serialize(value): + return json.dumps(value).encode('utf-8') - def loads(self, value): - return json.loads(value.decode('utf-8')) +def dummy_deserialize(value): + return json.loads(value.decode('utf-8')) class DummySessionFactory(dict): _dirty = False -- cgit v1.2.3