diff options
| -rw-r--r-- | CHANGES.txt | 12 | ||||
| -rw-r--r-- | CONTRIBUTORS.txt | 2 | ||||
| -rw-r--r-- | docs/api/config.rst | 1 | ||||
| -rw-r--r-- | docs/api/csrf.rst | 23 | ||||
| -rw-r--r-- | docs/api/interfaces.rst | 3 | ||||
| -rw-r--r-- | docs/api/session.rst | 4 | ||||
| -rw-r--r-- | docs/glossary.rst | 5 | ||||
| -rw-r--r-- | docs/narr/security.rst | 216 | ||||
| -rw-r--r-- | docs/narr/sessions.rst | 183 | ||||
| -rw-r--r-- | docs/narr/templates.rst | 4 | ||||
| -rw-r--r-- | pyramid/config/__init__.py | 1 | ||||
| -rw-r--r-- | pyramid/config/security.py | 29 | ||||
| -rw-r--r-- | pyramid/config/views.py | 20 | ||||
| -rw-r--r-- | pyramid/csrf.py | 332 | ||||
| -rw-r--r-- | pyramid/interfaces.py | 43 | ||||
| -rw-r--r-- | pyramid/predicates.py | 2 | ||||
| -rw-r--r-- | pyramid/renderers.py | 5 | ||||
| -rw-r--r-- | pyramid/session.py | 169 | ||||
| -rw-r--r-- | pyramid/testing.py | 1 | ||||
| -rw-r--r-- | pyramid/tests/test_config/test_views.py | 3 | ||||
| -rw-r--r-- | pyramid/tests/test_csrf.py | 406 | ||||
| -rw-r--r-- | pyramid/tests/test_renderers.py | 8 | ||||
| -rw-r--r-- | pyramid/tests/test_session.py | 138 | ||||
| -rw-r--r-- | pyramid/tests/test_util.py | 6 | ||||
| -rw-r--r-- | pyramid/tests/test_viewderivers.py | 1 | ||||
| -rw-r--r-- | pyramid/viewderivers.py | 2 |
26 files changed, 1119 insertions, 500 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index 8868e6ff7..a2e2d6db1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -24,6 +24,14 @@ Features can be alleviated by invoking ``config.begin()`` and ``config.end()`` appropriately. See https://github.com/Pylons/pyramid/pull/2989 +- A new CSRF implementation, ``pyramid.csrf.SessionCSRFStoragePolicy``, + has been added which delegates all CSRF generation to the current session, + following the old API for this. A ``pyramid.csrf.get_csrf_token()`` api is now + available in template global scope, to make it easy for template developers + to get the current CSRF token without adding it to Python code. + See https://github.com/Pylons/pyramid/pull/2854 and + https://github.com/Pylons/pyramid/pull/3019 + - The ``pyramid.config.Configurator`` can now be used as a context manager which will automatically push/pop threadlocals (similar to ``config.begin()`` and ``config.end()``). It will also automatically perform @@ -56,3 +64,7 @@ Backward Incompatibilities Documentation Changes --------------------- + +- Retrieving CSRF token from the session has been deprecated, in favor of + equivalent methods in :mod:`pyramid.csrf`. + See https://github.com/Pylons/pyramid/pull/2854 diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 3fe2c2d58..ca1f56f51 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -291,6 +291,8 @@ Contributors - Mikko Ohtamaa, 2016/12/6 +- Jure Cerjak, 2016/12/7 + - Martin Frlin, 2016/12/7 - Kirill Kuzminykh, 2017/03/01 diff --git a/docs/api/config.rst b/docs/api/config.rst index c76d3d5ff..a785b64ad 100644 --- a/docs/api/config.rst +++ b/docs/api/config.rst @@ -37,6 +37,7 @@ .. automethod:: set_authentication_policy .. automethod:: set_authorization_policy .. automethod:: set_default_csrf_options + .. automethod:: set_csrf_storage_policy .. automethod:: set_default_permission .. automethod:: add_permission diff --git a/docs/api/csrf.rst b/docs/api/csrf.rst new file mode 100644 index 000000000..38501546e --- /dev/null +++ b/docs/api/csrf.rst @@ -0,0 +1,23 @@ +.. _csrf_module: + +:mod:`pyramid.csrf` +------------------- + +.. automodule:: pyramid.csrf + + .. autoclass:: LegacySessionCSRFStoragePolicy + :members: + + .. autoclass:: SessionCSRFStoragePolicy + :members: + + .. autoclass:: CookieCSRFStoragePolicy + :members: + + .. autofunction:: get_csrf_token + + .. autofunction:: new_csrf_token + + .. autofunction:: check_csrf_origin + + .. autofunction:: check_csrf_token diff --git a/docs/api/interfaces.rst b/docs/api/interfaces.rst index a212ba7a9..e542a6be0 100644 --- a/docs/api/interfaces.rst +++ b/docs/api/interfaces.rst @@ -44,6 +44,9 @@ Other Interfaces .. autointerface:: IRoutePregenerator :members: + .. autointerface:: ICSRFStoragePolicy + :members: + .. autointerface:: ISession :members: diff --git a/docs/api/session.rst b/docs/api/session.rst index 56c4f52d7..53bae7c52 100644 --- a/docs/api/session.rst +++ b/docs/api/session.rst @@ -9,10 +9,6 @@ .. autofunction:: signed_deserialize - .. autofunction:: check_csrf_origin - - .. autofunction:: check_csrf_token - .. autofunction:: SignedCookieSessionFactory .. autofunction:: UnencryptedCookieSessionFactoryConfig diff --git a/docs/glossary.rst b/docs/glossary.rst index 0a46fac3b..0cf96f488 100644 --- a/docs/glossary.rst +++ b/docs/glossary.rst @@ -891,6 +891,11 @@ Glossary :meth:`pyramid.config.Configurator.set_session_factory` for more information. + CSRF storage policy + A utility that implements :class:`pyramid.interfaces.ICSRFStoragePolicy` + which is responsible for allocating CSRF tokens to a user and verifying + that a provided token is acceptable. + Mako `Mako <http://www.makotemplates.org/>`_ is a template language which refines the familiar ideas of componentized layout and inheritance diff --git a/docs/narr/security.rst b/docs/narr/security.rst index 77e7fd707..ddf496b69 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -146,7 +146,7 @@ For example, the following view declaration protects the view named # config is an instance of pyramid.config.Configurator config.add_view('mypackage.views.blog_entry_add_view', - name='add_entry.html', + name='add_entry.html', context='mypackage.resources.Blog', permission='add') @@ -725,7 +725,7 @@ object that implements the following interface: """ Return ``True`` if any of the ``principals`` is allowed the ``permission`` in the current ``context``, else return ``False`` """ - + def principals_allowed_by_permission(self, context, permission): """ Return a set of principal identifiers allowed by the ``permission`` in ``context``. This behavior is optional; if you @@ -765,3 +765,215 @@ which would allow the attacker to control the content of the payload. Re-using a secret across two different subsystems might drop the security of signing to zero. Keys should not be re-used across different contexts where an attacker has the possibility of providing a chosen plaintext. + +.. index:: + single: preventing cross-site request forgery attacks + single: cross-site request forgery attacks, prevention + +Preventing Cross-Site Request Forgery Attacks +--------------------------------------------- + +`Cross-site request forgery +<https://en.wikipedia.org/wiki/Cross-site_request_forgery>`_ attacks are a +phenomenon whereby a user who is logged in to your website might inadvertantly +load a URL because it is linked from, or embedded in, an attacker's website. +If the URL is one that may modify or delete data, the consequences can be dire. + +You can avoid most of these attacks by issuing a unique token to the browser +and then requiring that it be present in all potentially unsafe requests. +:app:`Pyramid` provides facilities to create and check CSRF tokens. + +By default :app:`Pyramid` comes with a session-based CSRF implementation +:class:`pyramid.csrf.SessionCSRFStoragePolicy`. To use it, you must first enable +a :term:`session factory` as described in +:ref:`using_the_default_session_factory` or +:ref:`using_alternate_session_factories`. Alternatively, you can use +a cookie-based implementation :class:`pyramid.csrf.CookieCSRFStoragePolicy` which gives +some additional flexibility as it does not require a session for each user. +You can also define your own implementation of +:class:`pyramid.interfaces.ICSRFStoragePolicy` and register it with the +:meth:`pyramid.config.Configurator.set_csrf_storage_policy` directive. + +For example: + +.. code-block:: python + + from pyramid.config import Configurator + + config = Configurator() + config.set_csrf_storage_policy(MyCustomCSRFPolicy()) + +.. index:: + single: csrf.get_csrf_token + +Using the ``csrf.get_csrf_token`` Method +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To get the current CSRF token, use the +:data:`pyramid.csrf.get_csrf_token` method. + +.. code-block:: python + + from pyramid.csrf import get_csrf_token + token = get_csrf_token(request) + +The ``get_csrf_token()`` method accepts a single argument: the request. It +returns a CSRF *token* string. If ``get_csrf_token()`` or ``new_csrf_token()`` +was invoked previously for this user, then the existing token will be returned. +If no CSRF token previously existed for this user, then a new token will be set +into the session and returned. The newly created token will be opaque and +randomized. + +.. _get_csrf_token_in_templates: + +Using the ``get_csrf_token`` global in templates +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Templates have a ``get_csrf_token()`` method inserted into their globals, which +allows you to get the current token without modifying the view code. This +method takes no arguments and returns a CSRF token string. You can use the +returned token as the value of a hidden field in a form that posts to a method +that requires elevated privileges, or supply it as a request header in AJAX +requests. + +For example, include the CSRF token as a hidden field: + +.. code-block:: html + + <form method="post" action="/myview"> + <input type="hidden" name="csrf_token" value="${get_csrf_token()}"> + <input type="submit" value="Delete Everything"> + </form> + +Or include it as a header in a jQuery AJAX request: + +.. code-block:: javascript + + var csrfToken = "${get_csrf_token()}"; + $.ajax({ + type: "POST", + url: "/myview", + headers: { 'X-CSRF-Token': csrfToken } + }).done(function() { + alert("Deleted"); + }); + +The handler for the URL that receives the request should then require that the +correct CSRF token is supplied. + +.. index:: + single: csrf.new_csrf_token + +Using the ``csrf.new_csrf_token`` Method +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To explicitly create a new CSRF token, use the ``csrf.new_csrf_token()`` +method. This differs only from ``csrf.get_csrf_token()`` inasmuch as it +clears any existing CSRF token, creates a new CSRF token, sets the token into +the user, and returns the token. + +.. code-block:: python + + from pyramid.csrf import get_csrf_token + token = new_csrf_token() + +.. note:: + + It is not possible to force a new CSRF token from a template. If you + want to regenerate your CSRF token then do it in the view code and return + the new token as part of the context. + +Checking CSRF Tokens Manually +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +In request handling code, you can check the presence and validity of a CSRF +token with :func:`pyramid.csrf.check_csrf_token`. If the token is valid, it +will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally, +you can specify ``raises=False`` to have the check return ``False`` instead of +raising an exception. + +By default, it checks for a POST parameter named ``csrf_token`` or a header +named ``X-CSRF-Token``. + +.. code-block:: python + + from pyramid.csrf import check_csrf_token + + def myview(request): + # Require CSRF Token + check_csrf_token(request) + + # ... + +.. _auto_csrf_checking: + +Checking CSRF Tokens Automatically +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. versionadded:: 1.7 + +:app:`Pyramid` supports automatically checking CSRF tokens on requests with an +unsafe method as defined by RFC2616. Any other request may be checked manually. +This feature can be turned on globally for an application using the +:meth:`pyramid.config.Configurator.set_default_csrf_options` directive. +For example: + +.. code-block:: python + + from pyramid.config import Configurator + + config = Configurator() + config.set_default_csrf_options(require_csrf=True) + +CSRF checking may be explicitly enabled or disabled on a per-view basis using +the ``require_csrf`` view option. A value of ``True`` or ``False`` will +override the default set by ``set_default_csrf_options``. For example: + +.. code-block:: python + + @view_config(route_name='hello', require_csrf=False) + def myview(request): + # ... + +When CSRF checking is active, the token and header used to find the +supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively, +unless otherwise overridden by ``set_default_csrf_options``. The token is +checked against the value in ``request.POST`` which is the submitted form body. +If this value is not present, then the header will be checked. + +In addition to token based CSRF checks, if the request is using HTTPS then the +automatic CSRF checking will also check the referrer of the request to ensure +that it matches one of the trusted origins. By default the only trusted origin +is the current host, however additional origins may be configured by setting +``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they +are non standard). If a host in the list of domains starts with a ``.`` then +that will allow all subdomains as well as the domain without the ``.``. + +If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or +:class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This +exception may be caught and handled by an :term:`exception view` but, by +default, will result in a ``400 Bad Request`` response being sent to the +client. + +Checking CSRF Tokens with a View Predicate +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. deprecated:: 1.7 + Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead + to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised. + +A convenient way to require a valid CSRF token for a particular view is to +include ``check_csrf=True`` as a view predicate. See +:meth:`pyramid.config.Configurator.add_view`. + +.. code-block:: python + + @view_config(request_method='POST', check_csrf=True, ...) + def myview(request): + ... + +.. note:: + A mismatch of a CSRF token is treated like any other predicate miss, and the + predicate system, when it doesn't find a view, raises ``HTTPNotFound`` + instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different + from calling :func:`pyramid.csrf.check_csrf_token`. diff --git a/docs/narr/sessions.rst b/docs/narr/sessions.rst index 5b24201a9..7e2469d54 100644 --- a/docs/narr/sessions.rst +++ b/docs/narr/sessions.rst @@ -12,8 +12,7 @@ application. This chapter describes how to configure sessions, what session implementations :app:`Pyramid` provides out of the box, how to store and retrieve data from -sessions, and two session-specific features: flash messages, and cross-site -request forgery attack prevention. +sessions, and a session-specific feature: flash messages. .. index:: single: session factory (default) @@ -316,183 +315,3 @@ flash storage. ['info message'] >>> request.session.peek_flash() [] - -.. index:: - single: preventing cross-site request forgery attacks - single: cross-site request forgery attacks, prevention - -Preventing Cross-Site Request Forgery Attacks ---------------------------------------------- - -`Cross-site request forgery -<https://en.wikipedia.org/wiki/Cross-site_request_forgery>`_ attacks are a -phenomenon whereby a user who is logged in to your website might inadvertantly -load a URL because it is linked from, or embedded in, an attacker's website. -If the URL is one that may modify or delete data, the consequences can be dire. - -You can avoid most of these attacks by issuing a unique token to the browser -and then requiring that it be present in all potentially unsafe requests. -:app:`Pyramid` sessions provide facilities to create and check CSRF tokens. - -To use CSRF tokens, you must first enable a :term:`session factory` as -described in :ref:`using_the_default_session_factory` or -:ref:`using_alternate_session_factories`. - -.. index:: - single: session.get_csrf_token - -Using the ``session.get_csrf_token`` Method -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To get the current CSRF token from the session, use the -``session.get_csrf_token()`` method. - -.. code-block:: python - - token = request.session.get_csrf_token() - -The ``session.get_csrf_token()`` method accepts no arguments. It returns a -CSRF *token* string. If ``session.get_csrf_token()`` or -``session.new_csrf_token()`` was invoked previously for this session, then the -existing token will be returned. If no CSRF token previously existed for this -session, then a new token will be set into the session and returned. The newly -created token will be opaque and randomized. - -You can use the returned token as the value of a hidden field in a form that -posts to a method that requires elevated privileges, or supply it as a request -header in AJAX requests. - -For example, include the CSRF token as a hidden field: - -.. code-block:: html - - <form method="post" action="/myview"> - <input type="hidden" name="csrf_token" value="${request.session.get_csrf_token()}"> - <input type="submit" value="Delete Everything"> - </form> - -Or include it as a header in a jQuery AJAX request: - -.. code-block:: javascript - - var csrfToken = ${request.session.get_csrf_token()}; - $.ajax({ - type: "POST", - url: "/myview", - headers: { 'X-CSRF-Token': csrfToken } - }).done(function() { - alert("Deleted"); - }); - -The handler for the URL that receives the request should then require that the -correct CSRF token is supplied. - -.. index:: - single: session.new_csrf_token - -Using the ``session.new_csrf_token`` Method -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To explicitly create a new CSRF token, use the ``session.new_csrf_token()`` -method. This differs only from ``session.get_csrf_token()`` inasmuch as it -clears any existing CSRF token, creates a new CSRF token, sets the token into -the session, and returns the token. - -.. code-block:: python - - token = request.session.new_csrf_token() - -Checking CSRF Tokens Manually -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -In request handling code, you can check the presence and validity of a CSRF -token with :func:`pyramid.session.check_csrf_token`. If the token is valid, it -will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally, -you can specify ``raises=False`` to have the check return ``False`` instead of -raising an exception. - -By default, it checks for a POST parameter named ``csrf_token`` or a header -named ``X-CSRF-Token``. - -.. code-block:: python - - from pyramid.session import check_csrf_token - - def myview(request): - # Require CSRF Token - check_csrf_token(request) - - # ... - -.. _auto_csrf_checking: - -Checking CSRF Tokens Automatically -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. versionadded:: 1.7 - -:app:`Pyramid` supports automatically checking CSRF tokens on requests with an -unsafe method as defined by RFC2616. Any other request may be checked manually. -This feature can be turned on globally for an application using the -:meth:`pyramid.config.Configurator.set_default_csrf_options` directive. -For example: - -.. code-block:: python - - from pyramid.config import Configurator - - config = Configurator() - config.set_default_csrf_options(require_csrf=True) - -CSRF checking may be explicitly enabled or disabled on a per-view basis using -the ``require_csrf`` view option. A value of ``True`` or ``False`` will -override the default set by ``set_default_csrf_options``. For example: - -.. code-block:: python - - @view_config(route_name='hello', require_csrf=False) - def myview(request): - # ... - -When CSRF checking is active, the token and header used to find the -supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively, -unless otherwise overridden by ``set_default_csrf_options``. The token is -checked against the value in ``request.POST`` which is the submitted form body. -If this value is not present, then the header will be checked. - -In addition to token based CSRF checks, if the request is using HTTPS then the -automatic CSRF checking will also check the referrer of the request to ensure -that it matches one of the trusted origins. By default the only trusted origin -is the current host, however additional origins may be configured by setting -``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they -are non standard). If a host in the list of domains starts with a ``.`` then -that will allow all subdomains as well as the domain without the ``.``. - -If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or -:class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This -exception may be caught and handled by an :term:`exception view` but, by -default, will result in a ``400 Bad Request`` response being sent to the -client. - -Checking CSRF Tokens with a View Predicate -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. deprecated:: 1.7 - Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead - to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised. - -A convenient way to require a valid CSRF token for a particular view is to -include ``check_csrf=True`` as a view predicate. See -:meth:`pyramid.config.Configurator.add_view`. - -.. code-block:: python - - @view_config(request_method='POST', check_csrf=True, ...) - def myview(request): - ... - -.. note:: - A mismatch of a CSRF token is treated like any other predicate miss, and the - predicate system, when it doesn't find a view, raises ``HTTPNotFound`` - instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different - from calling :func:`pyramid.session.check_csrf_token`. diff --git a/docs/narr/templates.rst b/docs/narr/templates.rst index 6b3b5fcce..4eadbd2f0 100644 --- a/docs/narr/templates.rst +++ b/docs/narr/templates.rst @@ -228,6 +228,10 @@ These values are provided to the template: provided if the template is rendered as the result of a ``renderer=`` argument to the view configuration being used. +``get_csrf_token()`` + A convenience function to access the current CSRF token. See + :ref:`get_csrf_token_in_templates` for more information. + ``renderer_name`` The renderer name used to perform the rendering, e.g., ``mypackage:templates/foo.pt``. diff --git a/pyramid/config/__init__.py b/pyramid/config/__init__.py index bcd4b3904..a34f0b4db 100644 --- a/pyramid/config/__init__.py +++ b/pyramid/config/__init__.py @@ -396,6 +396,7 @@ class Configurator( self.add_default_view_derivers() self.add_default_route_predicates() self.add_default_tweens() + self.add_default_security() if exceptionresponse_view is not None: exceptionresponse_view = self.maybe_dotted(exceptionresponse_view) diff --git a/pyramid/config/security.py b/pyramid/config/security.py index 1d4bbe890..20b816161 100644 --- a/pyramid/config/security.py +++ b/pyramid/config/security.py @@ -3,17 +3,24 @@ from zope.interface import implementer from pyramid.interfaces import ( IAuthorizationPolicy, IAuthenticationPolicy, + ICSRFStoragePolicy, IDefaultCSRFOptions, IDefaultPermission, PHASE1_CONFIG, PHASE2_CONFIG, ) +from pyramid.csrf import LegacySessionCSRFStoragePolicy from pyramid.exceptions import ConfigurationError from pyramid.util import action_method from pyramid.util import as_sorted_tuple + class SecurityConfiguratorMixin(object): + + def add_default_security(self): + self.set_csrf_storage_policy(LegacySessionCSRFStoragePolicy()) + @action_method def set_authentication_policy(self, policy): """ Override the :app:`Pyramid` :term:`authentication policy` in the @@ -223,9 +230,31 @@ class SecurityConfiguratorMixin(object): intr['header'] = header intr['safe_methods'] = as_sorted_tuple(safe_methods) intr['callback'] = callback + self.action(IDefaultCSRFOptions, register, order=PHASE1_CONFIG, introspectables=(intr,)) + @action_method + def set_csrf_storage_policy(self, policy): + """ + Set the :term:`CSRF storage policy` used by subsequent view + registrations. + + ``policy`` is a class that implements the + :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface and defines + how to generate and persist CSRF tokens. + + """ + def register(): + self.registry.registerUtility(policy, ICSRFStoragePolicy) + intr = self.introspectable('csrf storage policy', + None, + policy, + 'csrf storage policy') + intr['policy'] = policy + self.action(ICSRFStoragePolicy, register, introspectables=(intr,)) + + @implementer(IDefaultCSRFOptions) class DefaultCSRFOptions(object): def __init__(self, require_csrf, token, header, safe_methods, callback): diff --git a/pyramid/config/views.py b/pyramid/config/views.py index 2433ccfef..48c4e3437 100644 --- a/pyramid/config/views.py +++ b/pyramid/config/views.py @@ -641,18 +641,22 @@ class ViewsConfiguratorMixin(object): 'check name'. If the value provided is ``True``, ``csrf_token`` will be used as the check name. - If CSRF checking is performed, the checked value will be the value - of ``request.params[check_name]``. This value will be compared - against the value of ``request.session.get_csrf_token()``, and the - check will pass if these two values are the same. If the check - passes, the associated view will be permitted to execute. If the + If CSRF checking is performed, the checked value will be the value of + ``request.params[check_name]``. This value will be compared against + the value of ``policy.get_csrf_token()`` (where ``policy`` is an + implementation of :meth:`pyramid.interfaces.ICSRFStoragePolicy`), and the + check will pass if these two values are the same. If the check + passes, the associated view will be permitted to execute. If the check fails, the associated view will not be permitted to execute. - Note that using this feature requires a :term:`session factory` to - have been configured. - .. versionadded:: 1.4a2 + .. versionchanged:: 1.9 + This feature requires either a :term:`session factory` to have been + configured, or a :term:`CSRF storage policy` other than the default + to be in use. + + physical_path If specified, this value should be a string or a tuple representing diff --git a/pyramid/csrf.py b/pyramid/csrf.py new file mode 100644 index 000000000..c8f097777 --- /dev/null +++ b/pyramid/csrf.py @@ -0,0 +1,332 @@ +import uuid + +from webob.cookies import CookieProfile +from zope.interface import implementer + + +from pyramid.authentication import _SimpleSerializer + +from pyramid.compat import ( + bytes_, + urlparse, + text_, +) +from pyramid.exceptions import ( + BadCSRFOrigin, + BadCSRFToken, +) +from pyramid.interfaces import ICSRFStoragePolicy +from pyramid.settings import aslist +from pyramid.util import ( + is_same_domain, + strings_differ +) + + +@implementer(ICSRFStoragePolicy) +class LegacySessionCSRFStoragePolicy(object): + """ A CSRF storage policy that defers control of CSRF storage to the + session. + + This policy maintains compatibility with legacy ISession implementations + that know how to manage CSRF tokens themselves via + ``ISession.new_csrf_token`` and ``ISession.get_csrf_token``. + + Note that using this CSRF implementation requires that + a :term:`session factory` is configured. + + .. versionadded:: 1.9 + + """ + def new_csrf_token(self, request): + """ Sets a new CSRF token into the session and returns it. """ + return request.session.new_csrf_token() + + def get_csrf_token(self, request): + """ Returns the currently active CSRF token from the session, + generating a new one if needed.""" + return request.session.get_csrf_token() + + def check_csrf_token(self, request, supplied_token): + """ Returns ``True`` if the ``supplied_token`` is valid.""" + expected_token = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected_token), bytes_(supplied_token)) + + +@implementer(ICSRFStoragePolicy) +class SessionCSRFStoragePolicy(object): + """ A CSRF storage policy that persists the CSRF token in the session. + + Note that using this CSRF implementation requires that + a :term:`session factory` is configured. + + ``key`` + + The session key where the CSRF token will be stored. + Default: `_csrft_`. + + .. versionadded:: 1.9 + + """ + _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) + + def __init__(self, key='_csrft_'): + self.key = key + + def new_csrf_token(self, request): + """ Sets a new CSRF token into the session and returns it. """ + token = self._token_factory() + request.session[self.key] = token + return token + + def get_csrf_token(self, request): + """ Returns the currently active CSRF token from the session, + generating a new one if needed.""" + token = request.session.get(self.key, None) + if not token: + token = self.new_csrf_token(request) + return token + + def check_csrf_token(self, request, supplied_token): + """ Returns ``True`` if the ``supplied_token`` is valid.""" + expected_token = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected_token), bytes_(supplied_token)) + + +@implementer(ICSRFStoragePolicy) +class CookieCSRFStoragePolicy(object): + """ An alternative CSRF implementation that stores its information in + unauthenticated cookies, known as the 'Double Submit Cookie' method in the + `OWASP CSRF guidelines <https://www.owasp.org/index.php/ + Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet# + Double_Submit_Cookie>`_. This gives some additional flexibility with + regards to scaling as the tokens can be generated and verified by a + front-end server. + + .. versionadded:: 1.9 + + """ + _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) + + def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, + domain=None, max_age=None, path='/'): + serializer = _SimpleSerializer() + self.cookie_profile = CookieProfile( + cookie_name=cookie_name, + secure=secure, + max_age=max_age, + httponly=httponly, + path=path, + domains=[domain], + serializer=serializer + ) + self.cookie_name = cookie_name + + def new_csrf_token(self, request): + """ Sets a new CSRF token into the request and returns it. """ + token = self._token_factory() + request.cookies[self.cookie_name] = token + def set_cookie(request, response): + self.cookie_profile.set_cookies( + response, + token, + ) + request.add_response_callback(set_cookie) + return token + + def get_csrf_token(self, request): + """ Returns the currently active CSRF token by checking the cookies + sent with the current request.""" + bound_cookies = self.cookie_profile.bind(request) + token = bound_cookies.get_value() + if not token: + token = self.new_csrf_token(request) + return token + + def check_csrf_token(self, request, supplied_token): + """ Returns ``True`` if the ``supplied_token`` is valid.""" + expected_token = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected_token), bytes_(supplied_token)) + + +def get_csrf_token(request): + """ Get the currently active CSRF token for the request passed, generating + a new one using ``new_csrf_token(request)`` if one does not exist. This + calls the equivalent method in the chosen CSRF protection implementation. + + .. versionadded :: 1.9 + + """ + registry = request.registry + csrf = registry.getUtility(ICSRFStoragePolicy) + return csrf.get_csrf_token(request) + + +def new_csrf_token(request): + """ Generate a new CSRF token for the request passed and persist it in an + implementation defined manner. This calls the equivalent method in the + chosen CSRF protection implementation. + + .. versionadded :: 1.9 + + """ + registry = request.registry + csrf = registry.getUtility(ICSRFStoragePolicy) + return csrf.new_csrf_token(request) + + +def check_csrf_token(request, + token='csrf_token', + header='X-CSRF-Token', + raises=True): + """ Check the CSRF token returned by the + :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the + value in ``request.POST.get(token)`` (if a POST request) or + ``request.headers.get(header)``. If a ``token`` keyword is not supplied to + this function, the string ``csrf_token`` will be used to look up the token + in ``request.POST``. If a ``header`` keyword is not supplied to this + function, the string ``X-CSRF-Token`` will be used to look up the token in + ``request.headers``. + + If the value supplied by post or by header cannot be verified by the + :class:`pyramid.interfaces.ICSRFStoragePolicy`, and ``raises`` is + ``True``, this function will raise an + :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ + and ``raises`` is ``False``, this function will return ``False``. If the + CSRF check is successful, this function will return ``True`` + unconditionally. + + See :ref:`auto_csrf_checking` for information about how to secure your + application automatically against CSRF attacks. + + .. versionadded:: 1.4a2 + + .. versionchanged:: 1.7a1 + A CSRF token passed in the query string of the request is no longer + considered valid. It must be passed in either the request body or + a header. + + .. versionchanged:: 1.9 + Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated + to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy` to + verify the CSRF token. + + """ + supplied_token = "" + # We first check the headers for a csrf token, as that is significantly + # cheaper than checking the POST body + if header is not None: + supplied_token = request.headers.get(header, "") + + # If this is a POST/PUT/etc request, then we'll check the body to see if it + # has a token. We explicitly use request.POST here because CSRF tokens + # should never appear in an URL as doing so is a security issue. We also + # explicitly check for request.POST here as we do not support sending form + # encoded data over anything but a request.POST. + if supplied_token == "" and token is not None: + supplied_token = request.POST.get(token, "") + + policy = request.registry.getUtility(ICSRFStoragePolicy) + if not policy.check_csrf_token(request, text_(supplied_token)): + if raises: + raise BadCSRFToken('check_csrf_token(): Invalid token') + return False + return True + + +def check_csrf_origin(request, trusted_origins=None, raises=True): + """ + Check the ``Origin`` of the request to see if it is a cross site request or + not. + + If the value supplied by the ``Origin`` or ``Referer`` header isn't one of the + trusted origins and ``raises`` is ``True``, this function will raise a + :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if ``raises`` is + ``False``, this function will return ``False`` instead. If the CSRF origin + checks are successful this function will return ``True`` unconditionally. + + Additional trusted origins may be added by passing a list of domain (and + ports if nonstandard like ``['example.com', 'dev.example.com:8080']``) in + with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` + (the default) this list of additional domains will be pulled from the + ``pyramid.csrf_trusted_origins`` setting. + + Note that this function will do nothing if ``request.scheme`` is not + ``https``. + + .. versionadded:: 1.7 + + .. versionchanged:: 1.9 + Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` + + """ + def _fail(reason): + if raises: + raise BadCSRFOrigin(reason) + else: + return False + + if request.scheme == "https": + # Suppose user visits http://example.com/ + # An active network attacker (man-in-the-middle, MITM) sends a + # POST form that targets https://example.com/detonate-bomb/ and + # submits it via JavaScript. + # + # The attacker will need to provide a CSRF cookie and token, but + # that's no problem for a MITM when we cannot make any assumptions + # about what kind of session storage is being used. So the MITM can + # circumvent the CSRF protection. This is true for any HTTP connection, + # but anyone using HTTPS expects better! For this reason, for + # https://example.com/ we need additional protection that treats + # http://example.com/ as completely untrusted. Under HTTPS, + # Barth et al. found that the Referer header is missing for + # same-domain requests in only about 0.2% of cases or less, so + # we can use strict Referer checking. + + # Determine the origin of this request + origin = request.headers.get("Origin") + if origin is None: + origin = request.referrer + + # Fail if we were not able to locate an origin at all + if not origin: + return _fail("Origin checking failed - no Origin or Referer.") + + # Parse our origin so we we can extract the required information from + # it. + originp = urlparse.urlparse(origin) + + # Ensure that our Referer is also secure. + if originp.scheme != "https": + return _fail( + "Referer checking failed - Referer is insecure while host is " + "secure." + ) + + # Determine which origins we trust, which by default will include the + # current origin. + if trusted_origins is None: + trusted_origins = aslist( + request.registry.settings.get( + "pyramid.csrf_trusted_origins", []) + ) + + if request.host_port not in set(["80", "443"]): + trusted_origins.append("{0.domain}:{0.host_port}".format(request)) + else: + trusted_origins.append(request.domain) + + # Actually check to see if the request's origin matches any of our + # trusted origins. + if not any(is_same_domain(originp.netloc, host) + for host in trusted_origins): + reason = ( + "Referer checking failed - {0} does not match any trusted " + "origins." + ) + return _fail(reason.format(origin)) + + return True diff --git a/pyramid/interfaces.py b/pyramid/interfaces.py index bbb4754e4..ab83813c8 100644 --- a/pyramid/interfaces.py +++ b/pyramid/interfaces.py @@ -927,6 +927,13 @@ class ISession(IDict): usually accessed via ``request.session``. Keys and values of a session must be pickleable. + + .. versionchanged:: 1.9 + + Sessions are no longer required to implement ``get_csrf_token`` and + ``new_csrf_token``. CSRF token support was moved to the pluggable + :class:`pyramid.interfaces.ICSRFStoragePolicy` configuration hook. + """ # attributes @@ -981,19 +988,39 @@ class ISession(IDict): :meth:`pyramid.interfaces.ISession.flash` """ - def new_csrf_token(): - """ Create and set into the session a new, random cross-site request - forgery protection token. Return the token. It will be a string.""" - def get_csrf_token(): - """ Return a random cross-site request forgery protection token. It - will be a string. If a token was previously added to the session via - ``new_csrf_token``, that token will be returned. If no CSRF token - was previously set into the session, ``new_csrf_token`` will be +class ICSRFStoragePolicy(Interface): + """ An object that offers the ability to verify CSRF tokens and generate + new ones.""" + + def new_csrf_token(request): + """ Create and return a new, random cross-site request forgery + protection token. The token will be an ascii-compatible unicode + string. + + """ + + def get_csrf_token(request): + """ Return a cross-site request forgery protection token. It + will be an ascii-compatible unicode string. If a token was previously + set for this user via ``new_csrf_token``, that token will be returned. + If no CSRF token was previously set, ``new_csrf_token`` will be called, which will create and set a token, and this token will be returned. + + """ + + def check_csrf_token(request, token): + """ Determine if the supplied ``token`` is valid. Most implementations + should simply compare the ``token`` to the current value of + ``get_csrf_token`` but it is possible to verify the token using + any mechanism necessary using this method. + + Returns ``True`` if the ``token`` is valid, otherwise ``False``. + """ + class IIntrospector(Interface): def get(category_name, discriminator, default=None): """ Get the IIntrospectable related to the category_name and the diff --git a/pyramid/predicates.py b/pyramid/predicates.py index 7c3a778ca..3d7bb1b4b 100644 --- a/pyramid/predicates.py +++ b/pyramid/predicates.py @@ -4,7 +4,7 @@ from pyramid.exceptions import ConfigurationError from pyramid.compat import is_nonstr_iter -from pyramid.session import check_csrf_token +from pyramid.csrf import check_csrf_token from pyramid.traversal import ( find_interface, traversal_path, diff --git a/pyramid/renderers.py b/pyramid/renderers.py index 47705d5d9..6019f50fb 100644 --- a/pyramid/renderers.py +++ b/pyramid/renderers.py @@ -1,3 +1,4 @@ +from functools import partial import json import os import re @@ -19,6 +20,7 @@ from pyramid.compat import ( text_type, ) +from pyramid.csrf import get_csrf_token from pyramid.decorator import reify from pyramid.events import BeforeRender @@ -428,6 +430,7 @@ class RendererHelper(object): 'context':context, 'request':request, 'req':request, + 'get_csrf_token':partial(get_csrf_token, request), } return self.render_to_response(response, system, request=request) @@ -441,13 +444,13 @@ class RendererHelper(object): 'context':getattr(request, 'context', None), 'request':request, 'req':request, + 'get_csrf_token':partial(get_csrf_token, request), } system_values = BeforeRender(system_values, value) registry = self.registry registry.notify(system_values) - result = renderer(value, system_values) return result diff --git a/pyramid/session.py b/pyramid/session.py index 47b80f617..33119343b 100644 --- a/pyramid/session.py +++ b/pyramid/session.py @@ -16,19 +16,15 @@ from pyramid.compat import ( text_, bytes_, native_, - urlparse, ) - -from pyramid.exceptions import ( - BadCSRFOrigin, - BadCSRFToken, +from pyramid.csrf import ( + check_csrf_origin, + check_csrf_token, ) + from pyramid.interfaces import ISession -from pyramid.settings import aslist -from pyramid.util import ( - is_same_domain, - strings_differ, -) +from pyramid.util import strings_differ + def manage_accessed(wrapped): """ Decorator which causes a cookie to be renewed when an accessor @@ -109,149 +105,6 @@ def signed_deserialize(serialized, secret, hmac=hmac): return pickle.loads(pickled) -def check_csrf_origin(request, trusted_origins=None, raises=True): - """ - Check the Origin of the request to see if it is a cross site request or - not. - - If the value supplied by the Origin or Referer header isn't one of the - trusted origins and ``raises`` is ``True``, this function will raise a - :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is - ``False`` this function will return ``False`` instead. If the CSRF origin - checks are successful this function will return ``True`` unconditionally. - - Additional trusted origins may be added by passing a list of domain (and - ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in - with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` - (the default) this list of additional domains will be pulled from the - ``pyramid.csrf_trusted_origins`` setting. - - Note that this function will do nothing if request.scheme is not https. - - .. versionadded:: 1.7 - """ - def _fail(reason): - if raises: - raise BadCSRFOrigin(reason) - else: - return False - - if request.scheme == "https": - # Suppose user visits http://example.com/ - # An active network attacker (man-in-the-middle, MITM) sends a - # POST form that targets https://example.com/detonate-bomb/ and - # submits it via JavaScript. - # - # The attacker will need to provide a CSRF cookie and token, but - # that's no problem for a MITM when we cannot make any assumptions - # about what kind of session storage is being used. So the MITM can - # circumvent the CSRF protection. This is true for any HTTP connection, - # but anyone using HTTPS expects better! For this reason, for - # https://example.com/ we need additional protection that treats - # http://example.com/ as completely untrusted. Under HTTPS, - # Barth et al. found that the Referer header is missing for - # same-domain requests in only about 0.2% of cases or less, so - # we can use strict Referer checking. - - # Determine the origin of this request - origin = request.headers.get("Origin") - if origin is None: - origin = request.referrer - - # Fail if we were not able to locate an origin at all - if not origin: - return _fail("Origin checking failed - no Origin or Referer.") - - # Parse our origin so we we can extract the required information from - # it. - originp = urlparse.urlparse(origin) - - # Ensure that our Referer is also secure. - if originp.scheme != "https": - return _fail( - "Referer checking failed - Referer is insecure while host is " - "secure." - ) - - # Determine which origins we trust, which by default will include the - # current origin. - if trusted_origins is None: - trusted_origins = aslist( - request.registry.settings.get( - "pyramid.csrf_trusted_origins", []) - ) - - if request.host_port not in set(["80", "443"]): - trusted_origins.append("{0.domain}:{0.host_port}".format(request)) - else: - trusted_origins.append(request.domain) - - # Actually check to see if the request's origin matches any of our - # trusted origins. - if not any(is_same_domain(originp.netloc, host) - for host in trusted_origins): - reason = ( - "Referer checking failed - {0} does not match any trusted " - "origins." - ) - return _fail(reason.format(origin)) - - return True - - -def check_csrf_token(request, - token='csrf_token', - header='X-CSRF-Token', - raises=True): - """ Check the CSRF token in the request's session against the value in - ``request.POST.get(token)`` (if a POST request) or - ``request.headers.get(header)``. If a ``token`` keyword is not supplied to - this function, the string ``csrf_token`` will be used to look up the token - in ``request.POST``. If a ``header`` keyword is not supplied to this - function, the string ``X-CSRF-Token`` will be used to look up the token in - ``request.headers``. - - If the value supplied by post or by header doesn't match the value - supplied by ``request.session.get_csrf_token()``, and ``raises`` is - ``True``, this function will raise an - :exc:`pyramid.exceptions.BadCSRFToken` exception. - If the values differ and ``raises`` is ``False``, this function will - return ``False``. If the CSRF check is successful, this function will - return ``True`` unconditionally. - - Note that using this function requires that a :term:`session factory` is - configured. - - See :ref:`auto_csrf_checking` for information about how to secure your - application automatically against CSRF attacks. - - .. versionadded:: 1.4a2 - - .. versionchanged:: 1.7a1 - A CSRF token passed in the query string of the request is no longer - considered valid. It must be passed in either the request body or - a header. - """ - supplied_token = "" - # If this is a POST/PUT/etc request, then we'll check the body to see if it - # has a token. We explicitly use request.POST here because CSRF tokens - # should never appear in an URL as doing so is a security issue. We also - # explicitly check for request.POST here as we do not support sending form - # encoded data over anything but a request.POST. - if token is not None: - supplied_token = request.POST.get(token, "") - - # If we were unable to locate a CSRF token in a request body, then we'll - # check to see if there are any headers that have a value for us. - if supplied_token == "" and header is not None: - supplied_token = request.headers.get(header, "") - - expected_token = request.session.get_csrf_token() - if strings_differ(bytes_(expected_token), bytes_(supplied_token)): - if raises: - raise BadCSRFToken('check_csrf_token(): Invalid token') - return False - return True class PickleSerializer(object): """ A serializer that uses the pickle protocol to dump Python @@ -759,3 +612,13 @@ def SignedCookieSessionFactory( reissue_time=reissue_time, set_on_exception=set_on_exception, ) + +check_csrf_origin = check_csrf_origin # api +deprecated('check_csrf_origin', + 'pyramid.session.check_csrf_origin is deprecated as of Pyramid ' + '1.9. Use pyramid.csrf.check_csrf_origin instead.') + +check_csrf_token = check_csrf_token # api +deprecated('check_csrf_token', + 'pyramid.session.check_csrf_token is deprecated as of Pyramid ' + '1.9. Use pyramid.csrf.check_csrf_token instead.') diff --git a/pyramid/testing.py b/pyramid/testing.py index 877b351db..69b30e83f 100644 --- a/pyramid/testing.py +++ b/pyramid/testing.py @@ -479,6 +479,7 @@ def setUp(registry=None, request=None, hook_zca=True, autocommit=True, config.add_default_view_derivers() config.add_default_route_predicates() config.add_default_tweens() + config.add_default_security() config.commit() global have_zca try: diff --git a/pyramid/tests/test_config/test_views.py b/pyramid/tests/test_config/test_views.py index 211632730..0816d9958 100644 --- a/pyramid/tests/test_config/test_views.py +++ b/pyramid/tests/test_config/test_views.py @@ -18,6 +18,7 @@ class TestViewsConfigurationMixin(unittest.TestCase): def _makeOne(self, *arg, **kw): from pyramid.config import Configurator config = Configurator(*arg, **kw) + config.set_default_csrf_options(require_csrf=False) return config def _getViewCallable(self, config, ctx_iface=None, exc_iface=None, @@ -2373,7 +2374,7 @@ class TestViewsConfigurationMixin(unittest.TestCase): view = lambda r: 'OK' config.set_default_csrf_options(require_csrf=True) config.add_view(view, context=Exception, renderer=null_renderer) - view_intr = introspector.introspectables[1] + view_intr = introspector.introspectables[-1] self.assertTrue(view_intr.type_name, 'view') self.assertEqual(view_intr['callable'], view) derived_view = view_intr['derived_callable'] diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py new file mode 100644 index 000000000..f01780ad8 --- /dev/null +++ b/pyramid/tests/test_csrf.py @@ -0,0 +1,406 @@ +import unittest + +from pyramid import testing +from pyramid.config import Configurator + + +class TestLegacySessionCSRFStoragePolicy(unittest.TestCase): + class MockSession(object): + def __init__(self, current_token='02821185e4c94269bdc38e6eeae0a2f8'): + self.current_token = current_token + + def new_csrf_token(self): + self.current_token = 'e5e9e30a08b34ff9842ff7d2b958c14b' + return self.current_token + + def get_csrf_token(self): + return self.current_token + + def _makeOne(self): + from pyramid.csrf import LegacySessionCSRFStoragePolicy + return LegacySessionCSRFStoragePolicy() + + def test_register_session_csrf_policy(self): + from pyramid.csrf import LegacySessionCSRFStoragePolicy + from pyramid.interfaces import ICSRFStoragePolicy + + config = Configurator() + config.set_csrf_storage_policy(self._makeOne()) + config.commit() + + policy = config.registry.queryUtility(ICSRFStoragePolicy) + + self.assertTrue(isinstance(policy, LegacySessionCSRFStoragePolicy)) + + def test_session_csrf_implementation_delegates_to_session(self): + policy = self._makeOne() + request = DummyRequest(session=self.MockSession()) + + self.assertEqual( + policy.get_csrf_token(request), + '02821185e4c94269bdc38e6eeae0a2f8' + ) + self.assertEqual( + policy.new_csrf_token(request), + 'e5e9e30a08b34ff9842ff7d2b958c14b' + ) + + def test_check_csrf_token(self): + request = DummyRequest(session=self.MockSession('foo')) + + policy = self._makeOne() + self.assertTrue(policy.check_csrf_token(request, 'foo')) + self.assertFalse(policy.check_csrf_token(request, 'bar')) + + +class TestSessionCSRFStoragePolicy(unittest.TestCase): + def _makeOne(self, **kw): + from pyramid.csrf import SessionCSRFStoragePolicy + return SessionCSRFStoragePolicy(**kw) + + def test_register_session_csrf_policy(self): + from pyramid.csrf import SessionCSRFStoragePolicy + from pyramid.interfaces import ICSRFStoragePolicy + + config = Configurator() + config.set_csrf_storage_policy(self._makeOne()) + config.commit() + + policy = config.registry.queryUtility(ICSRFStoragePolicy) + + self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy)) + + def test_it_creates_a_new_token(self): + request = DummyRequest(session={}) + + policy = self._makeOne() + policy._token_factory = lambda: 'foo' + self.assertEqual(policy.get_csrf_token(request), 'foo') + + def test_get_csrf_token_returns_the_new_token(self): + request = DummyRequest(session={'_csrft_': 'foo'}) + + policy = self._makeOne() + self.assertEqual(policy.get_csrf_token(request), 'foo') + + token = policy.new_csrf_token(request) + self.assertNotEqual(token, 'foo') + self.assertEqual(token, policy.get_csrf_token(request)) + + def test_check_csrf_token(self): + request = DummyRequest(session={}) + + policy = self._makeOne() + self.assertFalse(policy.check_csrf_token(request, 'foo')) + + request.session = {'_csrft_': 'foo'} + self.assertTrue(policy.check_csrf_token(request, 'foo')) + self.assertFalse(policy.check_csrf_token(request, 'bar')) + + +class TestCookieCSRFStoragePolicy(unittest.TestCase): + def _makeOne(self, **kw): + from pyramid.csrf import CookieCSRFStoragePolicy + return CookieCSRFStoragePolicy(**kw) + + def test_register_cookie_csrf_policy(self): + from pyramid.csrf import CookieCSRFStoragePolicy + from pyramid.interfaces import ICSRFStoragePolicy + + config = Configurator() + config.set_csrf_storage_policy(self._makeOne()) + config.commit() + + policy = config.registry.queryUtility(ICSRFStoragePolicy) + + self.assertTrue(isinstance(policy, CookieCSRFStoragePolicy)) + + def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self): + response = MockResponse() + request = DummyRequest() + + policy = self._makeOne() + token = policy.get_csrf_token(request) + request.response_callback(request, response) + self.assertEqual( + response.headerlist, + [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] + ) + + def test_existing_cookie_csrf_does_not_set_cookie(self): + request = DummyRequest() + request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + + policy = self._makeOne() + token = policy.get_csrf_token(request) + + self.assertEqual( + token, + 'e6f325fee5974f3da4315a8ccf4513d2' + ) + self.assertIsNone(request.response_callback) + + def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self): + request = DummyRequest() + request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + + policy = self._makeOne() + token = policy.new_csrf_token(request) + + response = MockResponse() + request.response_callback(request, response) + self.assertEqual( + response.headerlist, + [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] + ) + + def test_get_csrf_token_returns_the_new_token(self): + request = DummyRequest() + request.cookies = {'csrf_token': 'foo'} + + policy = self._makeOne() + self.assertEqual(policy.get_csrf_token(request), 'foo') + + token = policy.new_csrf_token(request) + self.assertNotEqual(token, 'foo') + self.assertEqual(token, policy.get_csrf_token(request)) + + def test_check_csrf_token(self): + request = DummyRequest() + + policy = self._makeOne() + self.assertFalse(policy.check_csrf_token(request, 'foo')) + + request.cookies = {'csrf_token': 'foo'} + self.assertTrue(policy.check_csrf_token(request, 'foo')) + self.assertFalse(policy.check_csrf_token(request, 'bar')) + +class Test_get_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from pyramid.csrf import get_csrf_token + return get_csrf_token(*args, **kwargs) + + def test_no_override_csrf_utility_registered(self): + request = testing.DummyRequest() + self._callFUT(request) + + def test_success(self): + self.config.set_csrf_storage_policy(DummyCSRF()) + request = testing.DummyRequest() + + csrf_token = self._callFUT(request) + + self.assertEquals(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8') + + +class Test_new_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from pyramid.csrf import new_csrf_token + return new_csrf_token(*args, **kwargs) + + def test_no_override_csrf_utility_registered(self): + request = testing.DummyRequest() + self._callFUT(request) + + def test_success(self): + self.config.set_csrf_storage_policy(DummyCSRF()) + request = testing.DummyRequest() + + csrf_token = self._callFUT(request) + + self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b') + + +class Test_check_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + # set up CSRF + self.config.set_default_csrf_options(require_csrf=False) + + def _callFUT(self, *args, **kwargs): + from ..csrf import check_csrf_token + return check_csrf_token(*args, **kwargs) + + def test_success_token(self): + request = testing.DummyRequest() + request.method = "POST" + request.POST = {'csrf_token': request.session.get_csrf_token()} + self.assertEqual(self._callFUT(request, token='csrf_token'), True) + + def test_success_header(self): + request = testing.DummyRequest() + request.headers['X-CSRF-Token'] = request.session.get_csrf_token() + self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True) + + def test_success_default_token(self): + request = testing.DummyRequest() + request.method = "POST" + request.POST = {'csrf_token': request.session.get_csrf_token()} + self.assertEqual(self._callFUT(request), True) + + def test_success_default_header(self): + request = testing.DummyRequest() + request.headers['X-CSRF-Token'] = request.session.get_csrf_token() + self.assertEqual(self._callFUT(request), True) + + def test_failure_raises(self): + from pyramid.exceptions import BadCSRFToken + request = testing.DummyRequest() + self.assertRaises(BadCSRFToken, self._callFUT, request, + 'csrf_token') + + def test_failure_no_raises(self): + request = testing.DummyRequest() + result = self._callFUT(request, 'csrf_token', raises=False) + self.assertEqual(result, False) + + +class Test_check_csrf_token_without_defaults_configured(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from ..csrf import check_csrf_token + return check_csrf_token(*args, **kwargs) + + def test_success_token(self): + request = testing.DummyRequest() + request.method = "POST" + request.POST = {'csrf_token': request.session.get_csrf_token()} + self.assertEqual(self._callFUT(request, token='csrf_token'), True) + + def test_failure_raises(self): + from pyramid.exceptions import BadCSRFToken + request = testing.DummyRequest() + self.assertRaises(BadCSRFToken, self._callFUT, request, + 'csrf_token') + + def test_failure_no_raises(self): + request = testing.DummyRequest() + result = self._callFUT(request, 'csrf_token', raises=False) + self.assertEqual(result, False) + + +class Test_check_csrf_origin(unittest.TestCase): + def _callFUT(self, *args, **kwargs): + from ..csrf import check_csrf_origin + return check_csrf_origin(*args, **kwargs) + + def test_success_with_http(self): + request = testing.DummyRequest() + request.scheme = "http" + self.assertTrue(self._callFUT(request)) + + def test_success_with_https_and_referrer(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "https://example.com/login/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_success_with_https_and_origin(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.headers = {"Origin": "https://example.com/"} + request.referrer = "https://not-example.com/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_success_with_additional_trusted_host(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "https://not-example.com/login/" + request.registry.settings = { + "pyramid.csrf_trusted_origins": ["not-example.com"], + } + self.assertTrue(self._callFUT(request)) + + def test_success_with_nonstandard_port(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com:8080" + request.host_port = "8080" + request.referrer = "https://example.com:8080/login/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_fails_with_wrong_host(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "https://not-example.com/login/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_with_no_origin(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.referrer = None + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_when_http_to_https(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "http://example.com/evil/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_with_nonstandard_port(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com:8080" + request.host_port = "8080" + request.referrer = "https://example.com/login/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + +class DummyRequest(object): + registry = None + session = None + response_callback = None + + def __init__(self, registry=None, session=None): + self.registry = registry + self.session = session + self.cookies = {} + + def add_response_callback(self, callback): + self.response_callback = callback + + +class MockResponse(object): + def __init__(self): + self.headerlist = [] + + +class DummyCSRF(object): + def new_csrf_token(self, request): + return 'e5e9e30a08b34ff9842ff7d2b958c14b' + + def get_csrf_token(self, request): + return '02821185e4c94269bdc38e6eeae0a2f8' diff --git a/pyramid/tests/test_renderers.py b/pyramid/tests/test_renderers.py index 65bfa5582..86d8b582a 100644 --- a/pyramid/tests/test_renderers.py +++ b/pyramid/tests/test_renderers.py @@ -203,6 +203,7 @@ class TestRendererHelper(unittest.TestCase): self.assertEqual(helper.get_renderer(), factory.respond) def test_render_view(self): + import pyramid.csrf self._registerRendererFactory() self._registerResponseFactory() request = Dummy() @@ -212,6 +213,9 @@ class TestRendererHelper(unittest.TestCase): request = testing.DummyRequest() response = 'response' response = helper.render_view(request, response, view, context) + get_csrf = response.app_iter[1].pop('get_csrf_token') + self.assertEqual(get_csrf.args, (request, )) + self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token) self.assertEqual(response.app_iter[0], 'response') self.assertEqual(response.app_iter[1], {'renderer_info': helper, @@ -242,12 +246,16 @@ class TestRendererHelper(unittest.TestCase): self.assertEqual(reg.event.__class__.__name__, 'BeforeRender') def test_render_system_values_is_None(self): + import pyramid.csrf self._registerRendererFactory() request = Dummy() context = Dummy() request.context = context helper = self._makeOne('loo.foo') result = helper.render('values', None, request=request) + get_csrf = result[1].pop('get_csrf_token') + self.assertEqual(get_csrf.args, (request, )) + self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token) system = {'request':request, 'context':context, 'renderer_name':'loo.foo', diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py index 3a308d08b..ade602799 100644 --- a/pyramid/tests/test_session.py +++ b/pyramid/tests/test_session.py @@ -659,144 +659,6 @@ class Test_signed_deserialize(unittest.TestCase): result = self._callFUT(serialized, secret.decode('latin-1')) self.assertEqual(result, '123') -class Test_check_csrf_token(unittest.TestCase): - def _callFUT(self, *args, **kwargs): - from ..session import check_csrf_token - return check_csrf_token(*args, **kwargs) - - def test_success_token(self): - request = testing.DummyRequest() - request.method = "POST" - request.POST = {'csrf_token': request.session.get_csrf_token()} - self.assertEqual(self._callFUT(request, token='csrf_token'), True) - - def test_success_header(self): - request = testing.DummyRequest() - request.headers['X-CSRF-Token'] = request.session.get_csrf_token() - self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True) - - def test_success_default_token(self): - request = testing.DummyRequest() - request.method = "POST" - request.POST = {'csrf_token': request.session.get_csrf_token()} - self.assertEqual(self._callFUT(request), True) - - def test_success_default_header(self): - request = testing.DummyRequest() - request.headers['X-CSRF-Token'] = request.session.get_csrf_token() - self.assertEqual(self._callFUT(request), True) - - def test_failure_raises(self): - from pyramid.exceptions import BadCSRFToken - request = testing.DummyRequest() - self.assertRaises(BadCSRFToken, self._callFUT, request, - 'csrf_token') - - def test_failure_no_raises(self): - request = testing.DummyRequest() - result = self._callFUT(request, 'csrf_token', raises=False) - self.assertEqual(result, False) - - def test_token_differing_types(self): - from pyramid.compat import text_ - request = testing.DummyRequest() - request.method = "POST" - request.session['_csrft_'] = text_('foo') - request.POST = {'csrf_token': b'foo'} - self.assertEqual(self._callFUT(request, token='csrf_token'), True) - - -class Test_check_csrf_origin(unittest.TestCase): - - def _callFUT(self, *args, **kwargs): - from ..session import check_csrf_origin - return check_csrf_origin(*args, **kwargs) - - def test_success_with_http(self): - request = testing.DummyRequest() - request.scheme = "http" - self.assertTrue(self._callFUT(request)) - - def test_success_with_https_and_referrer(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "https://example.com/login/" - request.registry.settings = {} - self.assertTrue(self._callFUT(request)) - - def test_success_with_https_and_origin(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.headers = {"Origin": "https://example.com/"} - request.referrer = "https://not-example.com/" - request.registry.settings = {} - self.assertTrue(self._callFUT(request)) - - def test_success_with_additional_trusted_host(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "https://not-example.com/login/" - request.registry.settings = { - "pyramid.csrf_trusted_origins": ["not-example.com"], - } - self.assertTrue(self._callFUT(request)) - - def test_success_with_nonstandard_port(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com:8080" - request.host_port = "8080" - request.referrer = "https://example.com:8080/login/" - request.registry.settings = {} - self.assertTrue(self._callFUT(request)) - - def test_fails_with_wrong_host(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "https://not-example.com/login/" - request.registry.settings = {} - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - - def test_fails_with_no_origin(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.referrer = None - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - - def test_fails_when_http_to_https(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "http://example.com/evil/" - request.registry.settings = {} - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - - def test_fails_with_nonstandard_port(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com:8080" - request.host_port = "8080" - request.referrer = "https://example.com/login/" - request.registry.settings = {} - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - class DummySerializer(object): def dumps(self, value): diff --git a/pyramid/tests/test_util.py b/pyramid/tests/test_util.py index bbf6103f4..d64f0a73f 100644 --- a/pyramid/tests/test_util.py +++ b/pyramid/tests/test_util.py @@ -369,12 +369,16 @@ class Test_strings_differ(unittest.TestCase): from pyramid.util import strings_differ return strings_differ(*args, **kw) - def test_it(self): + def test_it_bytes(self): self.assertFalse(self._callFUT(b'foo', b'foo')) self.assertTrue(self._callFUT(b'123', b'345')) self.assertTrue(self._callFUT(b'1234', b'123')) self.assertTrue(self._callFUT(b'123', b'1234')) + def test_it_native_str(self): + self.assertFalse(self._callFUT('123', '123')) + self.assertTrue(self._callFUT('123', '1234')) + def test_it_with_internal_comparator(self): result = self._callFUT(b'foo', b'foo', compare_digest=None) self.assertFalse(result) diff --git a/pyramid/tests/test_viewderivers.py b/pyramid/tests/test_viewderivers.py index 51d0bd367..6b81cc1e5 100644 --- a/pyramid/tests/test_viewderivers.py +++ b/pyramid/tests/test_viewderivers.py @@ -12,6 +12,7 @@ class TestDeriveView(unittest.TestCase): def setUp(self): self.config = testing.setUp() + self.config.set_default_csrf_options(require_csrf=False) def tearDown(self): self.config = None diff --git a/pyramid/viewderivers.py b/pyramid/viewderivers.py index 4eb0ce704..d2869b162 100644 --- a/pyramid/viewderivers.py +++ b/pyramid/viewderivers.py @@ -6,7 +6,7 @@ from zope.interface import ( ) from pyramid.security import NO_PERMISSION_REQUIRED -from pyramid.session import ( +from pyramid.csrf import ( check_csrf_origin, check_csrf_token, ) |
