summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris McDonough <chrism@plope.com>2010-10-28 21:32:43 -0400
committerChris McDonough <chrism@plope.com>2010-10-28 21:32:43 -0400
commit96820957ac659d51d79b1b30c296f98c086ee878 (patch)
tree896389a4b9c26ec8c70e418c6cb578ab1cd1543a
parenta62cc2264c2dda6a0588fddbc5712afea9d89837 (diff)
downloadpyramid-96820957ac659d51d79b1b30c296f98c086ee878.tar.gz
pyramid-96820957ac659d51d79b1b30c296f98c086ee878.tar.bz2
pyramid-96820957ac659d51d79b1b30c296f98c086ee878.zip
add sessioning interfaces, configuration API, and a sample implementation that uses cookies
-rw-r--r--CHANGES.txt7
-rw-r--r--docs/api/configuration.rst4
-rw-r--r--pyramid/configuration.py31
-rw-r--r--pyramid/interfaces.py117
-rw-r--r--pyramid/request.py17
-rw-r--r--pyramid/session.py194
-rw-r--r--pyramid/tests/test_configuration.py12
-rw-r--r--pyramid/tests/test_request.py24
-rw-r--r--pyramid/tests/test_session.py252
9 files changed, 654 insertions, 4 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index 905bd27f1..1e4d73c6c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,13 @@ Features (delta from BFG 1.3.X)
a Pylons-style "view handler" (such a thing used to be called a
"controller" in Pylons 1.0).
+- New argument to configurator: ``session_factory``.
+
+- New method on configurator: ``set_session_factory``
+
+- Using ``request.session`` now returns a (dictionary-like) session
+ object if a session factory has been configured.
+
Documentation (delta from BFG 1.3)
-----------------------------------
diff --git a/docs/api/configuration.rst b/docs/api/configuration.rst
index 5215bfb3c..6d5c9f16b 100644
--- a/docs/api/configuration.rst
+++ b/docs/api/configuration.rst
@@ -5,7 +5,7 @@
.. automodule:: pyramid.configuration
- .. autoclass:: Configurator(registry=None, package=None, settings=None, root_factory=None, authentication_policy=None, authorization_policy=None, renderers=DEFAULT_RENDERERS, debug_logger=None, locale_negotiator=None, request_factory=None, renderer_globals_factory=None, default_permission=None)
+ .. autoclass:: Configurator(registry=None, package=None, settings=None, root_factory=None, authentication_policy=None, authorization_policy=None, renderers=DEFAULT_RENDERERS, debug_logger=None, locale_negotiator=None, request_factory=None, renderer_globals_factory=None, default_permission=None, session_factory=None)
.. attribute:: registry
@@ -64,6 +64,8 @@
.. automethod:: set_default_permission
+ .. automethod:: set_session_factory
+
.. automethod:: set_request_factory
.. automethod:: set_renderer_globals_factory
diff --git a/pyramid/configuration.py b/pyramid/configuration.py
index b3ba7a20b..f159a342f 100644
--- a/pyramid/configuration.py
+++ b/pyramid/configuration.py
@@ -42,6 +42,7 @@ from pyramid.interfaces import IView
from pyramid.interfaces import IViewClassifier
from pyramid.interfaces import IExceptionResponse
from pyramid.interfaces import IException
+from pyramid.interfaces import ISessionFactory
from pyramid import chameleon_text
from pyramid import chameleon_zpt
@@ -187,7 +188,17 @@ class Configurator(object):
always be executable by entirely anonymous users (any
authorization policy in effect is ignored). See also
:ref:`setting_a_default_permission`.
- """
+
+ If ``session_factory`` is passed, it should be an object which
+ implements the :term:`session factory` interface. If a nondefault
+ value is passed, the ``session_factory`` will be used to create a
+ session object when ``request.session`` is accessed. Note that
+ the same outcome can be achieved by calling
+ :ref:`pyramid.configration.Configurator.set_session_factory`. By
+ default, this argument is ``None``, indicating that no session
+ factory will be configured (and thus accessing ``request.session``
+ will throw an error) unless ``set_session_factory`` is called later
+ during configuration. """
manager = manager # for testing injection
venusian = venusian # for testing injection
@@ -205,7 +216,9 @@ class Configurator(object):
locale_negotiator=None,
request_factory=None,
renderer_globals_factory=None,
- default_permission=None):
+ default_permission=None,
+ session_factory=None,
+ ):
if package is None:
package = caller_package()
name_resolver = DottedNameResolver(package)
@@ -227,6 +240,7 @@ class Configurator(object):
request_factory=request_factory,
renderer_globals_factory=renderer_globals_factory,
default_permission=default_permission,
+ session_factory=session_factory,
)
def _set_settings(self, mapping):
@@ -362,7 +376,8 @@ class Configurator(object):
renderers=DEFAULT_RENDERERS, debug_logger=None,
locale_negotiator=None, request_factory=None,
renderer_globals_factory=None,
- default_permission=None):
+ default_permission=None,
+ session_factory=None):
""" When you pass a non-``None`` ``registry`` argument to the
:term:`Configurator` constructor, no initial 'setup' is
performed against the registry. This is because the registry
@@ -408,6 +423,8 @@ class Configurator(object):
self.set_renderer_globals_factory(renderer_globals_factory)
if default_permission:
self.set_default_permission(default_permission)
+ if session_factory is not None:
+ self.set_session_factory(session_factory)
# getSiteManager is a unit testing dep injection
def hook_zca(self, getSiteManager=None):
@@ -1800,6 +1817,14 @@ class Configurator(object):
"""
self.registry.registerUtility(permission, IDefaultPermission)
+ def set_session_factory(self, session_factory):
+ """
+ Configure the application with a :term:`session factory`. If
+ this method is called, the ``session_factory`` argument must
+ be a session factory callable.
+ """
+ self.registry.registerUtility(session_factory, ISessionFactory)
+
def add_translation_dirs(self, *specs):
""" Add one or more :term:`translation directory` paths to the
current configuration state. The ``specs`` argument is a
diff --git a/pyramid/interfaces.py b/pyramid/interfaces.py
index 9b7488f1e..204d713b4 100644
--- a/pyramid/interfaces.py
+++ b/pyramid/interfaces.py
@@ -375,5 +375,122 @@ class IDefaultPermission(Interface):
""" A string object representing the default permission to be used
for all view configurations which do not explicitly declare their
own."""
+
+class ISessionFactory(Interface):
+ """ An interface representing a factory which accepts a request object and
+ returns an ISession object """
+ def __call__(request):
+ """ Return an ISession object """
+
+class ISession(Interface):
+ """ An interface representing a session (a web session object,
+ usually accessed via ``request.session``.
+
+ Keys and values of a session must be pickleable.
+ """
+
+ # attributes
+
+ created = Attribute('Integer representing Epoch time when created.')
+ modified = Attribute(
+ 'Integer representing Epoch time of last modification. If the '
+ 'session has not yet been modified (it is new), this time will '
+ 'be the created time.')
+ new = Attribute('Boolean attribute. If ``True``, the session is new.')
+
+ # special methods
+
+ def invalidate():
+ """ Invalidate the session. The action caused by
+ ``invalidate`` is implementation-dependent, but it should have
+ the effect of completely dissociating any data stored in the
+ session with the current request. It might set response
+ values (such as one which clears a cookie), or it might not."""
+
+ def changed():
+ """ Mark the session as changed. A user of a session should
+ call this method after he or she mutates a mutable object that
+ is *a value of the session* (it should not be required after
+ mutating the session itself). For example, if the user has
+ stored a dictionary in the session under the key ``foo``, and
+ he or she does ``session['foo'] = {}``, ``changed()`` needn't
+ be called. However, if subsequently he or she does
+ ``session['foo']['a'] = 1``, ``changed()`` must be called for
+ the sessioning machinery to notice the mutation of the
+ internal dictionary."""
+
+ # mapping methods
+ def __getitem__(key):
+ """Get a value for a key
+
+ A KeyError is raised if there is no value for the key.
+ """
+
+ def get(key, default=None):
+ """Get a value for a key
+
+ The default is returned if there is no value for the key.
+ """
+
+ def __delitem__(key):
+ """Delete a value from the mapping using the key.
+
+ A KeyError is raised if there is no value for the key.
+ """
+
+ def __setitem__(key, value):
+ """Set a new item in the mapping."""
+
+ def keys():
+ """Return the keys of the mapping object.
+ """
+
+ def values():
+ """Return the values of the mapping object.
+ """
+
+ def items():
+ """Return the items of the mapping object.
+ """
+
+ def iterkeys():
+ "iterate over keys; equivalent to __iter__"
+
+ def itervalues():
+ "iterate over values"
+
+ def iteritems():
+ "iterate over items"
+
+ def clear():
+ "delete all items"
+
+ def update(d):
+ " Update D from E: for k in E.keys(): D[k] = E[k]"
+
+ def setdefault(key, default=None):
+ "D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D"
+
+ def pop(k, *args):
+ """remove specified key and return the corresponding value
+ *args may contain a single default value, or may not be supplied.
+ If key is not found, default is returned if given, otherwise
+ KeyError is raised"""
+
+ def popitem():
+ """remove and return some (key, value) pair as a
+ 2-tuple; but raise KeyError if mapping is empty"""
+
+ def __len__():
+ """Return the number of items in the session.
+ """
+
+ def __iter__():
+ """Return an iterator for the keys of the mapping object.
+ """
+
+ def __contains__(key):
+ """Return true if a key exists in the mapping."""
+
NO_PERMISSION_REQUIRED = '__no_permission_required__'
diff --git a/pyramid/request.py b/pyramid/request.py
index 19d4a260c..b3e398b6d 100644
--- a/pyramid/request.py
+++ b/pyramid/request.py
@@ -4,6 +4,10 @@ from zope.interface.interface import InterfaceClass
from webob import Request as WebobRequest
from pyramid.interfaces import IRequest
+from pyramid.interfaces import ISessionFactory
+
+from pyramid.exceptions import ConfigurationError
+from pyramid.decorator import reify
class Request(WebobRequest):
"""
@@ -137,6 +141,19 @@ class Request(WebobRequest):
callback = callbacks.pop(0)
callback(self)
+ @reify
+ def session(self):
+ """ Obtain the :term:`session` object associated with this
+ request. If a :term:`session factory` has not been registered
+ during application configuration, a
+ :class:`pyramid.exceptions.ConfigurationError` will be raised"""
+ factory = self.registry.queryUtility(ISessionFactory)
+ if factory is None:
+ raise ConfigurationError(
+ 'No session factory registered '
+ '(use ``config.add_session_factory``)')
+ return factory(self)
+
# override default WebOb "environ['adhoc_attr']" mutation behavior
__getattr__ = object.__getattribute__
__setattr__ = object.__setattr__
diff --git a/pyramid/session.py b/pyramid/session.py
new file mode 100644
index 000000000..88eb8720b
--- /dev/null
+++ b/pyramid/session.py
@@ -0,0 +1,194 @@
+try:
+ from hashlib import sha1
+except ImportError: # pragma: no cover
+ import sha as sha1
+
+try:
+ import cPickle as pickle
+except ImportError: # pragma: no cover
+ import pickle
+
+from webob import Response
+
+import hmac
+import binascii
+import time
+import base64
+
+def manage_accessed(wrapped):
+ """ Decorator which causes a cookie to be set when a wrapped
+ method is called"""
+ def accessed(session, *arg, **kw):
+ session.accessed = int(time.time())
+ if not session._dirty:
+ session._dirty = True
+ def set_cookie_callback(request, response):
+ session._set_cookie(response)
+ session.request = None # explicitly break cycle for gc
+ session.request.add_response_callback(set_cookie_callback)
+ return wrapped(session, *arg, **kw)
+ accessed.__doc__ = wrapped.__doc__
+ return accessed
+
+def manage_modified(wrapped):
+ accessed = manage_accessed(wrapped)
+ def modified(session, *arg, **kw):
+ session.modified = int(time.time())
+ return accessed(session, *arg, **kw)
+ modified.__doc__ = accessed.__doc__
+ return modified
+
+def InsecureCookieSessionFactoryConfig(
+ secret,
+ timeout=1200,
+ cookie_name='session',
+ cookie_max_age=None,
+ cookie_path='/',
+ cookie_domain=None,
+ cookie_secure=False,
+ cookie_httponly=False,
+ cookie_on_exception=False,
+ ):
+
+ class InsecureCookieSessionFactory(dict):
+ """ Dictionary-like session object """
+
+ # configuration parameters
+ _cookie_name = cookie_name
+ _cookie_max_age = cookie_max_age
+ _cookie_path = cookie_path
+ _cookie_domain = cookie_domain
+ _cookie_secure = cookie_secure
+ _cookie_httponly = cookie_httponly
+ _cookie_on_exception = cookie_on_exception
+ _secret = secret
+ _timeout = timeout
+
+ # dirty flag
+ _dirty = False
+
+ def __init__(self, request):
+ self.request = request
+ now = time.time()
+ created = accessed = modified = now
+ new = True
+ cookieval = request.cookies.get(self._cookie_name)
+ value = deserialize(cookieval, self._secret)
+ state = {}
+ if value is not None:
+ accessed, created, modified, state = value
+ new = False
+ if now - accessed > self._timeout:
+ state = {}
+
+ self.created = created
+ self.accessed = accessed
+ self.modified = modified
+ self.new = new
+ dict.__init__(self, state)
+
+ # ISession methods
+ def changed(self):
+ """ This is intentionally a noop; the session is
+ serialized on every access, so unnecessary"""
+ pass
+
+ def invalidate(self):
+ self.clear() # XXX probably needs to unset cookie
+
+ # non-modifying dictionary methods
+ get = manage_accessed(dict.get)
+ __getitem__ = manage_accessed(dict.__getitem__)
+ items = manage_accessed(dict.items)
+ iteritems = manage_accessed(dict.iteritems)
+ values = manage_accessed(dict.values)
+ itervalues = manage_accessed(dict.itervalues)
+ keys = manage_accessed(dict.keys)
+ iterkeys = manage_accessed(dict.iterkeys)
+ __contains__ = manage_accessed(dict.__contains__)
+ has_key = manage_accessed(dict.has_key)
+ __len__ = manage_accessed(dict.__len__)
+ __iter__ = manage_accessed(dict.__iter__)
+
+ # modifying dictionary methods
+ clear = manage_modified(dict.clear)
+ update = manage_modified(dict.update)
+ setdefault = manage_modified(dict.setdefault)
+ pop = manage_modified(dict.pop)
+ popitem = manage_modified(dict.popitem)
+ __setitem__ = manage_modified(dict.__setitem__)
+ __delitem__ = manage_modified(dict.__delitem__)
+
+ # non-API methods
+ def _set_cookie(self, response):
+ if not self._cookie_on_exception:
+ exception = getattr(self.request, 'exception', None)
+ if exception is not None: # dont set a cookie during exceptions
+ return False
+ cookieval = serialize(
+ (self.accessed, self.created, self.modified, dict(self)),
+ self._secret
+ )
+ if len(cookieval) > 4064:
+ raise ValueError(
+ 'Cookie value is too long to store (%s bytes)' %
+ len(cookieval)
+ )
+ if hasattr(response, 'set_cookie'):
+ # ``response`` is a "real" webob response
+ set_cookie = response.set_cookie
+ else:
+ # ``response`` is not a "real" webob response, cope
+ def set_cookie(*arg, **kw):
+ tmp_response = Response()
+ tmp_response.set_cookie(*arg, **kw)
+ response.headerlist.append(
+ tmp_response.headerlist[-1])
+ set_cookie(
+ self._cookie_name,
+ value=cookieval,
+ max_age = self._cookie_max_age,
+ path = self._cookie_path,
+ domain = self._cookie_domain,
+ secure = self._cookie_secure,
+ httponly = self._cookie_httponly,
+ )
+ return True
+
+ return InsecureCookieSessionFactory
+
+def serialize(data, secret):
+ pickled = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
+ sig = hmac.new(secret, pickled, sha1).hexdigest()
+ return sig + base64.standard_b64encode(pickled)
+
+def deserialize(serialized, secret, hmac=hmac):
+ # hmac parameterized only for unit tests
+ if serialized is None:
+ return None
+
+ try:
+ input_sig, pickled = (serialized[:40],
+ base64.standard_b64decode(serialized[40:]))
+ except (binascii.Error, TypeError):
+ # Badly formed data can make base64 die
+ return None
+
+ sig = hmac.new(secret, pickled, sha1).hexdigest()
+
+ # Avoid timing attacks (note that this is cadged from Pylons and I
+ # have no idea what it means)
+
+ if len(sig) != len(input_sig):
+ return None
+
+ invalid_bits = 0
+
+ for a, b in zip(sig, input_sig):
+ invalid_bits += a != b
+
+ if invalid_bits:
+ return None
+
+ return pickle.loads(pickled)
+
diff --git a/pyramid/tests/test_configuration.py b/pyramid/tests/test_configuration.py
index fa2318fcc..2ff6ed1e9 100644
--- a/pyramid/tests/test_configuration.py
+++ b/pyramid/tests/test_configuration.py
@@ -186,6 +186,11 @@ class ConfiguratorTests(unittest.TestCase):
config = self._makeOne(default_permission='view')
self.assertEqual(config.registry.getUtility(IDefaultPermission), 'view')
+ def test_ctor_session_factory(self):
+ from pyramid.interfaces import ISessionFactory
+ config = self._makeOne(session_factory='factory')
+ self.assertEqual(config.registry.getUtility(ISessionFactory), 'factory')
+
def test_with_package_module(self):
from pyramid.tests import test_configuration
import pyramid.tests
@@ -2477,6 +2482,13 @@ class ConfiguratorTests(unittest.TestCase):
self.assertEqual(config.registry.getUtility(IDefaultPermission),
'view')
+ def test_set_session_factory(self):
+ from pyramid.interfaces import ISessionFactory
+ config = self._makeOne()
+ config.set_session_factory('factory')
+ self.assertEqual(config.registry.getUtility(ISessionFactory),
+ 'factory')
+
def test_add_translation_dirs_missing_dir(self):
from pyramid.exceptions import ConfigurationError
config = self._makeOne()
diff --git a/pyramid/tests/test_request.py b/pyramid/tests/test_request.py
index 92edf2a51..93cbb9691 100644
--- a/pyramid/tests/test_request.py
+++ b/pyramid/tests/test_request.py
@@ -1,6 +1,14 @@
import unittest
class TestRequest(unittest.TestCase):
+ def setUp(self):
+ from pyramid.configuration import Configurator
+ self.config = Configurator()
+ self.config.begin()
+
+ def tearDown(self):
+ self.config.end()
+
def _makeOne(self, environ):
return self._getTargetClass()(environ)
@@ -35,6 +43,22 @@ class TestRequest(unittest.TestCase):
inst = self._makeOne({})
self.assertTrue(IRequest.providedBy(inst))
+ def test_session_configured(self):
+ from pyramid.interfaces import ISessionFactory
+ inst = self._makeOne({})
+ def factory(request):
+ return 'orangejuice'
+ self.config.registry.registerUtility(factory, ISessionFactory)
+ inst.registry = self.config.registry
+ self.assertEqual(inst.session, 'orangejuice')
+ self.assertEqual(inst.__dict__['session'], 'orangejuice')
+
+ def test_session_not_configured(self):
+ from pyramid.exceptions import ConfigurationError
+ inst = self._makeOne({})
+ inst.registry = self.config.registry
+ self.assertRaises(ConfigurationError, inst.__getattr__, 'session')
+
def test_setattr_and_getattr_dotnotation(self):
inst = self._makeOne({})
inst.foo = 1
diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py
new file mode 100644
index 000000000..d8c1b2c00
--- /dev/null
+++ b/pyramid/tests/test_session.py
@@ -0,0 +1,252 @@
+import unittest
+from pyramid import testing
+
+class TestInsecureCookieSession(unittest.TestCase):
+ def _makeOne(self, request, **kw):
+ from pyramid.session import InsecureCookieSessionFactoryConfig
+ return InsecureCookieSessionFactoryConfig('secret', **kw)(request)
+
+ def test_ctor_no_cookie(self):
+ request = testing.DummyRequest()
+ session = self._makeOne(request)
+ self.assertEqual(dict(session), {})
+
+ def _serialize(self, accessed, state, secret='secret'):
+ from pyramid.session import serialize
+ return serialize((accessed, accessed, accessed, state), secret)
+
+ def test_ctor_with_cookie_still_valid(self):
+ import time
+ request = testing.DummyRequest()
+ cookieval = self._serialize(time.time(), {'state':1})
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request)
+ self.assertEqual(dict(session), {'state':1})
+
+ def test_ctor_with_cookie_expired(self):
+ request = testing.DummyRequest()
+ cookieval = self._serialize(0, {'state':1})
+ request.cookies['session'] = cookieval
+ session = self._makeOne(request)
+ self.assertEqual(dict(session), {})
+
+ def test_changed(self):
+ request = testing.DummyRequest()
+ session = self._makeOne(request)
+ self.assertEqual(session.changed(), None)
+
+ def test_invalidate(self):
+ request = testing.DummyRequest()
+ session = self._makeOne(request)
+ session['a'] = 1
+ self.assertEqual(session.invalidate(), None)
+ self.failIf('a' in session)
+
+ def test__set_cookie_on_exception(self):
+ request = testing.DummyRequest()
+ request.exception = True
+ session = self._makeOne(request)
+ session._cookie_on_exception = False
+ response = DummyResponse()
+ self.assertEqual(session._set_cookie(response), False)
+
+ def test__set_cookie_cookieval_too_long(self):
+ request = testing.DummyRequest()
+ session = self._makeOne(request)
+ session['abc'] = 'x'*100000
+ response = DummyResponse()
+ self.assertRaises(ValueError, session._set_cookie, response)
+
+ def test__set_cookie_real_webob_response(self):
+ import webob
+ request = testing.DummyRequest()
+ session = self._makeOne(request)
+ session['abc'] = 'x'
+ response = webob.Response()
+ self.assertEqual(session._set_cookie(response), True)
+ self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')
+
+ def test__set_cookie_other_kind_of_response(self):
+ request = testing.DummyRequest()
+ request.exception = None
+ session = self._makeOne(request)
+ session['abc'] = 'x'
+ response = DummyResponse()
+ self.assertEqual(session._set_cookie(response), True)
+ self.assertEqual(len(response.headerlist), 1)
+
+ def test__set_cookie_options(self):
+ request = testing.DummyRequest()
+ request.exception = None
+ session = self._makeOne(request,
+ cookie_name = 'abc',
+ cookie_path = '/foo',
+ cookie_domain = 'localhost',
+ cookie_secure = True,
+ cookie_httponly = True,
+ )
+ session['abc'] = 'x'
+ response = DummyResponse()
+ self.assertEqual(session._set_cookie(response), True)
+ self.assertEqual(len(response.headerlist), 1)
+ cookieval= response.headerlist[0][1]
+ val, domain, path, secure, httponly = [x.strip() for x in
+ cookieval.split(';')]
+ self.failUnless(val.startswith('abc='))
+ self.assertEqual(domain, 'Domain=localhost')
+ self.assertEqual(path, 'Path=/foo')
+ self.assertEqual(secure, 'secure')
+ self.assertEqual(httponly, 'HttpOnly')
+
+class Test_manage_accessed(unittest.TestCase):
+ def _makeOne(self, wrapped):
+ from pyramid.session import manage_accessed
+ return manage_accessed(wrapped)
+
+ def test_accessed_set(self):
+ request = testing.DummyRequest()
+ session = DummySessionFactory(request)
+ session.accessed = None
+ wrapper = self._makeOne(session.__class__.get)
+ wrapper(session, 'a')
+ self.assertNotEqual(session.accessed, None)
+
+ def test_already_dirty(self):
+ request = testing.DummyRequest()
+ session = DummySessionFactory(request)
+ session._dirty = True
+ session['a'] = 1
+ wrapper = self._makeOne(session.__class__.get)
+ self.assertEqual(wrapper.__doc__, session.get.__doc__)
+ result = wrapper(session, 'a')
+ self.assertEqual(result, 1)
+ callbacks = request.response_callbacks
+ self.assertEqual(len(callbacks), 0)
+
+ def test_with_exception(self):
+ import webob
+ request = testing.DummyRequest()
+ request.exception = True
+ session = DummySessionFactory(request)
+ session['a'] = 1
+ wrapper = self._makeOne(session.__class__.get)
+ self.assertEqual(wrapper.__doc__, session.get.__doc__)
+ result = wrapper(session, 'a')
+ self.assertEqual(result, 1)
+ callbacks = request.response_callbacks
+ self.assertEqual(len(callbacks), 1)
+ response = webob.Response()
+ result = callbacks[0](request, response)
+ self.assertEqual(result, None)
+ self.failIf('Set-Cookie' in dict(response.headerlist))
+
+ def test_cookie_is_set(self):
+ request = testing.DummyRequest()
+ session = DummySessionFactory(request)
+ session['a'] = 1
+ wrapper = self._makeOne(session.__class__.get)
+ self.assertEqual(wrapper.__doc__, session.get.__doc__)
+ result = wrapper(session, 'a')
+ self.assertEqual(result, 1)
+ callbacks = request.response_callbacks
+ self.assertEqual(len(callbacks), 1)
+ response = DummyResponse()
+ result = callbacks[0](request, response)
+ self.assertEqual(result, None)
+ self.assertEqual(session.response, response)
+
+class Test_manage_modified(Test_manage_accessed):
+ def _makeOne(self, wrapped):
+ from pyramid.session import manage_modified
+ return manage_modified(wrapped)
+
+ def test_modified_set(self):
+ request = testing.DummyRequest()
+ session = DummySessionFactory(request)
+ session.modified = None
+ session.accessed = None
+ wrapper = self._makeOne(session.__class__.__setitem__)
+ wrapper(session, 'a', 1)
+ self.assertNotEqual(session.accessed, None)
+ self.assertNotEqual(session.modified, None)
+
+def serialize(data, secret):
+ try:
+ from hashlib import sha1
+ except ImportError: # pragma: no cover
+ import sha as sha1
+
+ try:
+ import cPickle as pickle
+ except ImportError: # pragma: no cover
+ import pickle
+
+ import hmac
+ import base64
+ pickled = pickle.dumps('123', pickle.HIGHEST_PROTOCOL)
+ sig = hmac.new(secret, pickled, sha1).hexdigest()
+ return sig + base64.standard_b64encode(pickled)
+
+class Test_serialize(unittest.TestCase):
+ def _callFUT(self, data, secret):
+ from pyramid.session import serialize
+ return serialize(data, secret)
+
+ def test_it(self):
+ expected = serialize('123', 'secret')
+ result = self._callFUT('123', 'secret')
+ self.assertEqual(result, expected)
+
+class Test_deserialize(unittest.TestCase):
+ def _callFUT(self, serialized, secret, hmac=None):
+ if hmac is None:
+ import hmac
+ from pyramid.session import deserialize
+ return deserialize(serialized, secret, hmac=hmac)
+
+ def test_it(self):
+ serialized = serialize('123', 'secret')
+ result = self._callFUT(serialized, 'secret')
+ self.assertEqual(result, '123')
+
+ def test_invalid_bits(self):
+ serialized = serialize('123', 'secret')
+ result = self._callFUT(serialized, 'seekrit')
+ self.assertEqual(result, None)
+
+ def test_invalid_len(self):
+ class hmac(object):
+ def new(self, *arg):
+ return self
+ def hexdigest(self):
+ return '1234'
+ serialized = serialize('123', 'secret123')
+ result = self._callFUT(serialized, 'secret', hmac=hmac())
+ self.assertEqual(result, None)
+
+ def test_it_bad_encoding(self):
+ serialized = 'bad' + serialize('123', 'secret')
+ result = self._callFUT(serialized, 'secret')
+ self.assertEqual(result, None)
+
+
+class DummySessionFactory(dict):
+ _dirty = False
+ _cookie_name = 'session'
+ _cookie_max_age = None
+ _cookie_path = '/'
+ _cookie_domain = None
+ _cookie_secure = False
+ _cookie_httponly = False
+ _timeout = 1200
+ _secret = 'secret'
+ def __init__(self, request):
+ self.request = request
+ dict.__init__(self, {})
+
+ def _set_cookie(self, response):
+ self.response = response
+
+class DummyResponse(object):
+ def __init__(self):
+ self.headerlist = []