summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/api/csrf.rst10
-rw-r--r--docs/api/interfaces.rst2
-rw-r--r--docs/narr/security.rst34
-rw-r--r--docs/narr/sessions.rst4
-rw-r--r--pyramid/config/views.py2
-rw-r--r--pyramid/csrf.py35
-rw-r--r--pyramid/tests/test_config/test_views.py1
-rw-r--r--pyramid/tests/test_csrf.py367
-rw-r--r--pyramid/tests/test_session.py138
-rw-r--r--pyramid/tests/test_viewderivers.py1
10 files changed, 369 insertions, 225 deletions
diff --git a/docs/api/csrf.rst b/docs/api/csrf.rst
index 3125bdac9..89fb0c4b2 100644
--- a/docs/api/csrf.rst
+++ b/docs/api/csrf.rst
@@ -5,14 +5,16 @@
.. automodule:: pyramid.csrf
+ .. autoclass:: SessionCSRF
+ :members:
+
+ .. autoclass:: CookieCSRF
+ :members:
+
.. autofunction:: get_csrf_token
.. autofunction:: new_csrf_token
- .. autoclass:: SessionCSRF
- :members:
-
.. autofunction:: check_csrf_origin
.. autofunction:: check_csrf_token
-
diff --git a/docs/api/interfaces.rst b/docs/api/interfaces.rst
index 2ca472616..b88209a36 100644
--- a/docs/api/interfaces.rst
+++ b/docs/api/interfaces.rst
@@ -44,7 +44,7 @@ Other Interfaces
.. autointerface:: IRoutePregenerator
:members:
- .. autointerface:: ICSRF
+ .. autointerface:: ICSRFPolicy
:members:
.. autointerface:: ISession
diff --git a/docs/narr/security.rst b/docs/narr/security.rst
index b4fb3b8a8..6962a0fe3 100644
--- a/docs/narr/security.rst
+++ b/docs/narr/security.rst
@@ -146,7 +146,7 @@ For example, the following view declaration protects the view named
# config is an instance of pyramid.config.Configurator
config.add_view('mypackage.views.blog_entry_add_view',
- name='add_entry.html',
+ name='add_entry.html',
context='mypackage.resources.Blog',
permission='add')
@@ -725,7 +725,7 @@ object that implements the following interface:
""" Return ``True`` if any of the ``principals`` is allowed the
``permission`` in the current ``context``, else return ``False``
"""
-
+
def principals_allowed_by_permission(self, context, permission):
""" Return a set of principal identifiers allowed by the
``permission`` in ``context``. This behavior is optional; if you
@@ -777,11 +777,27 @@ If the URL is one that may modify or delete data, the consequences can be dire.
You can avoid most of these attacks by issuing a unique token to the browser
and then requiring that it be present in all potentially unsafe requests.
-:app:`Pyramid` sessions provide facilities to create and check CSRF tokens.
+:app:`Pyramid` provides facilities to create and check CSRF tokens.
+
+By default :app:`Pyramid` comes with a session-based CSRF implementation
+:class:`pyramid.csrf.SessionCSRF`. To use it, you must first enable
+a :term:`session factory` as described in
+:ref:`using_the_default_session_factory` or
+:ref:`using_alternate_session_factories`. Alternatively, you can use
+a cookie-based implementation :class:`pyramid.csrf.CookieCSRF` which gives
+some additional flexibility as it does not require a session for each user.
+You can also define your own implementation of
+:class:`pyramid.interfaces.ICSRFPolicy` and register it with the
+:meth:`pyramid.config.Configurator.set_default_csrf_options` directive.
-To use CSRF tokens, you must first enable a :term:`session factory` as
-described in :ref:`using_the_default_session_factory` or
-:ref:`using_alternate_session_factories`.
+For example:
+
+.. code-block:: python
+
+ from pyramid.config import Configurator
+
+ config = Configurator()
+ config.set_default_csrf_options(implementation=MyCustomCSRFPolicy())
.. index::
single: csrf.get_csrf_token
@@ -866,7 +882,7 @@ Checking CSRF Tokens Manually
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In request handling code, you can check the presence and validity of a CSRF
-token with :func:`pyramid.session.check_csrf_token`. If the token is valid, it
+token with :func:`pyramid.csrf.check_csrf_token`. If the token is valid, it
will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally,
you can specify ``raises=False`` to have the check return ``False`` instead of
raising an exception.
@@ -876,7 +892,7 @@ named ``X-CSRF-Token``.
.. code-block:: python
- from pyramid.session import check_csrf_token
+ from pyramid.csrf import check_csrf_token
def myview(request):
# Require CSRF Token
@@ -955,4 +971,4 @@ include ``check_csrf=True`` as a view predicate. See
A mismatch of a CSRF token is treated like any other predicate miss, and the
predicate system, when it doesn't find a view, raises ``HTTPNotFound``
instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different
- from calling :func:`pyramid.session.check_csrf_token`.
+ from calling :func:`pyramid.csrf.check_csrf_token`.
diff --git a/docs/narr/sessions.rst b/docs/narr/sessions.rst
index 90b5f4585..86fe2a139 100644
--- a/docs/narr/sessions.rst
+++ b/docs/narr/sessions.rst
@@ -12,8 +12,7 @@ application.
This chapter describes how to configure sessions, what session implementations
:app:`Pyramid` provides out of the box, how to store and retrieve data from
-sessions, and two session-specific features: flash messages, and cross-site
-request forgery attack prevention.
+sessions, and a session-specific feature: flash messages.
.. index::
single: session factory (default)
@@ -320,4 +319,3 @@ flash storage.
.. index::
single: preventing cross-site request forgery attacks
single: cross-site request forgery attacks, prevention
-
diff --git a/pyramid/config/views.py b/pyramid/config/views.py
index 7a383be44..4ebd014de 100644
--- a/pyramid/config/views.py
+++ b/pyramid/config/views.py
@@ -643,7 +643,7 @@ class ViewsConfiguratorMixin(object):
If CSRF checking is performed, the checked value will be the value of
``request.params[check_name]``. This value will be compared against
- the value of ``impl.get_csrf_token()`` (where ``impl`` is an
+ the value of ``policy.get_csrf_token()`` (where ``policy`` is an
implementation of :meth:`pyramid.interfaces.ICSRFPolicy`), and the
check will pass if these two values are the same. If the check
passes, the associated view will be permitted to execute. If the
diff --git a/pyramid/csrf.py b/pyramid/csrf.py
index c373079a4..7adbc9fee 100644
--- a/pyramid/csrf.py
+++ b/pyramid/csrf.py
@@ -53,11 +53,12 @@ class CookieCSRF(object):
""" An alternative CSRF implementation that stores its information in
unauthenticated cookies, known as the 'Double Submit Cookie' method in the
OWASP CSRF guidelines. This gives some additional flexibility with regards
- to scalingas the tokens can be generated and verified by a front-end server.
-
+ to scaling as the tokens can be generated and verified by a front-end
+ server.
+
.. versionadded :: 1.8a1
"""
-
+
def __init__(self, cookie_name='csrf_token', secure=False, httponly=False,
domain=None, path='/'):
self.cookie_name = cookie_name
@@ -108,8 +109,7 @@ def csrf_token_template_global(event):
return
else:
csrf = registry.getUtility(ICSRFPolicy)
- if csrf is not None:
- event['get_csrf_token'] = partial(csrf.get_csrf_token, request)
+ event['get_csrf_token'] = partial(csrf.get_csrf_token, request)
def get_csrf_token(request):
@@ -121,8 +121,7 @@ def get_csrf_token(request):
"""
registry = request.registry
csrf = registry.getUtility(ICSRFPolicy)
- if csrf is not None:
- return csrf.get_csrf_token(request)
+ return csrf.get_csrf_token(request)
def new_csrf_token(request):
@@ -134,25 +133,25 @@ def new_csrf_token(request):
"""
registry = request.registry
csrf = registry.getUtility(ICSRFPolicy)
- if csrf is not None:
- return csrf.new_csrf_token(request)
+ return csrf.new_csrf_token(request)
def check_csrf_token(request,
token='csrf_token',
header='X-CSRF-Token',
raises=True):
- """ Check the CSRF token returned by the :meth:`pyramid.interfaces.ICSRFPolicy`
- implementation against the value in ``request.POST.get(token)`` (if a POST
- request) or ``request.headers.get(header)``. If a ``token`` keyword is not
- supplied to this function, the string ``csrf_token`` will be used to look
- up the token in ``request.POST``. If a ``header`` keyword is not supplied
- to this function, the string ``X-CSRF-Token`` will be used to look up the
- token in ``request.headers``.
+ """ Check the CSRF token returned by the
+ :class:`pyramid.interfaces.ICSRFPolicy` implementation against the value in
+ ``request.POST.get(token)`` (if a POST request) or
+ ``request.headers.get(header)``. If a ``token`` keyword is not supplied to
+ this function, the string ``csrf_token`` will be used to look up the token
+ in ``request.POST``. If a ``header`` keyword is not supplied to this
+ function, the string ``X-CSRF-Token`` will be used to look up the token in
+ ``request.headers``.
If the value supplied by post or by header doesn't match the value supplied
- by ``impl.get_csrf_token()`` (where ``impl`` is an implementation of
- :meth:`pyramid.interfaces.ICSRFPolicy`), and ``raises`` is ``True``, this
+ by ``policy.get_csrf_token()`` (where ``policy`` is an implementation of
+ :class:`pyramid.interfaces.ICSRFPolicy`), and ``raises`` is ``True``, this
function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If
the values differ and ``raises`` is ``False``, this function will return
``False``. If the CSRF check is successful, this function will return
diff --git a/pyramid/tests/test_config/test_views.py b/pyramid/tests/test_config/test_views.py
index 45495f1fa..0816d9958 100644
--- a/pyramid/tests/test_config/test_views.py
+++ b/pyramid/tests/test_config/test_views.py
@@ -18,6 +18,7 @@ class TestViewsConfigurationMixin(unittest.TestCase):
def _makeOne(self, *arg, **kw):
from pyramid.config import Configurator
config = Configurator(*arg, **kw)
+ config.set_default_csrf_options(require_csrf=False)
return config
def _getViewCallable(self, config, ctx_iface=None, exc_iface=None,
diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py
index a74d2a07b..1b3f3fc3b 100644
--- a/pyramid/tests/test_csrf.py
+++ b/pyramid/tests/test_csrf.py
@@ -1,54 +1,109 @@
import unittest
+from zope.interface.interfaces import ComponentLookupError
+
+from pyramid import testing
from pyramid.config import Configurator
-from pyramid.csrf import CookieCSRF, SessionCSRF, get_csrf_token, new_csrf_token
from pyramid.events import BeforeRender
-from pyramid.interfaces import ICSRFPolicy
-from pyramid.tests.test_view import BaseTest as ViewBaseTest
-class CSRFTokenTests(ViewBaseTest, unittest.TestCase):
- class DummyCSRF(object):
- def new_csrf_token(self, request):
- return 'e5e9e30a08b34ff9842ff7d2b958c14b'
+class Test_get_csrf_token(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
- def get_csrf_token(self, request):
- return '02821185e4c94269bdc38e6eeae0a2f8'
+ def _callFUT(self, *args, **kwargs):
+ from pyramid.csrf import get_csrf_token
+ return get_csrf_token(*args, **kwargs)
+
+ def test_no_csrf_utility_registered(self):
+ request = testing.DummyRequest()
+
+ with self.assertRaises(ComponentLookupError):
+ self._callFUT(request)
+
+ def test_success(self):
+ self.config.set_default_csrf_options(implementation=DummyCSRF())
+ request = testing.DummyRequest()
+
+ csrf_token = self._callFUT(request)
+
+ self.assertEquals(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8')
+
+
+class Test_new_csrf_token(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
+
+ def _callFUT(self, *args, **kwargs):
+ from pyramid.csrf import new_csrf_token
+ return new_csrf_token(*args, **kwargs)
+
+ def test_no_csrf_utility_registered(self):
+ request = testing.DummyRequest()
+
+ with self.assertRaises(ComponentLookupError):
+ self._callFUT(request)
+
+ def test_success(self):
+ self.config.set_default_csrf_options(implementation=DummyCSRF())
+ request = testing.DummyRequest()
+
+ csrf_token = self._callFUT(request)
+
+ self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b')
+
+
+class Test_csrf_token_template_global(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
+
+ def _callFUT(self, *args, **kwargs):
+ from pyramid.csrf import csrf_token_template_global
+ return csrf_token_template_global(*args, **kwargs)
+
+ def test_event_is_missing_request(self):
+ event = BeforeRender({}, {})
+
+ self._callFUT(event)
+
+ self.assertNotIn('get_csrf_token', event)
+
+ def test_request_is_missing_registry(self):
+ request = DummyRequest(registry=None)
+ del request.registry
+ del request.__class__.registry
+ event = BeforeRender({'request': request}, {})
+
+ self._callFUT(event)
+
+ self.assertNotIn('get_csrf_token', event)
+
+ def test_csrf_utility_not_registered(self):
+ request = testing.DummyRequest()
+ event = BeforeRender({'request': request}, {})
+
+ with self.assertRaises(ComponentLookupError):
+ self._callFUT(event)
def test_csrf_token_passed_to_template(self):
config = Configurator()
- config.set_default_csrf_options(implementation=self.DummyCSRF)
+ config.set_default_csrf_options(implementation=DummyCSRF())
config.commit()
- request = self._makeRequest()
+ request = testing.DummyRequest()
request.registry = config.registry
before = BeforeRender({'request': request}, {})
config.registry.notify(before)
+
self.assertIn('get_csrf_token', before)
self.assertEqual(
before['get_csrf_token'](),
'02821185e4c94269bdc38e6eeae0a2f8'
)
- def test_simple_api_for_tokens_from_python(self):
- config = Configurator()
- config.set_default_csrf_options(implementation=self.DummyCSRF)
- config.commit()
-
- request = self._makeRequest()
- request.registry = config.registry
- self.assertEqual(
- get_csrf_token(request),
- '02821185e4c94269bdc38e6eeae0a2f8'
- )
- self.assertEqual(
- new_csrf_token(request),
- 'e5e9e30a08b34ff9842ff7d2b958c14b'
- )
-
-class SessionCSRFTests(unittest.TestCase):
+class TestSessionCSRF(unittest.TestCase):
class MockSession(object):
def new_csrf_token(self):
return 'e5e9e30a08b34ff9842ff7d2b958c14b'
@@ -56,33 +111,75 @@ class SessionCSRFTests(unittest.TestCase):
def get_csrf_token(self):
return '02821185e4c94269bdc38e6eeae0a2f8'
- def test_session_csrf_implementation_delegates_to_session(self):
+ def _makeOne(self):
+ from pyramid.csrf import SessionCSRF
+ return SessionCSRF()
+
+ def test_register_session_csrf_policy(self):
+ from pyramid.csrf import SessionCSRF
+ from pyramid.interfaces import ICSRFPolicy
+
config = Configurator()
- config.set_default_csrf_options(implementation=SessionCSRF)
+ config.set_default_csrf_options(implementation=self._makeOne())
config.commit()
- request = DummyRequest(config.registry, session=self.MockSession())
+ policy = config.registry.queryUtility(ICSRFPolicy)
+
+ self.assertTrue(isinstance(policy, SessionCSRF))
+
+ def test_session_csrf_implementation_delegates_to_session(self):
+ policy = self._makeOne()
+ request = DummyRequest(session=self.MockSession())
+
self.assertEqual(
- config.registry.getUtility(ICSRFPolicy).get_csrf_token(request),
+ policy.get_csrf_token(request),
'02821185e4c94269bdc38e6eeae0a2f8'
)
self.assertEqual(
- config.registry.getUtility(ICSRFPolicy).new_csrf_token(request),
+ policy.new_csrf_token(request),
'e5e9e30a08b34ff9842ff7d2b958c14b'
)
+ def test_verifying_token_invalid(self):
+ policy = self._makeOne()
+ request = DummyRequest(session=self.MockSession())
-class CookieCSRFTests(unittest.TestCase):
+ result = policy.check_csrf_token(request, 'invalid-token')
+ self.assertFalse(result)
+
+ def test_verifying_token_valid(self):
+ policy = self._makeOne()
+ request = DummyRequest(session=self.MockSession())
+
+ result = policy.check_csrf_token(
+ request, '02821185e4c94269bdc38e6eeae0a2f8')
+ self.assertTrue(result)
+
+
+class TestCookieCSRF(unittest.TestCase):
+ def _makeOne(self):
+ from pyramid.csrf import CookieCSRF
+ return CookieCSRF()
+
+ def test_register_cookie_csrf_policy(self):
+ from pyramid.csrf import CookieCSRF
+ from pyramid.interfaces import ICSRFPolicy
- def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self):
config = Configurator()
- config.set_default_csrf_options(implementation=CookieCSRF())
+ config.set_default_csrf_options(implementation=self._makeOne())
config.commit()
+ policy = config.registry.queryUtility(ICSRFPolicy)
+
+ self.assertTrue(isinstance(policy, CookieCSRF))
+
+ def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self):
response = MockResponse()
- request = DummyRequest(config.registry, response=response)
+ request = DummyRequest(response=response)
+
+ policy = self._makeOne()
+ token = policy.get_csrf_token(request)
- token = config.registry.getUtility(ICSRFPolicy).get_csrf_token(request)
self.assertEqual(
response.called_args,
('csrf_token', token),
@@ -99,15 +196,13 @@ class CookieCSRFTests(unittest.TestCase):
)
def test_existing_cookie_csrf_does_not_set_cookie(self):
- config = Configurator()
- config.set_default_csrf_options(implementation=CookieCSRF())
- config.commit()
-
response = MockResponse()
- request = DummyRequest(config.registry, response=response)
+ request = DummyRequest(response=response)
request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
- token = config.registry.getUtility(ICSRFPolicy).get_csrf_token(request)
+ policy = self._makeOne()
+ token = policy.get_csrf_token(request)
+
self.assertEqual(
token,
'e6f325fee5974f3da4315a8ccf4513d2'
@@ -122,15 +217,13 @@ class CookieCSRFTests(unittest.TestCase):
)
def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self):
- config = Configurator()
- config.set_default_csrf_options(implementation=CookieCSRF())
- config.commit()
-
response = MockResponse()
- request = DummyRequest(config.registry, response=response)
+ request = DummyRequest(response=response)
request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
- token = config.registry.getUtility(ICSRFPolicy).new_csrf_token(request)
+ policy = self._makeOne()
+ token = policy.new_csrf_token(request)
+
self.assertEqual(
response.called_args,
('csrf_token', token),
@@ -146,13 +239,177 @@ class CookieCSRFTests(unittest.TestCase):
}
)
+ def test_verifying_token_invalid_token(self):
+ response = MockResponse()
+ request = DummyRequest(response=response)
+ request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
+
+ policy = self._makeOne()
+ self.assertFalse(
+ policy.check_csrf_token(request, 'invalid-token')
+ )
+
+ def test_verifying_token_against_existing_cookie(self):
+ response = MockResponse()
+ request = DummyRequest(response=response)
+ request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
+
+ policy = self._makeOne()
+ self.assertTrue(
+ policy.check_csrf_token(request, 'e6f325fee5974f3da4315a8ccf4513d2')
+ )
+
+
+class Test_check_csrf_token(unittest.TestCase):
+ def setUp(self):
+ self.config = testing.setUp()
+
+ # set up CSRF (this will also register SessionCSRF policy)
+ self.config.set_default_csrf_options(require_csrf=False)
+
+ def _callFUT(self, *args, **kwargs):
+ from ..csrf import check_csrf_token
+ return check_csrf_token(*args, **kwargs)
+
+ def test_success_token(self):
+ request = testing.DummyRequest()
+ request.method = "POST"
+ request.POST = {'csrf_token': request.session.get_csrf_token()}
+ self.assertEqual(self._callFUT(request, token='csrf_token'), True)
+
+ def test_success_header(self):
+ request = testing.DummyRequest()
+ request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
+ self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True)
+
+ def test_success_default_token(self):
+ request = testing.DummyRequest()
+ request.method = "POST"
+ request.POST = {'csrf_token': request.session.get_csrf_token()}
+ self.assertEqual(self._callFUT(request), True)
+
+ def test_success_default_header(self):
+ request = testing.DummyRequest()
+ request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
+ self.assertEqual(self._callFUT(request), True)
+
+ def test_failure_raises(self):
+ from pyramid.exceptions import BadCSRFToken
+ request = testing.DummyRequest()
+ self.assertRaises(BadCSRFToken, self._callFUT, request,
+ 'csrf_token')
+
+ def test_failure_no_raises(self):
+ request = testing.DummyRequest()
+ result = self._callFUT(request, 'csrf_token', raises=False)
+ self.assertEqual(result, False)
+
+ def test_token_differing_types(self):
+ from pyramid.compat import text_
+ request = testing.DummyRequest()
+ request.method = "POST"
+ request.session['_csrft_'] = text_('foo')
+ request.POST = {'csrf_token': b'foo'}
+ self.assertEqual(self._callFUT(request, token='csrf_token'), True)
+
+
+class Test_check_csrf_origin(unittest.TestCase):
+ def _callFUT(self, *args, **kwargs):
+ from ..csrf import check_csrf_origin
+ return check_csrf_origin(*args, **kwargs)
+
+ def test_success_with_http(self):
+ request = testing.DummyRequest()
+ request.scheme = "http"
+ self.assertTrue(self._callFUT(request))
+
+ def test_success_with_https_and_referrer(self):
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.host = "example.com"
+ request.host_port = "443"
+ request.referrer = "https://example.com/login/"
+ request.registry.settings = {}
+ self.assertTrue(self._callFUT(request))
+
+ def test_success_with_https_and_origin(self):
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.host = "example.com"
+ request.host_port = "443"
+ request.headers = {"Origin": "https://example.com/"}
+ request.referrer = "https://not-example.com/"
+ request.registry.settings = {}
+ self.assertTrue(self._callFUT(request))
+
+ def test_success_with_additional_trusted_host(self):
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.host = "example.com"
+ request.host_port = "443"
+ request.referrer = "https://not-example.com/login/"
+ request.registry.settings = {
+ "pyramid.csrf_trusted_origins": ["not-example.com"],
+ }
+ self.assertTrue(self._callFUT(request))
+
+ def test_success_with_nonstandard_port(self):
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.host = "example.com:8080"
+ request.host_port = "8080"
+ request.referrer = "https://example.com:8080/login/"
+ request.registry.settings = {}
+ self.assertTrue(self._callFUT(request))
+
+ def test_fails_with_wrong_host(self):
+ from pyramid.exceptions import BadCSRFOrigin
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.host = "example.com"
+ request.host_port = "443"
+ request.referrer = "https://not-example.com/login/"
+ request.registry.settings = {}
+ self.assertRaises(BadCSRFOrigin, self._callFUT, request)
+ self.assertFalse(self._callFUT(request, raises=False))
+
+ def test_fails_with_no_origin(self):
+ from pyramid.exceptions import BadCSRFOrigin
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.referrer = None
+ self.assertRaises(BadCSRFOrigin, self._callFUT, request)
+ self.assertFalse(self._callFUT(request, raises=False))
+
+ def test_fails_when_http_to_https(self):
+ from pyramid.exceptions import BadCSRFOrigin
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.host = "example.com"
+ request.host_port = "443"
+ request.referrer = "http://example.com/evil/"
+ request.registry.settings = {}
+ self.assertRaises(BadCSRFOrigin, self._callFUT, request)
+ self.assertFalse(self._callFUT(request, raises=False))
+
+ def test_fails_with_nonstandard_port(self):
+ from pyramid.exceptions import BadCSRFOrigin
+ request = testing.DummyRequest()
+ request.scheme = "https"
+ request.host = "example.com:8080"
+ request.host_port = "8080"
+ request.referrer = "https://example.com/login/"
+ request.registry.settings = {}
+ self.assertRaises(BadCSRFOrigin, self._callFUT, request)
+ self.assertFalse(self._callFUT(request, raises=False))
+
class DummyRequest(object):
registry = None
session = None
cookies = {}
- def __init__(self, registry, session=None, response=None):
+ def __init__(self, registry=None, session=None, response=None):
self.registry = registry
self.session = session
self.response = response
@@ -170,3 +427,11 @@ class MockResponse(object):
self.called_args = args
self.called_kwargs = kwargs
return
+
+
+class DummyCSRF(object):
+ def new_csrf_token(self, request):
+ return 'e5e9e30a08b34ff9842ff7d2b958c14b'
+
+ def get_csrf_token(self, request):
+ return '02821185e4c94269bdc38e6eeae0a2f8'
diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py
index b51dccecc..ade602799 100644
--- a/pyramid/tests/test_session.py
+++ b/pyramid/tests/test_session.py
@@ -659,144 +659,6 @@ class Test_signed_deserialize(unittest.TestCase):
result = self._callFUT(serialized, secret.decode('latin-1'))
self.assertEqual(result, '123')
-class Test_check_csrf_token(unittest.TestCase):
- def _callFUT(self, *args, **kwargs):
- from ..csrf import check_csrf_token
- return check_csrf_token(*args, **kwargs)
-
- def test_success_token(self):
- request = testing.DummyRequest()
- request.method = "POST"
- request.POST = {'csrf_token': request.session.get_csrf_token()}
- self.assertEqual(self._callFUT(request, token='csrf_token'), True)
-
- def test_success_header(self):
- request = testing.DummyRequest()
- request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
- self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True)
-
- def test_success_default_token(self):
- request = testing.DummyRequest()
- request.method = "POST"
- request.POST = {'csrf_token': request.session.get_csrf_token()}
- self.assertEqual(self._callFUT(request), True)
-
- def test_success_default_header(self):
- request = testing.DummyRequest()
- request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
- self.assertEqual(self._callFUT(request), True)
-
- def test_failure_raises(self):
- from pyramid.exceptions import BadCSRFToken
- request = testing.DummyRequest()
- self.assertRaises(BadCSRFToken, self._callFUT, request,
- 'csrf_token')
-
- def test_failure_no_raises(self):
- request = testing.DummyRequest()
- result = self._callFUT(request, 'csrf_token', raises=False)
- self.assertEqual(result, False)
-
- def test_token_differing_types(self):
- from pyramid.compat import text_
- request = testing.DummyRequest()
- request.method = "POST"
- request.session['_csrft_'] = text_('foo')
- request.POST = {'csrf_token': b'foo'}
- self.assertEqual(self._callFUT(request, token='csrf_token'), True)
-
-
-class Test_check_csrf_origin(unittest.TestCase):
-
- def _callFUT(self, *args, **kwargs):
- from ..csrf import check_csrf_origin
- return check_csrf_origin(*args, **kwargs)
-
- def test_success_with_http(self):
- request = testing.DummyRequest()
- request.scheme = "http"
- self.assertTrue(self._callFUT(request))
-
- def test_success_with_https_and_referrer(self):
- request = testing.DummyRequest()
- request.scheme = "https"
- request.host = "example.com"
- request.host_port = "443"
- request.referrer = "https://example.com/login/"
- request.registry.settings = {}
- self.assertTrue(self._callFUT(request))
-
- def test_success_with_https_and_origin(self):
- request = testing.DummyRequest()
- request.scheme = "https"
- request.host = "example.com"
- request.host_port = "443"
- request.headers = {"Origin": "https://example.com/"}
- request.referrer = "https://not-example.com/"
- request.registry.settings = {}
- self.assertTrue(self._callFUT(request))
-
- def test_success_with_additional_trusted_host(self):
- request = testing.DummyRequest()
- request.scheme = "https"
- request.host = "example.com"
- request.host_port = "443"
- request.referrer = "https://not-example.com/login/"
- request.registry.settings = {
- "pyramid.csrf_trusted_origins": ["not-example.com"],
- }
- self.assertTrue(self._callFUT(request))
-
- def test_success_with_nonstandard_port(self):
- request = testing.DummyRequest()
- request.scheme = "https"
- request.host = "example.com:8080"
- request.host_port = "8080"
- request.referrer = "https://example.com:8080/login/"
- request.registry.settings = {}
- self.assertTrue(self._callFUT(request))
-
- def test_fails_with_wrong_host(self):
- from pyramid.exceptions import BadCSRFOrigin
- request = testing.DummyRequest()
- request.scheme = "https"
- request.host = "example.com"
- request.host_port = "443"
- request.referrer = "https://not-example.com/login/"
- request.registry.settings = {}
- self.assertRaises(BadCSRFOrigin, self._callFUT, request)
- self.assertFalse(self._callFUT(request, raises=False))
-
- def test_fails_with_no_origin(self):
- from pyramid.exceptions import BadCSRFOrigin
- request = testing.DummyRequest()
- request.scheme = "https"
- request.referrer = None
- self.assertRaises(BadCSRFOrigin, self._callFUT, request)
- self.assertFalse(self._callFUT(request, raises=False))
-
- def test_fails_when_http_to_https(self):
- from pyramid.exceptions import BadCSRFOrigin
- request = testing.DummyRequest()
- request.scheme = "https"
- request.host = "example.com"
- request.host_port = "443"
- request.referrer = "http://example.com/evil/"
- request.registry.settings = {}
- self.assertRaises(BadCSRFOrigin, self._callFUT, request)
- self.assertFalse(self._callFUT(request, raises=False))
-
- def test_fails_with_nonstandard_port(self):
- from pyramid.exceptions import BadCSRFOrigin
- request = testing.DummyRequest()
- request.scheme = "https"
- request.host = "example.com:8080"
- request.host_port = "8080"
- request.referrer = "https://example.com/login/"
- request.registry.settings = {}
- self.assertRaises(BadCSRFOrigin, self._callFUT, request)
- self.assertFalse(self._callFUT(request, raises=False))
-
class DummySerializer(object):
def dumps(self, value):
diff --git a/pyramid/tests/test_viewderivers.py b/pyramid/tests/test_viewderivers.py
index 51d0bd367..6b81cc1e5 100644
--- a/pyramid/tests/test_viewderivers.py
+++ b/pyramid/tests/test_viewderivers.py
@@ -12,6 +12,7 @@ class TestDeriveView(unittest.TestCase):
def setUp(self):
self.config = testing.setUp()
+ self.config.set_default_csrf_options(require_csrf=False)
def tearDown(self):
self.config = None