summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Merickel <michael@digitalartefacts.com>2013-10-19 01:00:47 -0500
committerMichael Merickel <michael@digitalartefacts.com>2013-10-19 01:00:47 -0500
commitb0b09c9bfb4924ce22627f4da94d3216829d5ec8 (patch)
treeeacbb8d14d1f438038f94ce55e201523c06577f7
parentf23f38db37e8e323512424e5715a40dc2dce9ab8 (diff)
downloadpyramid-b0b09c9bfb4924ce22627f4da94d3216829d5ec8.tar.gz
pyramid-b0b09c9bfb4924ce22627f4da94d3216829d5ec8.tar.bz2
pyramid-b0b09c9bfb4924ce22627f4da94d3216829d5ec8.zip
update session to use a static salt and separate serialize funcs
-rw-r--r--pyramid/session.py140
-rw-r--r--pyramid/tests/test_session.py154
2 files changed, 145 insertions, 149 deletions
diff --git a/pyramid/session.py b/pyramid/session.py
index 3c493a561..800400223 100644
--- a/pyramid/session.py
+++ b/pyramid/session.py
@@ -120,7 +120,8 @@ def check_csrf_token(request,
return True
def BaseCookieSessionFactory(
- serializer,
+ serialize,
+ deserialize,
cookie_name='session',
max_age=None,
path='/',
@@ -151,13 +152,13 @@ def BaseCookieSessionFactory(
Parameters:
- ``serializer``
- An object with 2 methods, ``loads`` and ``dumps``, which will be used
- to perform serialization and deserialization.
- - ``dumps(value)`` should accept a Python object and return a
- bytestring which can later be deserialized with ``loads``.
- - ``loads(value)`` should expect to receive a bytestring, generated by
- ``dumps`` and return a Python object.
+ ``serialize``
+ A callable accepting a Python object and returning a bytestring. A
+ ``ValueError`` should be raised for malformed inputs.
+
+ ``deserialize``
+ A callable accepting a bytestring and returning a Python object. A
+ ``ValueError`` should be raised for malformed inputs.
``cookie_name``
The name of the cookie used for sessioning. Default: ``'session'``.
@@ -235,8 +236,9 @@ def BaseCookieSessionFactory(
cookieval = request.cookies.get(self._cookie_name)
if cookieval is not None:
try:
- value = serializer.loads(bytes_(cookieval))
+ value = deserialize(bytes_(cookieval))
except ValueError:
+ # the cookie failed to deserialize, dropped
value = None
if value is not None:
@@ -244,8 +246,12 @@ def BaseCookieSessionFactory(
renewed, created, state = value
new = False
if now - renewed > self._timeout:
+ # expire the session because it was not renewed
+ # before the timeout threshold
state = {}
except TypeError:
+ # value failed to unpack properly or renewed was not
+ # a numeric type so we'll fail deserialization here
state = {}
self.created = created
@@ -328,7 +334,7 @@ def BaseCookieSessionFactory(
exception = getattr(self.request, 'exception', None)
if exception is not None: # dont set a cookie during exceptions
return False
- cookieval = native_(serializer.dumps(
+ cookieval = native_(serialize(
(self.accessed, self.created, dict(self))
))
if len(cookieval) > 4064:
@@ -418,15 +424,9 @@ def UnencryptedCookieSessionFactoryConfig(
is valid. Default: ``signed_deserialize`` (using pickle).
"""
- class _Serializer(object):
- def dumps(self, appstruct):
- return signed_serialize(appstruct, secret)
-
- def loads(self, bstruct):
- return signed_deserialize(bstruct, secret)
-
return BaseCookieSessionFactory(
- _Serializer(),
+ lambda v: signed_serialize(v, secret),
+ lambda v: signed_deserialize(v, secret),
cookie_name=cookie_name,
max_age=cookie_max_age,
path=cookie_path,
@@ -450,7 +450,9 @@ def SignedCookieSessionFactory(
timeout=1200,
reissue_time=0,
hashalg='sha512',
- serializer=None,
+ salt='pyramid.session.',
+ serialize=None,
+ deserialize=None,
):
"""
Configure a :term:`session factory` which will provide signed
@@ -469,12 +471,19 @@ def SignedCookieSessionFactory(
Parameters:
``secret``
- A string which is used to sign the cookie.
+ A string which is used to sign the cookie. The secret should be at
+ least as long as the block size of the selected hash algorithm. For
+ ``sha512`` this would mean a 128 bit (64 character) secret.
``hashalg``
The HMAC digest algorithm to use for signing. The algorithm must be
supported by the :mod:`hashlib` library. Default: ``'sha512'``.
+ ``salt``
+ A namespace to avoid collisions between different uses of a shared
+ secret. Reusing a secret for different parts of an application is
+ strongly discouraged.
+
``cookie_name``
The name of the cookie used for sessioning. Default: ``'session'``.
@@ -520,77 +529,64 @@ def SignedCookieSessionFactory(
If ``True``, set a session cookie even if an exception occurs
while rendering a view. Default: ``True``.
- ``serializer``
- An object with 2 methods, ``loads`` and ``dumps``, which will be used
- to perform serialization and deserialization.
- - ``dumps(value)`` should accept a Python object and return a
- bytestring which can later be deserialized with ``loads``.
- - ``loads(value)`` should expect to receive a bytestring, generated by
- ``dumps`` and return a Python object.
+ ``serialize``
+ A callable accepting a Python object and returning a bytestring. A
+ ``ValueError`` should be raised for malformed inputs.
+ Default: :func:`pickle.dumps`.
+
+ ``deserialize``
+ A callable accepting a bytestring and returning a Python object. A
+ ``ValueError`` should be raised for malformed inputs.
+ Default: :func:`pickle.loads`.
.. versionadded: 1.5a3
"""
- if serializer is None:
- serializer = _PickleSerializer()
-
- signed_serializer = _SignedSerializer(secret, hashalg, serializer)
-
- return BaseCookieSessionFactory(
- signed_serializer,
- cookie_name=cookie_name,
- max_age=max_age,
- path=path,
- domain=domain,
- secure=secure,
- httponly=httponly,
- timeout=timeout,
- reissue_time=reissue_time,
- set_on_exception=set_on_exception,
- )
-
-class _PickleSerializer(object):
- def dumps(self, appstruct):
- return pickle.dumps(appstruct, pickle.HIGHEST_PROTOCOL)
+ if serialize is None:
+ serialize = lambda v: pickle.dumps(v, pickle.HIGHEST_PROTOCOL)
- def loads(self, bstruct):
- return pickle.loads(bstruct)
+ if deserialize is None:
+ deserialize = pickle.loads
-class _SignedSerializer(object):
- def __init__(self, secret, hashalg, serializer):
- self.secret = secret
- self.digestmod = lambda: hashlib.new(hashalg)
- self.digest_size = self.digestmod().digest_size
- self.salt_size = 8
- self.serializer = serializer
+ digestmod = lambda: hashlib.new(hashalg)
+ digest_size = digestmod().digest_size
- def derive_key(self, salt):
- return hmac.new(bytes_(self.secret), salt, self.digestmod).digest()
+ salted_secret = bytes_(salt or '') + bytes_(secret)
- def dumps(self, appstruct):
- salt = os.urandom(self.salt_size)
- derived_secret = self.derive_key(salt)
- cstruct = self.serializer.dumps(appstruct)
- sig = hmac.new(derived_secret, cstruct, self.digestmod).digest()
- return base64.b64encode(cstruct + salt + sig)
+ def signed_serialize(appstruct):
+ cstruct = serialize(appstruct)
+ sig = hmac.new(salted_secret, cstruct, digestmod).digest()
+ return base64.b64encode(cstruct + sig)
- def loads(self, bstruct):
+ def signed_deserialize(bstruct):
try:
fstruct = base64.b64decode(bstruct)
except (binascii.Error, TypeError) as e:
raise ValueError('Badly formed base64 data: %s' % e)
- cstruct_size = len(fstruct) - self.salt_size - self.digest_size
+ cstruct_size = len(fstruct) - digest_size
if cstruct_size < 0:
raise ValueError('Input is too short.')
cstruct = fstruct[:cstruct_size]
- salt = fstruct[cstruct_size:cstruct_size + self.salt_size]
- expected_sig = fstruct[-self.digest_size:]
+ expected_sig = fstruct[-digest_size:]
- derived_secret = self.derive_key(salt)
- sig = hmac.new(derived_secret, cstruct, self.digestmod).digest()
+ sig = hmac.new(salted_secret, cstruct, digestmod).digest()
if strings_differ(sig, expected_sig):
raise ValueError('Invalid signature')
- return self.serializer.loads(cstruct)
+ return deserialize(cstruct)
+
+ return BaseCookieSessionFactory(
+ signed_serialize,
+ signed_deserialize,
+ cookie_name=cookie_name,
+ max_age=max_age,
+ path=path,
+ domain=domain,
+ secure=secure,
+ httponly=httponly,
+ timeout=timeout,
+ reissue_time=reissue_time,
+ set_on_exception=set_on_exception,
+ )
diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py
index 0e1ed78a6..04740b0cd 100644
--- a/pyramid/tests/test_session.py
+++ b/pyramid/tests/test_session.py
@@ -264,7 +264,8 @@ class SharedCookieSessionTests(object):
class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase):
def _makeOne(self, request, **kw):
from pyramid.session import BaseCookieSessionFactory
- return BaseCookieSessionFactory(DummySerializer(), **kw)(request)
+ return BaseCookieSessionFactory(
+ dummy_serialize, dummy_deserialize, **kw)(request)
def _serialize(self, value):
return json.dumps(value)
@@ -281,13 +282,19 @@ class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase):
class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase):
def _makeOne(self, request, **kw):
from pyramid.session import SignedCookieSessionFactory
- return SignedCookieSessionFactory('secret', **kw)(request)
+ kw.setdefault('secret', 'secret')
+ return SignedCookieSessionFactory(**kw)(request)
- def _serialize(self, value):
- from pyramid.session import _SignedSerializer, _PickleSerializer
- serializer = _PickleSerializer()
- serializer = _SignedSerializer('secret', 'sha512', serializer)
- return serializer.dumps(value)
+ def _serialize(self, value, salt='pyramid.session.', hashalg='sha512'):
+ import base64
+ import hashlib
+ import hmac
+ import pickle
+
+ digestmod = lambda: hashlib.new(hashalg)
+ cstruct = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
+ sig = hmac.new(salt + 'secret', cstruct, digestmod).digest()
+ return base64.b64encode(cstruct + sig)
def test_reissue_not_triggered(self):
import time
@@ -298,17 +305,71 @@ class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase):
self.assertEqual(session['state'], 1)
self.assertFalse(session._dirty)
+ def test_custom_salt(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time(), 0, {'state': 1}), salt='f.')
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, salt='f.')
+ self.assertEqual(session['state'], 1)
+
+ def test_salt_mismatch(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time(), 0, {'state': 1}), salt='f.')
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, salt='g.')
+ self.assertEqual(session, {})
+
+ def test_custom_hashalg(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time(), 0, {'state': 1}),
+ hashalg='sha1')
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, hashalg='sha1')
+ self.assertEqual(session['state'], 1)
+
+ def test_hashalg_mismatch(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time(), 0, {'state': 1}),
+ hashalg='sha1')
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, hashalg='sha256')
+ self.assertEqual(session, {})
+
+ def test_secret_mismatch(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize((time.time(), 0, {'state': 1}))
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request, secret='evilsecret')
+ self.assertEqual(session, {})
+
def test_custom_serializer(self):
+ import base64
+ from hashlib import sha512
+ import hmac
import time
- from pyramid.session import _SignedSerializer
- serializer = DummySerializer()
- signer = _SignedSerializer('secret', 'sha512', serializer=serializer)
- cookieval = signer.dumps((time.time(), 0, {'state': 1}))
request = testing.DummyRequest()
+ cstruct = dummy_serialize((time.time(), 0, {'state': 1}))
+ sig = hmac.new('pyramid.session.secret', cstruct, sha512).digest()
+ cookieval = base64.b64encode(cstruct + sig)
request.cookies['session'] = cookieval
- session = self._makeOne(request, serializer=serializer)
+ session = self._makeOne(request, deserialize=dummy_deserialize)
self.assertEqual(session['state'], 1)
+ def test_invalid_data_size(self):
+ from hashlib import sha512
+ import base64
+ request = testing.DummyRequest()
+ num_bytes = sha512().digest_size - 1
+ cookieval = base64.b64encode(b' ' * num_bytes)
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request)
+ self.assertEqual(session, {})
+
class TestUnencryptedCookieSession(SharedCookieSessionTests, unittest.TestCase):
def _makeOne(self, request, **kw):
from pyramid.session import UnencryptedCookieSessionFactoryConfig
@@ -471,66 +532,6 @@ class Test_signed_deserialize(unittest.TestCase):
serialized = 'bad' + serialize('123', 'secret')
self.assertRaises(ValueError, self._callFUT, serialized, 'secret')
-class TestSignedSerializer(unittest.TestCase):
- def _makeOne(self, secret='secret', hashalg='sha512'):
- from pyramid.session import _SignedSerializer
- serializer = DummySerializer()
- return _SignedSerializer(secret, hashalg, serializer)
-
- def test_it_same_serializer(self):
- serializer = self._makeOne()
- appstruct = {'state': 1}
- cstruct = serializer.dumps(appstruct)
- result = serializer.loads(cstruct)
- self.assertEqual(result, appstruct)
-
- def test_it_different_serializers(self):
- serializer1 = self._makeOne()
- appstruct = {'state': 1}
- cstruct = serializer1.dumps(appstruct)
-
- serializer2 = self._makeOne()
- result = serializer2.loads(cstruct)
- self.assertEqual(result, appstruct)
-
- def test_invalid_signature_with_different_secret(self):
- serializer1 = self._makeOne('secret1')
- appstruct = {'state': 1}
- cstruct = serializer1.dumps(appstruct)
-
- serializer2 = self._makeOne('secret2')
- try:
- serializer2.loads(cstruct)
- except ValueError as exc:
- self.assertTrue('Invalid signature' in exc.args[0])
- else: # pragma: no cover
- self.fail()
-
- def test_invalid_signature_after_tamper(self):
- import base64
- serializer = self._makeOne()
- appstruct = {'state': 1}
- cstruct = serializer.dumps(appstruct)
- actual_val = base64.b64decode(cstruct)
- test_val = base64.b64encode(actual_val[1:])
- try:
- serializer.loads(test_val)
- except ValueError as exc:
- self.assertTrue('Invalid signature' in exc.args[0])
- else: # pragma: no cover
- self.fail()
-
- def test_invalid_data_size(self):
- import base64
- serializer = self._makeOne()
- num_bytes = serializer.digest_size + serializer.salt_size - 1
- try:
- serializer.loads(base64.b64encode(b' ' * num_bytes))
- except ValueError as exc:
- self.assertTrue('Input is too short' in exc.args[0])
- else: # pragma: no cover
- self.fail()
-
class Test_check_csrf_token(unittest.TestCase):
def _callFUT(self, *args, **kwargs):
from ..session import check_csrf_token
@@ -566,12 +567,11 @@ class Test_check_csrf_token(unittest.TestCase):
result = self._callFUT(request, 'csrf_token', raises=False)
self.assertEqual(result, False)
-class DummySerializer(object):
- def dumps(self, value):
- return json.dumps(value).encode('utf-8')
+def dummy_serialize(value):
+ return json.dumps(value).encode('utf-8')
- def loads(self, value):
- return json.loads(value.decode('utf-8'))
+def dummy_deserialize(value):
+ return json.loads(value.decode('utf-8'))
class DummySessionFactory(dict):
_dirty = False