From a2c7c7a49bceeaaab2853e7e73c3671979d4c9ed Mon Sep 17 00:00:00 2001 From: Matthew Wilkes Date: Mon, 5 Dec 2016 12:16:26 +0100 Subject: Create a new ICSRF implementation for getting CSRF tokens, split out from the session machinery. Adds configuration of this to the csrf_options configurator commands. Make the default implementation a fallback to the old one. Documentation patches for new best practices given updates CSRF implementation. --- CHANGES.txt | 12 ++ docs/api/csrf.rst | 18 ++ docs/api/interfaces.rst | 3 + docs/api/session.rst | 4 - docs/narr/security.rst | 191 +++++++++++++++++++++ docs/narr/sessions.rst | 175 ------------------- pyramid/config/security.py | 16 ++ pyramid/config/views.py | 14 +- pyramid/csrf.py | 286 ++++++++++++++++++++++++++++++++ pyramid/interfaces.py | 29 +++- pyramid/predicates.py | 2 +- pyramid/renderers.py | 1 - pyramid/session.py | 155 +---------------- pyramid/tests/test_config/test_views.py | 2 +- pyramid/tests/test_csrf.py | 172 +++++++++++++++++++ pyramid/tests/test_session.py | 4 +- pyramid/viewderivers.py | 2 +- 17 files changed, 731 insertions(+), 355 deletions(-) create mode 100644 docs/api/csrf.rst create mode 100644 pyramid/csrf.py create mode 100644 pyramid/tests/test_csrf.py diff --git a/CHANGES.txt b/CHANGES.txt index c8a87f625..9d6264688 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -24,6 +24,14 @@ Features can be alleviated by invoking ``config.begin()`` and ``config.end()`` appropriately. See https://github.com/Pylons/pyramid/pull/2989 +- A new CSRF implementation, :class:`pyramid.csrf.SessionCSRF` has been added, + which deleagates all CSRF generation to the current session, following the + old API for this. A ``get_csrf_token()`` method is now available in template + global scope, to make it easy for template developers to get the current CSRF + token without adding it to Python code. + See https://github.com/Pylons/pyramid/pull/2854 + + Bug Fixes --------- @@ -50,3 +58,7 @@ Backward Incompatibilities Documentation Changes --------------------- + +- Retrieving CSRF token from the session has been deprecated, in favor of + equivalent methods in :mod:`pyramid.csrf`. + See https://github.com/Pylons/pyramid/pull/2854 diff --git a/docs/api/csrf.rst b/docs/api/csrf.rst new file mode 100644 index 000000000..3125bdac9 --- /dev/null +++ b/docs/api/csrf.rst @@ -0,0 +1,18 @@ +.. _csrf_module: + +:mod:`pyramid.csrf` +------------------- + +.. automodule:: pyramid.csrf + + .. autofunction:: get_csrf_token + + .. autofunction:: new_csrf_token + + .. autoclass:: SessionCSRF + :members: + + .. autofunction:: check_csrf_origin + + .. autofunction:: check_csrf_token + diff --git a/docs/api/interfaces.rst b/docs/api/interfaces.rst index a212ba7a9..2ca472616 100644 --- a/docs/api/interfaces.rst +++ b/docs/api/interfaces.rst @@ -44,6 +44,9 @@ Other Interfaces .. autointerface:: IRoutePregenerator :members: + .. autointerface:: ICSRF + :members: + .. autointerface:: ISession :members: diff --git a/docs/api/session.rst b/docs/api/session.rst index 56c4f52d7..53bae7c52 100644 --- a/docs/api/session.rst +++ b/docs/api/session.rst @@ -9,10 +9,6 @@ .. autofunction:: signed_deserialize - .. autofunction:: check_csrf_origin - - .. autofunction:: check_csrf_token - .. autofunction:: SignedCookieSessionFactory .. autofunction:: UnencryptedCookieSessionFactoryConfig diff --git a/docs/narr/security.rst b/docs/narr/security.rst index 77e7fd707..b4fb3b8a8 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -765,3 +765,194 @@ which would allow the attacker to control the content of the payload. Re-using a secret across two different subsystems might drop the security of signing to zero. Keys should not be re-used across different contexts where an attacker has the possibility of providing a chosen plaintext. + +Preventing Cross-Site Request Forgery Attacks +--------------------------------------------- + +`Cross-site request forgery +`_ attacks are a +phenomenon whereby a user who is logged in to your website might inadvertantly +load a URL because it is linked from, or embedded in, an attacker's website. +If the URL is one that may modify or delete data, the consequences can be dire. + +You can avoid most of these attacks by issuing a unique token to the browser +and then requiring that it be present in all potentially unsafe requests. +:app:`Pyramid` sessions provide facilities to create and check CSRF tokens. + +To use CSRF tokens, you must first enable a :term:`session factory` as +described in :ref:`using_the_default_session_factory` or +:ref:`using_alternate_session_factories`. + +.. index:: + single: csrf.get_csrf_token + +Using the ``csrf.get_csrf_token`` Method +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To get the current CSRF token, use the +:data:`pyramid.csrf.get_csrf_token` method. + +.. code-block:: python + + from pyramid.csrf import get_csrf_token + token = get_csrf_token(request) + +The ``get_csrf_token()`` method accepts a single argument: the request. It +returns a CSRF *token* string. If ``get_csrf_token()`` or ``new_csrf_token()`` +was invoked previously for this user, then the existing token will be returned. +If no CSRF token previously existed for this user, then a new token will be set +into the session and returned. The newly created token will be opaque and +randomized. + + +Using the ``get_csrf_token`` global in templates +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Templates have a ``get_csrf_token()`` method inserted into their globals, which +allows you to get the current token without modifying the view code. This +method takes no arguments and returns a CSRF token string. You can use the +returned token as the value of a hidden field in a form that posts to a method +that requires elevated privileges, or supply it as a request header in AJAX +requests. + +For example, include the CSRF token as a hidden field: + +.. code-block:: html + +
+ + +
+ +Or include it as a header in a jQuery AJAX request: + +.. code-block:: javascript + + var csrfToken = "${get_csrf_token()}"; + $.ajax({ + type: "POST", + url: "/myview", + headers: { 'X-CSRF-Token': csrfToken } + }).done(function() { + alert("Deleted"); + }); + +The handler for the URL that receives the request should then require that the +correct CSRF token is supplied. + +.. index:: + single: csrf.new_csrf_token + +Using the ``csrf.new_csrf_token`` Method +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To explicitly create a new CSRF token, use the ``csrf.new_csrf_token()`` +method. This differs only from ``csrf.get_csrf_token()`` inasmuch as it +clears any existing CSRF token, creates a new CSRF token, sets the token into +the user, and returns the token. + +.. code-block:: python + + from pyramid.csrf import get_csrf_token + token = new_csrf_token() + +.. note:: + + It is not possible to force a new CSRF token from a template. If you + want to regenerate your CSRF token then do it in the view code and return + the new token as part of the context. + +Checking CSRF Tokens Manually +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +In request handling code, you can check the presence and validity of a CSRF +token with :func:`pyramid.session.check_csrf_token`. If the token is valid, it +will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally, +you can specify ``raises=False`` to have the check return ``False`` instead of +raising an exception. + +By default, it checks for a POST parameter named ``csrf_token`` or a header +named ``X-CSRF-Token``. + +.. code-block:: python + + from pyramid.session import check_csrf_token + + def myview(request): + # Require CSRF Token + check_csrf_token(request) + + # ... + +.. _auto_csrf_checking: + +Checking CSRF Tokens Automatically +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. versionadded:: 1.7 + +:app:`Pyramid` supports automatically checking CSRF tokens on requests with an +unsafe method as defined by RFC2616. Any other request may be checked manually. +This feature can be turned on globally for an application using the +:meth:`pyramid.config.Configurator.set_default_csrf_options` directive. +For example: + +.. code-block:: python + + from pyramid.config import Configurator + + config = Configurator() + config.set_default_csrf_options(require_csrf=True) + +CSRF checking may be explicitly enabled or disabled on a per-view basis using +the ``require_csrf`` view option. A value of ``True`` or ``False`` will +override the default set by ``set_default_csrf_options``. For example: + +.. code-block:: python + + @view_config(route_name='hello', require_csrf=False) + def myview(request): + # ... + +When CSRF checking is active, the token and header used to find the +supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively, +unless otherwise overridden by ``set_default_csrf_options``. The token is +checked against the value in ``request.POST`` which is the submitted form body. +If this value is not present, then the header will be checked. + +In addition to token based CSRF checks, if the request is using HTTPS then the +automatic CSRF checking will also check the referrer of the request to ensure +that it matches one of the trusted origins. By default the only trusted origin +is the current host, however additional origins may be configured by setting +``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they +are non standard). If a host in the list of domains starts with a ``.`` then +that will allow all subdomains as well as the domain without the ``.``. + +If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or +:class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This +exception may be caught and handled by an :term:`exception view` but, by +default, will result in a ``400 Bad Request`` response being sent to the +client. + +Checking CSRF Tokens with a View Predicate +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. deprecated:: 1.7 + Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead + to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised. + +A convenient way to require a valid CSRF token for a particular view is to +include ``check_csrf=True`` as a view predicate. See +:meth:`pyramid.config.Configurator.add_view`. + +.. code-block:: python + + @view_config(request_method='POST', check_csrf=True, ...) + def myview(request): + ... + +.. note:: + A mismatch of a CSRF token is treated like any other predicate miss, and the + predicate system, when it doesn't find a view, raises ``HTTPNotFound`` + instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different + from calling :func:`pyramid.session.check_csrf_token`. diff --git a/docs/narr/sessions.rst b/docs/narr/sessions.rst index 5b24201a9..90b5f4585 100644 --- a/docs/narr/sessions.rst +++ b/docs/narr/sessions.rst @@ -321,178 +321,3 @@ flash storage. single: preventing cross-site request forgery attacks single: cross-site request forgery attacks, prevention -Preventing Cross-Site Request Forgery Attacks ---------------------------------------------- - -`Cross-site request forgery -`_ attacks are a -phenomenon whereby a user who is logged in to your website might inadvertantly -load a URL because it is linked from, or embedded in, an attacker's website. -If the URL is one that may modify or delete data, the consequences can be dire. - -You can avoid most of these attacks by issuing a unique token to the browser -and then requiring that it be present in all potentially unsafe requests. -:app:`Pyramid` sessions provide facilities to create and check CSRF tokens. - -To use CSRF tokens, you must first enable a :term:`session factory` as -described in :ref:`using_the_default_session_factory` or -:ref:`using_alternate_session_factories`. - -.. index:: - single: session.get_csrf_token - -Using the ``session.get_csrf_token`` Method -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To get the current CSRF token from the session, use the -``session.get_csrf_token()`` method. - -.. code-block:: python - - token = request.session.get_csrf_token() - -The ``session.get_csrf_token()`` method accepts no arguments. It returns a -CSRF *token* string. If ``session.get_csrf_token()`` or -``session.new_csrf_token()`` was invoked previously for this session, then the -existing token will be returned. If no CSRF token previously existed for this -session, then a new token will be set into the session and returned. The newly -created token will be opaque and randomized. - -You can use the returned token as the value of a hidden field in a form that -posts to a method that requires elevated privileges, or supply it as a request -header in AJAX requests. - -For example, include the CSRF token as a hidden field: - -.. code-block:: html - -
- - -
- -Or include it as a header in a jQuery AJAX request: - -.. code-block:: javascript - - var csrfToken = ${request.session.get_csrf_token()}; - $.ajax({ - type: "POST", - url: "/myview", - headers: { 'X-CSRF-Token': csrfToken } - }).done(function() { - alert("Deleted"); - }); - -The handler for the URL that receives the request should then require that the -correct CSRF token is supplied. - -.. index:: - single: session.new_csrf_token - -Using the ``session.new_csrf_token`` Method -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To explicitly create a new CSRF token, use the ``session.new_csrf_token()`` -method. This differs only from ``session.get_csrf_token()`` inasmuch as it -clears any existing CSRF token, creates a new CSRF token, sets the token into -the session, and returns the token. - -.. code-block:: python - - token = request.session.new_csrf_token() - -Checking CSRF Tokens Manually -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -In request handling code, you can check the presence and validity of a CSRF -token with :func:`pyramid.session.check_csrf_token`. If the token is valid, it -will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally, -you can specify ``raises=False`` to have the check return ``False`` instead of -raising an exception. - -By default, it checks for a POST parameter named ``csrf_token`` or a header -named ``X-CSRF-Token``. - -.. code-block:: python - - from pyramid.session import check_csrf_token - - def myview(request): - # Require CSRF Token - check_csrf_token(request) - - # ... - -.. _auto_csrf_checking: - -Checking CSRF Tokens Automatically -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. versionadded:: 1.7 - -:app:`Pyramid` supports automatically checking CSRF tokens on requests with an -unsafe method as defined by RFC2616. Any other request may be checked manually. -This feature can be turned on globally for an application using the -:meth:`pyramid.config.Configurator.set_default_csrf_options` directive. -For example: - -.. code-block:: python - - from pyramid.config import Configurator - - config = Configurator() - config.set_default_csrf_options(require_csrf=True) - -CSRF checking may be explicitly enabled or disabled on a per-view basis using -the ``require_csrf`` view option. A value of ``True`` or ``False`` will -override the default set by ``set_default_csrf_options``. For example: - -.. code-block:: python - - @view_config(route_name='hello', require_csrf=False) - def myview(request): - # ... - -When CSRF checking is active, the token and header used to find the -supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively, -unless otherwise overridden by ``set_default_csrf_options``. The token is -checked against the value in ``request.POST`` which is the submitted form body. -If this value is not present, then the header will be checked. - -In addition to token based CSRF checks, if the request is using HTTPS then the -automatic CSRF checking will also check the referrer of the request to ensure -that it matches one of the trusted origins. By default the only trusted origin -is the current host, however additional origins may be configured by setting -``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they -are non standard). If a host in the list of domains starts with a ``.`` then -that will allow all subdomains as well as the domain without the ``.``. - -If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or -:class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This -exception may be caught and handled by an :term:`exception view` but, by -default, will result in a ``400 Bad Request`` response being sent to the -client. - -Checking CSRF Tokens with a View Predicate -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. deprecated:: 1.7 - Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead - to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised. - -A convenient way to require a valid CSRF token for a particular view is to -include ``check_csrf=True`` as a view predicate. See -:meth:`pyramid.config.Configurator.add_view`. - -.. code-block:: python - - @view_config(request_method='POST', check_csrf=True, ...) - def myview(request): - ... - -.. note:: - A mismatch of a CSRF token is treated like any other predicate miss, and the - predicate system, when it doesn't find a view, raises ``HTTPNotFound`` - instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different - from calling :func:`pyramid.session.check_csrf_token`. diff --git a/pyramid/config/security.py b/pyramid/config/security.py index 33593376b..102a61e0c 100644 --- a/pyramid/config/security.py +++ b/pyramid/config/security.py @@ -3,16 +3,21 @@ from zope.interface import implementer from pyramid.interfaces import ( IAuthorizationPolicy, IAuthenticationPolicy, + ICSRFPolicy, IDefaultCSRFOptions, IDefaultPermission, PHASE1_CONFIG, PHASE2_CONFIG, ) +from pyramid.csrf import csrf_token_template_global +from pyramid.csrf import SessionCSRF +from pyramid.events import BeforeRender from pyramid.exceptions import ConfigurationError from pyramid.util import action_method from pyramid.util import as_sorted_tuple + class SecurityConfiguratorMixin(object): @action_method def set_authentication_policy(self, policy): @@ -165,6 +170,7 @@ class SecurityConfiguratorMixin(object): @action_method def set_default_csrf_options( self, + implementation=None, require_csrf=True, token='csrf_token', header='X-CSRF-Token', @@ -174,6 +180,10 @@ class SecurityConfiguratorMixin(object): """ Set the default CSRF options used by subsequent view registrations. + ``implementation`` is a class that implements the + :meth:`pyramid.interfaces.ICSRFPolicy` interface that will be used for all + CSRF functionality. Default: :class:`pyramid.csrf.SessionCSRF`. + ``require_csrf`` controls whether CSRF checks will be automatically enabled on each view in the application. This value is used as the fallback when ``require_csrf`` is left at the default of ``None`` on @@ -207,7 +217,10 @@ class SecurityConfiguratorMixin(object): options = DefaultCSRFOptions( require_csrf, token, header, safe_methods, callback, ) + if implementation is None: + implementation = SessionCSRF() def register(): + self.registry.registerUtility(implementation, ICSRFPolicy) self.registry.registerUtility(options, IDefaultCSRFOptions) intr = self.introspectable('default csrf view options', None, @@ -218,9 +231,12 @@ class SecurityConfiguratorMixin(object): intr['header'] = header intr['safe_methods'] = as_sorted_tuple(safe_methods) intr['callback'] = callback + + self.add_subscriber(csrf_token_template_global, [BeforeRender]) self.action(IDefaultCSRFOptions, register, order=PHASE1_CONFIG, introspectables=(intr,)) + @implementer(IDefaultCSRFOptions) class DefaultCSRFOptions(object): def __init__(self, require_csrf, token, header, safe_methods, callback): diff --git a/pyramid/config/views.py b/pyramid/config/views.py index 65c9da585..7a383be44 100644 --- a/pyramid/config/views.py +++ b/pyramid/config/views.py @@ -641,16 +641,14 @@ class ViewsConfiguratorMixin(object): 'check name'. If the value provided is ``True``, ``csrf_token`` will be used as the check name. - If CSRF checking is performed, the checked value will be the value - of ``request.params[check_name]``. This value will be compared - against the value of ``request.session.get_csrf_token()``, and the - check will pass if these two values are the same. If the check - passes, the associated view will be permitted to execute. If the + If CSRF checking is performed, the checked value will be the value of + ``request.params[check_name]``. This value will be compared against + the value of ``impl.get_csrf_token()`` (where ``impl`` is an + implementation of :meth:`pyramid.interfaces.ICSRFPolicy`), and the + check will pass if these two values are the same. If the check + passes, the associated view will be permitted to execute. If the check fails, the associated view will not be permitted to execute. - Note that using this feature requires a :term:`session factory` to - have been configured. - .. versionadded:: 1.4a2 physical_path diff --git a/pyramid/csrf.py b/pyramid/csrf.py new file mode 100644 index 000000000..c373079a4 --- /dev/null +++ b/pyramid/csrf.py @@ -0,0 +1,286 @@ +from functools import partial +import uuid + +from zope.interface import implementer + +from pyramid.compat import ( + urlparse, + bytes_ +) +from pyramid.exceptions import ( + BadCSRFOrigin, + BadCSRFToken, +) +from pyramid.interfaces import ICSRFPolicy +from pyramid.settings import aslist +from pyramid.util import ( + is_same_domain, + strings_differ +) + + +@implementer(ICSRFPolicy) +class SessionCSRF(object): + """ The default CSRF implementation, which mimics the behavior from older + versions of Pyramid. The ``new_csrf_token`` and ``get_csrf_token`` methods + are indirected to the underlying session implementation. + + Note that using this CSRF implementation requires that + a :term:`session factory` is configured. + + .. versionadded :: 1.8a1 + """ + def new_csrf_token(self, request): + """ Sets a new CSRF token into the session and returns it. """ + return request.session.new_csrf_token() + + def get_csrf_token(self, request): + """ Returns the currently active CSRF token from the session, generating + a new one if needed.""" + return request.session.get_csrf_token() + + def check_csrf_token(self, request, supplied_token): + """ Returns True if supplied_token is the same value as get_csrf_token + returns for this request. """ + expected = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected, 'ascii'), + bytes_(supplied_token, 'ascii'), + ) + +@implementer(ICSRFPolicy) +class CookieCSRF(object): + """ An alternative CSRF implementation that stores its information in + unauthenticated cookies, known as the 'Double Submit Cookie' method in the + OWASP CSRF guidelines. This gives some additional flexibility with regards + to scalingas the tokens can be generated and verified by a front-end server. + + .. versionadded :: 1.8a1 + """ + + def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, + domain=None, path='/'): + self.cookie_name = cookie_name + self.secure = secure + self.httponly = httponly + self.domain = domain + self.path = path + + def new_csrf_token(self, request): + """ Sets a new CSRF token into the request and returns it. """ + token = uuid.uuid4().hex + def set_cookie(request, response): + response.set_cookie( + self.cookie_name, + token, + httponly=self.httponly, + secure=self.secure, + domain=self.domain, + path=self.path, + overwrite=True, + ) + request.add_response_callback(set_cookie) + return token + + def get_csrf_token(self, request): + """ Returns the currently active CSRF token by checking the cookies + sent with the current request.""" + token = request.cookies.get(self.cookie_name) + if not token: + token = self.new_csrf_token(request) + return token + + def check_csrf_token(self, request, supplied_token): + """ Returns True if supplied_token is the same value as get_csrf_token + returns for this request. """ + expected = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected, 'ascii'), + bytes_(supplied_token, 'ascii'), + ) + + +def csrf_token_template_global(event): + request = event.get('request', None) + try: + registry = request.registry + except AttributeError: + return + else: + csrf = registry.getUtility(ICSRFPolicy) + if csrf is not None: + event['get_csrf_token'] = partial(csrf.get_csrf_token, request) + + +def get_csrf_token(request): + """ Get the currently active CSRF token for the request passed, generating + a new one using ``new_csrf_token(request)`` if one does not exist. This + calls the equivalent method in the chosen CSRF protection implementation. + + .. versionadded :: 1.8a1 + """ + registry = request.registry + csrf = registry.getUtility(ICSRFPolicy) + if csrf is not None: + return csrf.get_csrf_token(request) + + +def new_csrf_token(request): + """ Generate a new CSRF token for the request passed and persist it in an + implementation defined manner. This calls the equivalent method in the + chosen CSRF protection implementation. + + .. versionadded :: 1.8a1 + """ + registry = request.registry + csrf = registry.getUtility(ICSRFPolicy) + if csrf is not None: + return csrf.new_csrf_token(request) + + +def check_csrf_token(request, + token='csrf_token', + header='X-CSRF-Token', + raises=True): + """ Check the CSRF token returned by the :meth:`pyramid.interfaces.ICSRFPolicy` + implementation against the value in ``request.POST.get(token)`` (if a POST + request) or ``request.headers.get(header)``. If a ``token`` keyword is not + supplied to this function, the string ``csrf_token`` will be used to look + up the token in ``request.POST``. If a ``header`` keyword is not supplied + to this function, the string ``X-CSRF-Token`` will be used to look up the + token in ``request.headers``. + + If the value supplied by post or by header doesn't match the value supplied + by ``impl.get_csrf_token()`` (where ``impl`` is an implementation of + :meth:`pyramid.interfaces.ICSRFPolicy`), and ``raises`` is ``True``, this + function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If + the values differ and ``raises`` is ``False``, this function will return + ``False``. If the CSRF check is successful, this function will return + ``True`` unconditionally. + + See :ref:`auto_csrf_checking` for information about how to secure your + application automatically against CSRF attacks. + + .. versionadded:: 1.4a2 + + .. versionchanged:: 1.7a1 + A CSRF token passed in the query string of the request is no longer + considered valid. It must be passed in either the request body or + a header. + + .. versionchanged:: 1.8a1 + Moved from pyramid.session to pyramid.csrf + """ + supplied_token = "" + # We first check the headers for a csrf token, as that is significantly + # cheaper than checking the POST body + if header is not None: + supplied_token = request.headers.get(header, "") + + # If this is a POST/PUT/etc request, then we'll check the body to see if it + # has a token. We explicitly use request.POST here because CSRF tokens + # should never appear in an URL as doing so is a security issue. We also + # explicitly check for request.POST here as we do not support sending form + # encoded data over anything but a request.POST. + if supplied_token == "" and token is not None: + supplied_token = request.POST.get(token, "") + + policy = request.registry.getUtility(ICSRFPolicy) + if not policy.check_csrf_token(request, supplied_token): + if raises: + raise BadCSRFToken('check_csrf_token(): Invalid token') + return False + return True + + +def check_csrf_origin(request, trusted_origins=None, raises=True): + """ + Check the Origin of the request to see if it is a cross site request or + not. + + If the value supplied by the Origin or Referer header isn't one of the + trusted origins and ``raises`` is ``True``, this function will raise a + :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is + ``False`` this function will return ``False`` instead. If the CSRF origin + checks are successful this function will return ``True`` unconditionally. + + Additional trusted origins may be added by passing a list of domain (and + ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in + with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` + (the default) this list of additional domains will be pulled from the + ``pyramid.csrf_trusted_origins`` setting. + + Note that this function will do nothing if request.scheme is not https. + + .. versionadded:: 1.7 + + .. versionchanged:: 1.8a1 + Moved from pyramid.session to pyramid.csrf + """ + def _fail(reason): + if raises: + raise BadCSRFOrigin(reason) + else: + return False + + if request.scheme == "https": + # Suppose user visits http://example.com/ + # An active network attacker (man-in-the-middle, MITM) sends a + # POST form that targets https://example.com/detonate-bomb/ and + # submits it via JavaScript. + # + # The attacker will need to provide a CSRF cookie and token, but + # that's no problem for a MITM when we cannot make any assumptions + # about what kind of session storage is being used. So the MITM can + # circumvent the CSRF protection. This is true for any HTTP connection, + # but anyone using HTTPS expects better! For this reason, for + # https://example.com/ we need additional protection that treats + # http://example.com/ as completely untrusted. Under HTTPS, + # Barth et al. found that the Referer header is missing for + # same-domain requests in only about 0.2% of cases or less, so + # we can use strict Referer checking. + + # Determine the origin of this request + origin = request.headers.get("Origin") + if origin is None: + origin = request.referrer + + # Fail if we were not able to locate an origin at all + if not origin: + return _fail("Origin checking failed - no Origin or Referer.") + + # Parse our origin so we we can extract the required information from + # it. + originp = urlparse.urlparse(origin) + + # Ensure that our Referer is also secure. + if originp.scheme != "https": + return _fail( + "Referer checking failed - Referer is insecure while host is " + "secure." + ) + + # Determine which origins we trust, which by default will include the + # current origin. + if trusted_origins is None: + trusted_origins = aslist( + request.registry.settings.get( + "pyramid.csrf_trusted_origins", []) + ) + + if request.host_port not in set(["80", "443"]): + trusted_origins.append("{0.domain}:{0.host_port}".format(request)) + else: + trusted_origins.append(request.domain) + + # Actually check to see if the request's origin matches any of our + # trusted origins. + if not any(is_same_domain(originp.netloc, host) + for host in trusted_origins): + reason = ( + "Referer checking failed - {0} does not match any trusted " + "origins." + ) + return _fail(reason.format(origin)) + + return True diff --git a/pyramid/interfaces.py b/pyramid/interfaces.py index bbb4754e4..f58ee8b58 100644 --- a/pyramid/interfaces.py +++ b/pyramid/interfaces.py @@ -981,19 +981,30 @@ class ISession(IDict): :meth:`pyramid.interfaces.ISession.flash` """ - def new_csrf_token(): - """ Create and set into the session a new, random cross-site request - forgery protection token. Return the token. It will be a string.""" - def get_csrf_token(): - """ Return a random cross-site request forgery protection token. It - will be a string. If a token was previously added to the session via +class ICSRFPolicy(Interface): + """ An object that offers the ability to verify CSRF tokens and generate + new ones""" + + def new_csrf_token(request): + """ Create and return a new, random cross-site request forgery + protection token. Return the token. It will be a string.""" + + def get_csrf_token(request): + """ Return a cross-site request forgery protection token. It + will be a string. If a token was previously set for this user via ``new_csrf_token``, that token will be returned. If no CSRF token - was previously set into the session, ``new_csrf_token`` will be - called, which will create and set a token, and this token will be - returned. + was previously set, ``new_csrf_token`` will be called, which will + create and set a token, and this token will be returned. """ + def check_csrf_token(request, supplied_token): + """ Returns a boolean that represents if supplied_token is a valid CSRF + token for this request. Comparing strings for equality must be done + using :func:`pyramid.utils.strings_differ` to avoid timing attacks. + """ + + class IIntrospector(Interface): def get(category_name, discriminator, default=None): """ Get the IIntrospectable related to the category_name and the diff --git a/pyramid/predicates.py b/pyramid/predicates.py index 7c3a778ca..3d7bb1b4b 100644 --- a/pyramid/predicates.py +++ b/pyramid/predicates.py @@ -4,7 +4,7 @@ from pyramid.exceptions import ConfigurationError from pyramid.compat import is_nonstr_iter -from pyramid.session import check_csrf_token +from pyramid.csrf import check_csrf_token from pyramid.traversal import ( find_interface, traversal_path, diff --git a/pyramid/renderers.py b/pyramid/renderers.py index 47705d5d9..7d667ba7b 100644 --- a/pyramid/renderers.py +++ b/pyramid/renderers.py @@ -447,7 +447,6 @@ class RendererHelper(object): registry = self.registry registry.notify(system_values) - result = renderer(value, system_values) return result diff --git a/pyramid/session.py b/pyramid/session.py index 47b80f617..b1ad25410 100644 --- a/pyramid/session.py +++ b/pyramid/session.py @@ -16,19 +16,11 @@ from pyramid.compat import ( text_, bytes_, native_, - urlparse, ) -from pyramid.exceptions import ( - BadCSRFOrigin, - BadCSRFToken, -) from pyramid.interfaces import ISession -from pyramid.settings import aslist -from pyramid.util import ( - is_same_domain, - strings_differ, -) +from pyramid.util import strings_differ + def manage_accessed(wrapped): """ Decorator which causes a cookie to be renewed when an accessor @@ -109,149 +101,6 @@ def signed_deserialize(serialized, secret, hmac=hmac): return pickle.loads(pickled) -def check_csrf_origin(request, trusted_origins=None, raises=True): - """ - Check the Origin of the request to see if it is a cross site request or - not. - - If the value supplied by the Origin or Referer header isn't one of the - trusted origins and ``raises`` is ``True``, this function will raise a - :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is - ``False`` this function will return ``False`` instead. If the CSRF origin - checks are successful this function will return ``True`` unconditionally. - - Additional trusted origins may be added by passing a list of domain (and - ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in - with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` - (the default) this list of additional domains will be pulled from the - ``pyramid.csrf_trusted_origins`` setting. - - Note that this function will do nothing if request.scheme is not https. - - .. versionadded:: 1.7 - """ - def _fail(reason): - if raises: - raise BadCSRFOrigin(reason) - else: - return False - - if request.scheme == "https": - # Suppose user visits http://example.com/ - # An active network attacker (man-in-the-middle, MITM) sends a - # POST form that targets https://example.com/detonate-bomb/ and - # submits it via JavaScript. - # - # The attacker will need to provide a CSRF cookie and token, but - # that's no problem for a MITM when we cannot make any assumptions - # about what kind of session storage is being used. So the MITM can - # circumvent the CSRF protection. This is true for any HTTP connection, - # but anyone using HTTPS expects better! For this reason, for - # https://example.com/ we need additional protection that treats - # http://example.com/ as completely untrusted. Under HTTPS, - # Barth et al. found that the Referer header is missing for - # same-domain requests in only about 0.2% of cases or less, so - # we can use strict Referer checking. - - # Determine the origin of this request - origin = request.headers.get("Origin") - if origin is None: - origin = request.referrer - - # Fail if we were not able to locate an origin at all - if not origin: - return _fail("Origin checking failed - no Origin or Referer.") - - # Parse our origin so we we can extract the required information from - # it. - originp = urlparse.urlparse(origin) - - # Ensure that our Referer is also secure. - if originp.scheme != "https": - return _fail( - "Referer checking failed - Referer is insecure while host is " - "secure." - ) - - # Determine which origins we trust, which by default will include the - # current origin. - if trusted_origins is None: - trusted_origins = aslist( - request.registry.settings.get( - "pyramid.csrf_trusted_origins", []) - ) - - if request.host_port not in set(["80", "443"]): - trusted_origins.append("{0.domain}:{0.host_port}".format(request)) - else: - trusted_origins.append(request.domain) - - # Actually check to see if the request's origin matches any of our - # trusted origins. - if not any(is_same_domain(originp.netloc, host) - for host in trusted_origins): - reason = ( - "Referer checking failed - {0} does not match any trusted " - "origins." - ) - return _fail(reason.format(origin)) - - return True - - -def check_csrf_token(request, - token='csrf_token', - header='X-CSRF-Token', - raises=True): - """ Check the CSRF token in the request's session against the value in - ``request.POST.get(token)`` (if a POST request) or - ``request.headers.get(header)``. If a ``token`` keyword is not supplied to - this function, the string ``csrf_token`` will be used to look up the token - in ``request.POST``. If a ``header`` keyword is not supplied to this - function, the string ``X-CSRF-Token`` will be used to look up the token in - ``request.headers``. - - If the value supplied by post or by header doesn't match the value - supplied by ``request.session.get_csrf_token()``, and ``raises`` is - ``True``, this function will raise an - :exc:`pyramid.exceptions.BadCSRFToken` exception. - If the values differ and ``raises`` is ``False``, this function will - return ``False``. If the CSRF check is successful, this function will - return ``True`` unconditionally. - - Note that using this function requires that a :term:`session factory` is - configured. - - See :ref:`auto_csrf_checking` for information about how to secure your - application automatically against CSRF attacks. - - .. versionadded:: 1.4a2 - - .. versionchanged:: 1.7a1 - A CSRF token passed in the query string of the request is no longer - considered valid. It must be passed in either the request body or - a header. - """ - supplied_token = "" - # If this is a POST/PUT/etc request, then we'll check the body to see if it - # has a token. We explicitly use request.POST here because CSRF tokens - # should never appear in an URL as doing so is a security issue. We also - # explicitly check for request.POST here as we do not support sending form - # encoded data over anything but a request.POST. - if token is not None: - supplied_token = request.POST.get(token, "") - - # If we were unable to locate a CSRF token in a request body, then we'll - # check to see if there are any headers that have a value for us. - if supplied_token == "" and header is not None: - supplied_token = request.headers.get(header, "") - - expected_token = request.session.get_csrf_token() - if strings_differ(bytes_(expected_token), bytes_(supplied_token)): - if raises: - raise BadCSRFToken('check_csrf_token(): Invalid token') - return False - return True class PickleSerializer(object): """ A serializer that uses the pickle protocol to dump Python diff --git a/pyramid/tests/test_config/test_views.py b/pyramid/tests/test_config/test_views.py index 211632730..45495f1fa 100644 --- a/pyramid/tests/test_config/test_views.py +++ b/pyramid/tests/test_config/test_views.py @@ -2373,7 +2373,7 @@ class TestViewsConfigurationMixin(unittest.TestCase): view = lambda r: 'OK' config.set_default_csrf_options(require_csrf=True) config.add_view(view, context=Exception, renderer=null_renderer) - view_intr = introspector.introspectables[1] + view_intr = introspector.introspectables[-1] self.assertTrue(view_intr.type_name, 'view') self.assertEqual(view_intr['callable'], view) derived_view = view_intr['derived_callable'] diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py new file mode 100644 index 000000000..a74d2a07b --- /dev/null +++ b/pyramid/tests/test_csrf.py @@ -0,0 +1,172 @@ +import unittest + +from pyramid.config import Configurator +from pyramid.csrf import CookieCSRF, SessionCSRF, get_csrf_token, new_csrf_token +from pyramid.events import BeforeRender +from pyramid.interfaces import ICSRFPolicy +from pyramid.tests.test_view import BaseTest as ViewBaseTest + + +class CSRFTokenTests(ViewBaseTest, unittest.TestCase): + class DummyCSRF(object): + def new_csrf_token(self, request): + return 'e5e9e30a08b34ff9842ff7d2b958c14b' + + def get_csrf_token(self, request): + return '02821185e4c94269bdc38e6eeae0a2f8' + + def test_csrf_token_passed_to_template(self): + config = Configurator() + config.set_default_csrf_options(implementation=self.DummyCSRF) + config.commit() + + request = self._makeRequest() + request.registry = config.registry + + before = BeforeRender({'request': request}, {}) + config.registry.notify(before) + self.assertIn('get_csrf_token', before) + self.assertEqual( + before['get_csrf_token'](), + '02821185e4c94269bdc38e6eeae0a2f8' + ) + + def test_simple_api_for_tokens_from_python(self): + config = Configurator() + config.set_default_csrf_options(implementation=self.DummyCSRF) + config.commit() + + request = self._makeRequest() + request.registry = config.registry + self.assertEqual( + get_csrf_token(request), + '02821185e4c94269bdc38e6eeae0a2f8' + ) + self.assertEqual( + new_csrf_token(request), + 'e5e9e30a08b34ff9842ff7d2b958c14b' + ) + + +class SessionCSRFTests(unittest.TestCase): + class MockSession(object): + def new_csrf_token(self): + return 'e5e9e30a08b34ff9842ff7d2b958c14b' + + def get_csrf_token(self): + return '02821185e4c94269bdc38e6eeae0a2f8' + + def test_session_csrf_implementation_delegates_to_session(self): + config = Configurator() + config.set_default_csrf_options(implementation=SessionCSRF) + config.commit() + + request = DummyRequest(config.registry, session=self.MockSession()) + self.assertEqual( + config.registry.getUtility(ICSRFPolicy).get_csrf_token(request), + '02821185e4c94269bdc38e6eeae0a2f8' + ) + self.assertEqual( + config.registry.getUtility(ICSRFPolicy).new_csrf_token(request), + 'e5e9e30a08b34ff9842ff7d2b958c14b' + ) + + +class CookieCSRFTests(unittest.TestCase): + + def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self): + config = Configurator() + config.set_default_csrf_options(implementation=CookieCSRF()) + config.commit() + + response = MockResponse() + request = DummyRequest(config.registry, response=response) + + token = config.registry.getUtility(ICSRFPolicy).get_csrf_token(request) + self.assertEqual( + response.called_args, + ('csrf_token', token), + ) + self.assertEqual( + response.called_kwargs, + { + 'secure': False, + 'httponly': False, + 'domain': None, + 'path': '/', + 'overwrite': True + } + ) + + def test_existing_cookie_csrf_does_not_set_cookie(self): + config = Configurator() + config.set_default_csrf_options(implementation=CookieCSRF()) + config.commit() + + response = MockResponse() + request = DummyRequest(config.registry, response=response) + request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + + token = config.registry.getUtility(ICSRFPolicy).get_csrf_token(request) + self.assertEqual( + token, + 'e6f325fee5974f3da4315a8ccf4513d2' + ) + self.assertEqual( + response.called_args, + (), + ) + self.assertEqual( + response.called_kwargs, + {} + ) + + def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self): + config = Configurator() + config.set_default_csrf_options(implementation=CookieCSRF()) + config.commit() + + response = MockResponse() + request = DummyRequest(config.registry, response=response) + request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + + token = config.registry.getUtility(ICSRFPolicy).new_csrf_token(request) + self.assertEqual( + response.called_args, + ('csrf_token', token), + ) + self.assertEqual( + response.called_kwargs, + { + 'secure': False, + 'httponly': False, + 'domain': None, + 'path': '/', + 'overwrite': True + } + ) + + +class DummyRequest(object): + registry = None + session = None + cookies = {} + + def __init__(self, registry, session=None, response=None): + self.registry = registry + self.session = session + self.response = response + + def add_response_callback(self, callback): + callback(self, self.response) + + +class MockResponse(object): + def __init__(self): + self.called_args = () + self.called_kwargs = {} + + def set_cookie(self, *args, **kwargs): + self.called_args = args + self.called_kwargs = kwargs + return diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py index 3a308d08b..b51dccecc 100644 --- a/pyramid/tests/test_session.py +++ b/pyramid/tests/test_session.py @@ -661,7 +661,7 @@ class Test_signed_deserialize(unittest.TestCase): class Test_check_csrf_token(unittest.TestCase): def _callFUT(self, *args, **kwargs): - from ..session import check_csrf_token + from ..csrf import check_csrf_token return check_csrf_token(*args, **kwargs) def test_success_token(self): @@ -709,7 +709,7 @@ class Test_check_csrf_token(unittest.TestCase): class Test_check_csrf_origin(unittest.TestCase): def _callFUT(self, *args, **kwargs): - from ..session import check_csrf_origin + from ..csrf import check_csrf_origin return check_csrf_origin(*args, **kwargs) def test_success_with_http(self): diff --git a/pyramid/viewderivers.py b/pyramid/viewderivers.py index 4eb0ce704..d2869b162 100644 --- a/pyramid/viewderivers.py +++ b/pyramid/viewderivers.py @@ -6,7 +6,7 @@ from zope.interface import ( ) from pyramid.security import NO_PERMISSION_REQUIRED -from pyramid.session import ( +from pyramid.csrf import ( check_csrf_origin, check_csrf_token, ) -- cgit v1.2.3 From 313c251497f6cdb3e5ca961a8092a2356aa502fc Mon Sep 17 00:00:00 2001 From: Jure Cerjak Date: Mon, 5 Dec 2016 16:06:08 +0100 Subject: Fix tests and documentation in various places, and feedback following review regarding naming of variables and code cleanup. --- docs/api/csrf.rst | 10 +- docs/api/interfaces.rst | 2 +- docs/narr/security.rst | 34 ++- docs/narr/sessions.rst | 4 +- pyramid/config/views.py | 2 +- pyramid/csrf.py | 35 ++- pyramid/tests/test_config/test_views.py | 1 + pyramid/tests/test_csrf.py | 367 +++++++++++++++++++++++++++----- pyramid/tests/test_session.py | 138 ------------ pyramid/tests/test_viewderivers.py | 1 + 10 files changed, 369 insertions(+), 225 deletions(-) diff --git a/docs/api/csrf.rst b/docs/api/csrf.rst index 3125bdac9..89fb0c4b2 100644 --- a/docs/api/csrf.rst +++ b/docs/api/csrf.rst @@ -5,14 +5,16 @@ .. automodule:: pyramid.csrf + .. autoclass:: SessionCSRF + :members: + + .. autoclass:: CookieCSRF + :members: + .. autofunction:: get_csrf_token .. autofunction:: new_csrf_token - .. autoclass:: SessionCSRF - :members: - .. autofunction:: check_csrf_origin .. autofunction:: check_csrf_token - diff --git a/docs/api/interfaces.rst b/docs/api/interfaces.rst index 2ca472616..b88209a36 100644 --- a/docs/api/interfaces.rst +++ b/docs/api/interfaces.rst @@ -44,7 +44,7 @@ Other Interfaces .. autointerface:: IRoutePregenerator :members: - .. autointerface:: ICSRF + .. autointerface:: ICSRFPolicy :members: .. autointerface:: ISession diff --git a/docs/narr/security.rst b/docs/narr/security.rst index b4fb3b8a8..6962a0fe3 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -146,7 +146,7 @@ For example, the following view declaration protects the view named # config is an instance of pyramid.config.Configurator config.add_view('mypackage.views.blog_entry_add_view', - name='add_entry.html', + name='add_entry.html', context='mypackage.resources.Blog', permission='add') @@ -725,7 +725,7 @@ object that implements the following interface: """ Return ``True`` if any of the ``principals`` is allowed the ``permission`` in the current ``context``, else return ``False`` """ - + def principals_allowed_by_permission(self, context, permission): """ Return a set of principal identifiers allowed by the ``permission`` in ``context``. This behavior is optional; if you @@ -777,11 +777,27 @@ If the URL is one that may modify or delete data, the consequences can be dire. You can avoid most of these attacks by issuing a unique token to the browser and then requiring that it be present in all potentially unsafe requests. -:app:`Pyramid` sessions provide facilities to create and check CSRF tokens. +:app:`Pyramid` provides facilities to create and check CSRF tokens. + +By default :app:`Pyramid` comes with a session-based CSRF implementation +:class:`pyramid.csrf.SessionCSRF`. To use it, you must first enable +a :term:`session factory` as described in +:ref:`using_the_default_session_factory` or +:ref:`using_alternate_session_factories`. Alternatively, you can use +a cookie-based implementation :class:`pyramid.csrf.CookieCSRF` which gives +some additional flexibility as it does not require a session for each user. +You can also define your own implementation of +:class:`pyramid.interfaces.ICSRFPolicy` and register it with the +:meth:`pyramid.config.Configurator.set_default_csrf_options` directive. -To use CSRF tokens, you must first enable a :term:`session factory` as -described in :ref:`using_the_default_session_factory` or -:ref:`using_alternate_session_factories`. +For example: + +.. code-block:: python + + from pyramid.config import Configurator + + config = Configurator() + config.set_default_csrf_options(implementation=MyCustomCSRFPolicy()) .. index:: single: csrf.get_csrf_token @@ -866,7 +882,7 @@ Checking CSRF Tokens Manually ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ In request handling code, you can check the presence and validity of a CSRF -token with :func:`pyramid.session.check_csrf_token`. If the token is valid, it +token with :func:`pyramid.csrf.check_csrf_token`. If the token is valid, it will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally, you can specify ``raises=False`` to have the check return ``False`` instead of raising an exception. @@ -876,7 +892,7 @@ named ``X-CSRF-Token``. .. code-block:: python - from pyramid.session import check_csrf_token + from pyramid.csrf import check_csrf_token def myview(request): # Require CSRF Token @@ -955,4 +971,4 @@ include ``check_csrf=True`` as a view predicate. See A mismatch of a CSRF token is treated like any other predicate miss, and the predicate system, when it doesn't find a view, raises ``HTTPNotFound`` instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different - from calling :func:`pyramid.session.check_csrf_token`. + from calling :func:`pyramid.csrf.check_csrf_token`. diff --git a/docs/narr/sessions.rst b/docs/narr/sessions.rst index 90b5f4585..86fe2a139 100644 --- a/docs/narr/sessions.rst +++ b/docs/narr/sessions.rst @@ -12,8 +12,7 @@ application. This chapter describes how to configure sessions, what session implementations :app:`Pyramid` provides out of the box, how to store and retrieve data from -sessions, and two session-specific features: flash messages, and cross-site -request forgery attack prevention. +sessions, and a session-specific feature: flash messages. .. index:: single: session factory (default) @@ -320,4 +319,3 @@ flash storage. .. index:: single: preventing cross-site request forgery attacks single: cross-site request forgery attacks, prevention - diff --git a/pyramid/config/views.py b/pyramid/config/views.py index 7a383be44..4ebd014de 100644 --- a/pyramid/config/views.py +++ b/pyramid/config/views.py @@ -643,7 +643,7 @@ class ViewsConfiguratorMixin(object): If CSRF checking is performed, the checked value will be the value of ``request.params[check_name]``. This value will be compared against - the value of ``impl.get_csrf_token()`` (where ``impl`` is an + the value of ``policy.get_csrf_token()`` (where ``policy`` is an implementation of :meth:`pyramid.interfaces.ICSRFPolicy`), and the check will pass if these two values are the same. If the check passes, the associated view will be permitted to execute. If the diff --git a/pyramid/csrf.py b/pyramid/csrf.py index c373079a4..7adbc9fee 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -53,11 +53,12 @@ class CookieCSRF(object): """ An alternative CSRF implementation that stores its information in unauthenticated cookies, known as the 'Double Submit Cookie' method in the OWASP CSRF guidelines. This gives some additional flexibility with regards - to scalingas the tokens can be generated and verified by a front-end server. - + to scaling as the tokens can be generated and verified by a front-end + server. + .. versionadded :: 1.8a1 """ - + def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, domain=None, path='/'): self.cookie_name = cookie_name @@ -108,8 +109,7 @@ def csrf_token_template_global(event): return else: csrf = registry.getUtility(ICSRFPolicy) - if csrf is not None: - event['get_csrf_token'] = partial(csrf.get_csrf_token, request) + event['get_csrf_token'] = partial(csrf.get_csrf_token, request) def get_csrf_token(request): @@ -121,8 +121,7 @@ def get_csrf_token(request): """ registry = request.registry csrf = registry.getUtility(ICSRFPolicy) - if csrf is not None: - return csrf.get_csrf_token(request) + return csrf.get_csrf_token(request) def new_csrf_token(request): @@ -134,25 +133,25 @@ def new_csrf_token(request): """ registry = request.registry csrf = registry.getUtility(ICSRFPolicy) - if csrf is not None: - return csrf.new_csrf_token(request) + return csrf.new_csrf_token(request) def check_csrf_token(request, token='csrf_token', header='X-CSRF-Token', raises=True): - """ Check the CSRF token returned by the :meth:`pyramid.interfaces.ICSRFPolicy` - implementation against the value in ``request.POST.get(token)`` (if a POST - request) or ``request.headers.get(header)``. If a ``token`` keyword is not - supplied to this function, the string ``csrf_token`` will be used to look - up the token in ``request.POST``. If a ``header`` keyword is not supplied - to this function, the string ``X-CSRF-Token`` will be used to look up the - token in ``request.headers``. + """ Check the CSRF token returned by the + :class:`pyramid.interfaces.ICSRFPolicy` implementation against the value in + ``request.POST.get(token)`` (if a POST request) or + ``request.headers.get(header)``. If a ``token`` keyword is not supplied to + this function, the string ``csrf_token`` will be used to look up the token + in ``request.POST``. If a ``header`` keyword is not supplied to this + function, the string ``X-CSRF-Token`` will be used to look up the token in + ``request.headers``. If the value supplied by post or by header doesn't match the value supplied - by ``impl.get_csrf_token()`` (where ``impl`` is an implementation of - :meth:`pyramid.interfaces.ICSRFPolicy`), and ``raises`` is ``True``, this + by ``policy.get_csrf_token()`` (where ``policy`` is an implementation of + :class:`pyramid.interfaces.ICSRFPolicy`), and ``raises`` is ``True``, this function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ and ``raises`` is ``False``, this function will return ``False``. If the CSRF check is successful, this function will return diff --git a/pyramid/tests/test_config/test_views.py b/pyramid/tests/test_config/test_views.py index 45495f1fa..0816d9958 100644 --- a/pyramid/tests/test_config/test_views.py +++ b/pyramid/tests/test_config/test_views.py @@ -18,6 +18,7 @@ class TestViewsConfigurationMixin(unittest.TestCase): def _makeOne(self, *arg, **kw): from pyramid.config import Configurator config = Configurator(*arg, **kw) + config.set_default_csrf_options(require_csrf=False) return config def _getViewCallable(self, config, ctx_iface=None, exc_iface=None, diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py index a74d2a07b..1b3f3fc3b 100644 --- a/pyramid/tests/test_csrf.py +++ b/pyramid/tests/test_csrf.py @@ -1,54 +1,109 @@ import unittest +from zope.interface.interfaces import ComponentLookupError + +from pyramid import testing from pyramid.config import Configurator -from pyramid.csrf import CookieCSRF, SessionCSRF, get_csrf_token, new_csrf_token from pyramid.events import BeforeRender -from pyramid.interfaces import ICSRFPolicy -from pyramid.tests.test_view import BaseTest as ViewBaseTest -class CSRFTokenTests(ViewBaseTest, unittest.TestCase): - class DummyCSRF(object): - def new_csrf_token(self, request): - return 'e5e9e30a08b34ff9842ff7d2b958c14b' +class Test_get_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() - def get_csrf_token(self, request): - return '02821185e4c94269bdc38e6eeae0a2f8' + def _callFUT(self, *args, **kwargs): + from pyramid.csrf import get_csrf_token + return get_csrf_token(*args, **kwargs) + + def test_no_csrf_utility_registered(self): + request = testing.DummyRequest() + + with self.assertRaises(ComponentLookupError): + self._callFUT(request) + + def test_success(self): + self.config.set_default_csrf_options(implementation=DummyCSRF()) + request = testing.DummyRequest() + + csrf_token = self._callFUT(request) + + self.assertEquals(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8') + + +class Test_new_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from pyramid.csrf import new_csrf_token + return new_csrf_token(*args, **kwargs) + + def test_no_csrf_utility_registered(self): + request = testing.DummyRequest() + + with self.assertRaises(ComponentLookupError): + self._callFUT(request) + + def test_success(self): + self.config.set_default_csrf_options(implementation=DummyCSRF()) + request = testing.DummyRequest() + + csrf_token = self._callFUT(request) + + self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b') + + +class Test_csrf_token_template_global(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from pyramid.csrf import csrf_token_template_global + return csrf_token_template_global(*args, **kwargs) + + def test_event_is_missing_request(self): + event = BeforeRender({}, {}) + + self._callFUT(event) + + self.assertNotIn('get_csrf_token', event) + + def test_request_is_missing_registry(self): + request = DummyRequest(registry=None) + del request.registry + del request.__class__.registry + event = BeforeRender({'request': request}, {}) + + self._callFUT(event) + + self.assertNotIn('get_csrf_token', event) + + def test_csrf_utility_not_registered(self): + request = testing.DummyRequest() + event = BeforeRender({'request': request}, {}) + + with self.assertRaises(ComponentLookupError): + self._callFUT(event) def test_csrf_token_passed_to_template(self): config = Configurator() - config.set_default_csrf_options(implementation=self.DummyCSRF) + config.set_default_csrf_options(implementation=DummyCSRF()) config.commit() - request = self._makeRequest() + request = testing.DummyRequest() request.registry = config.registry before = BeforeRender({'request': request}, {}) config.registry.notify(before) + self.assertIn('get_csrf_token', before) self.assertEqual( before['get_csrf_token'](), '02821185e4c94269bdc38e6eeae0a2f8' ) - def test_simple_api_for_tokens_from_python(self): - config = Configurator() - config.set_default_csrf_options(implementation=self.DummyCSRF) - config.commit() - - request = self._makeRequest() - request.registry = config.registry - self.assertEqual( - get_csrf_token(request), - '02821185e4c94269bdc38e6eeae0a2f8' - ) - self.assertEqual( - new_csrf_token(request), - 'e5e9e30a08b34ff9842ff7d2b958c14b' - ) - -class SessionCSRFTests(unittest.TestCase): +class TestSessionCSRF(unittest.TestCase): class MockSession(object): def new_csrf_token(self): return 'e5e9e30a08b34ff9842ff7d2b958c14b' @@ -56,33 +111,75 @@ class SessionCSRFTests(unittest.TestCase): def get_csrf_token(self): return '02821185e4c94269bdc38e6eeae0a2f8' - def test_session_csrf_implementation_delegates_to_session(self): + def _makeOne(self): + from pyramid.csrf import SessionCSRF + return SessionCSRF() + + def test_register_session_csrf_policy(self): + from pyramid.csrf import SessionCSRF + from pyramid.interfaces import ICSRFPolicy + config = Configurator() - config.set_default_csrf_options(implementation=SessionCSRF) + config.set_default_csrf_options(implementation=self._makeOne()) config.commit() - request = DummyRequest(config.registry, session=self.MockSession()) + policy = config.registry.queryUtility(ICSRFPolicy) + + self.assertTrue(isinstance(policy, SessionCSRF)) + + def test_session_csrf_implementation_delegates_to_session(self): + policy = self._makeOne() + request = DummyRequest(session=self.MockSession()) + self.assertEqual( - config.registry.getUtility(ICSRFPolicy).get_csrf_token(request), + policy.get_csrf_token(request), '02821185e4c94269bdc38e6eeae0a2f8' ) self.assertEqual( - config.registry.getUtility(ICSRFPolicy).new_csrf_token(request), + policy.new_csrf_token(request), 'e5e9e30a08b34ff9842ff7d2b958c14b' ) + def test_verifying_token_invalid(self): + policy = self._makeOne() + request = DummyRequest(session=self.MockSession()) -class CookieCSRFTests(unittest.TestCase): + result = policy.check_csrf_token(request, 'invalid-token') + self.assertFalse(result) + + def test_verifying_token_valid(self): + policy = self._makeOne() + request = DummyRequest(session=self.MockSession()) + + result = policy.check_csrf_token( + request, '02821185e4c94269bdc38e6eeae0a2f8') + self.assertTrue(result) + + +class TestCookieCSRF(unittest.TestCase): + def _makeOne(self): + from pyramid.csrf import CookieCSRF + return CookieCSRF() + + def test_register_cookie_csrf_policy(self): + from pyramid.csrf import CookieCSRF + from pyramid.interfaces import ICSRFPolicy - def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self): config = Configurator() - config.set_default_csrf_options(implementation=CookieCSRF()) + config.set_default_csrf_options(implementation=self._makeOne()) config.commit() + policy = config.registry.queryUtility(ICSRFPolicy) + + self.assertTrue(isinstance(policy, CookieCSRF)) + + def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self): response = MockResponse() - request = DummyRequest(config.registry, response=response) + request = DummyRequest(response=response) + + policy = self._makeOne() + token = policy.get_csrf_token(request) - token = config.registry.getUtility(ICSRFPolicy).get_csrf_token(request) self.assertEqual( response.called_args, ('csrf_token', token), @@ -99,15 +196,13 @@ class CookieCSRFTests(unittest.TestCase): ) def test_existing_cookie_csrf_does_not_set_cookie(self): - config = Configurator() - config.set_default_csrf_options(implementation=CookieCSRF()) - config.commit() - response = MockResponse() - request = DummyRequest(config.registry, response=response) + request = DummyRequest(response=response) request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} - token = config.registry.getUtility(ICSRFPolicy).get_csrf_token(request) + policy = self._makeOne() + token = policy.get_csrf_token(request) + self.assertEqual( token, 'e6f325fee5974f3da4315a8ccf4513d2' @@ -122,15 +217,13 @@ class CookieCSRFTests(unittest.TestCase): ) def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self): - config = Configurator() - config.set_default_csrf_options(implementation=CookieCSRF()) - config.commit() - response = MockResponse() - request = DummyRequest(config.registry, response=response) + request = DummyRequest(response=response) request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} - token = config.registry.getUtility(ICSRFPolicy).new_csrf_token(request) + policy = self._makeOne() + token = policy.new_csrf_token(request) + self.assertEqual( response.called_args, ('csrf_token', token), @@ -146,13 +239,177 @@ class CookieCSRFTests(unittest.TestCase): } ) + def test_verifying_token_invalid_token(self): + response = MockResponse() + request = DummyRequest(response=response) + request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + + policy = self._makeOne() + self.assertFalse( + policy.check_csrf_token(request, 'invalid-token') + ) + + def test_verifying_token_against_existing_cookie(self): + response = MockResponse() + request = DummyRequest(response=response) + request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + + policy = self._makeOne() + self.assertTrue( + policy.check_csrf_token(request, 'e6f325fee5974f3da4315a8ccf4513d2') + ) + + +class Test_check_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + # set up CSRF (this will also register SessionCSRF policy) + self.config.set_default_csrf_options(require_csrf=False) + + def _callFUT(self, *args, **kwargs): + from ..csrf import check_csrf_token + return check_csrf_token(*args, **kwargs) + + def test_success_token(self): + request = testing.DummyRequest() + request.method = "POST" + request.POST = {'csrf_token': request.session.get_csrf_token()} + self.assertEqual(self._callFUT(request, token='csrf_token'), True) + + def test_success_header(self): + request = testing.DummyRequest() + request.headers['X-CSRF-Token'] = request.session.get_csrf_token() + self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True) + + def test_success_default_token(self): + request = testing.DummyRequest() + request.method = "POST" + request.POST = {'csrf_token': request.session.get_csrf_token()} + self.assertEqual(self._callFUT(request), True) + + def test_success_default_header(self): + request = testing.DummyRequest() + request.headers['X-CSRF-Token'] = request.session.get_csrf_token() + self.assertEqual(self._callFUT(request), True) + + def test_failure_raises(self): + from pyramid.exceptions import BadCSRFToken + request = testing.DummyRequest() + self.assertRaises(BadCSRFToken, self._callFUT, request, + 'csrf_token') + + def test_failure_no_raises(self): + request = testing.DummyRequest() + result = self._callFUT(request, 'csrf_token', raises=False) + self.assertEqual(result, False) + + def test_token_differing_types(self): + from pyramid.compat import text_ + request = testing.DummyRequest() + request.method = "POST" + request.session['_csrft_'] = text_('foo') + request.POST = {'csrf_token': b'foo'} + self.assertEqual(self._callFUT(request, token='csrf_token'), True) + + +class Test_check_csrf_origin(unittest.TestCase): + def _callFUT(self, *args, **kwargs): + from ..csrf import check_csrf_origin + return check_csrf_origin(*args, **kwargs) + + def test_success_with_http(self): + request = testing.DummyRequest() + request.scheme = "http" + self.assertTrue(self._callFUT(request)) + + def test_success_with_https_and_referrer(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "https://example.com/login/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_success_with_https_and_origin(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.headers = {"Origin": "https://example.com/"} + request.referrer = "https://not-example.com/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_success_with_additional_trusted_host(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "https://not-example.com/login/" + request.registry.settings = { + "pyramid.csrf_trusted_origins": ["not-example.com"], + } + self.assertTrue(self._callFUT(request)) + + def test_success_with_nonstandard_port(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com:8080" + request.host_port = "8080" + request.referrer = "https://example.com:8080/login/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_fails_with_wrong_host(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "https://not-example.com/login/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_with_no_origin(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.referrer = None + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_when_http_to_https(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = "443" + request.referrer = "http://example.com/evil/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_with_nonstandard_port(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com:8080" + request.host_port = "8080" + request.referrer = "https://example.com/login/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + class DummyRequest(object): registry = None session = None cookies = {} - def __init__(self, registry, session=None, response=None): + def __init__(self, registry=None, session=None, response=None): self.registry = registry self.session = session self.response = response @@ -170,3 +427,11 @@ class MockResponse(object): self.called_args = args self.called_kwargs = kwargs return + + +class DummyCSRF(object): + def new_csrf_token(self, request): + return 'e5e9e30a08b34ff9842ff7d2b958c14b' + + def get_csrf_token(self, request): + return '02821185e4c94269bdc38e6eeae0a2f8' diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py index b51dccecc..ade602799 100644 --- a/pyramid/tests/test_session.py +++ b/pyramid/tests/test_session.py @@ -659,144 +659,6 @@ class Test_signed_deserialize(unittest.TestCase): result = self._callFUT(serialized, secret.decode('latin-1')) self.assertEqual(result, '123') -class Test_check_csrf_token(unittest.TestCase): - def _callFUT(self, *args, **kwargs): - from ..csrf import check_csrf_token - return check_csrf_token(*args, **kwargs) - - def test_success_token(self): - request = testing.DummyRequest() - request.method = "POST" - request.POST = {'csrf_token': request.session.get_csrf_token()} - self.assertEqual(self._callFUT(request, token='csrf_token'), True) - - def test_success_header(self): - request = testing.DummyRequest() - request.headers['X-CSRF-Token'] = request.session.get_csrf_token() - self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True) - - def test_success_default_token(self): - request = testing.DummyRequest() - request.method = "POST" - request.POST = {'csrf_token': request.session.get_csrf_token()} - self.assertEqual(self._callFUT(request), True) - - def test_success_default_header(self): - request = testing.DummyRequest() - request.headers['X-CSRF-Token'] = request.session.get_csrf_token() - self.assertEqual(self._callFUT(request), True) - - def test_failure_raises(self): - from pyramid.exceptions import BadCSRFToken - request = testing.DummyRequest() - self.assertRaises(BadCSRFToken, self._callFUT, request, - 'csrf_token') - - def test_failure_no_raises(self): - request = testing.DummyRequest() - result = self._callFUT(request, 'csrf_token', raises=False) - self.assertEqual(result, False) - - def test_token_differing_types(self): - from pyramid.compat import text_ - request = testing.DummyRequest() - request.method = "POST" - request.session['_csrft_'] = text_('foo') - request.POST = {'csrf_token': b'foo'} - self.assertEqual(self._callFUT(request, token='csrf_token'), True) - - -class Test_check_csrf_origin(unittest.TestCase): - - def _callFUT(self, *args, **kwargs): - from ..csrf import check_csrf_origin - return check_csrf_origin(*args, **kwargs) - - def test_success_with_http(self): - request = testing.DummyRequest() - request.scheme = "http" - self.assertTrue(self._callFUT(request)) - - def test_success_with_https_and_referrer(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "https://example.com/login/" - request.registry.settings = {} - self.assertTrue(self._callFUT(request)) - - def test_success_with_https_and_origin(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.headers = {"Origin": "https://example.com/"} - request.referrer = "https://not-example.com/" - request.registry.settings = {} - self.assertTrue(self._callFUT(request)) - - def test_success_with_additional_trusted_host(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "https://not-example.com/login/" - request.registry.settings = { - "pyramid.csrf_trusted_origins": ["not-example.com"], - } - self.assertTrue(self._callFUT(request)) - - def test_success_with_nonstandard_port(self): - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com:8080" - request.host_port = "8080" - request.referrer = "https://example.com:8080/login/" - request.registry.settings = {} - self.assertTrue(self._callFUT(request)) - - def test_fails_with_wrong_host(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "https://not-example.com/login/" - request.registry.settings = {} - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - - def test_fails_with_no_origin(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.referrer = None - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - - def test_fails_when_http_to_https(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com" - request.host_port = "443" - request.referrer = "http://example.com/evil/" - request.registry.settings = {} - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - - def test_fails_with_nonstandard_port(self): - from pyramid.exceptions import BadCSRFOrigin - request = testing.DummyRequest() - request.scheme = "https" - request.host = "example.com:8080" - request.host_port = "8080" - request.referrer = "https://example.com/login/" - request.registry.settings = {} - self.assertRaises(BadCSRFOrigin, self._callFUT, request) - self.assertFalse(self._callFUT(request, raises=False)) - class DummySerializer(object): def dumps(self, value): diff --git a/pyramid/tests/test_viewderivers.py b/pyramid/tests/test_viewderivers.py index 51d0bd367..6b81cc1e5 100644 --- a/pyramid/tests/test_viewderivers.py +++ b/pyramid/tests/test_viewderivers.py @@ -12,6 +12,7 @@ class TestDeriveView(unittest.TestCase): def setUp(self): self.config = testing.setUp() + self.config.set_default_csrf_options(require_csrf=False) def tearDown(self): self.config = None -- cgit v1.2.3 From 8f60e2c397a4c781d3ac2dc7fcff9321cdb16a42 Mon Sep 17 00:00:00 2001 From: Jure Cerjak Date: Wed, 7 Dec 2016 11:00:18 +0100 Subject: add to contributors list --- CONTRIBUTORS.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 566e91195..750f6d29f 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -291,6 +291,8 @@ Contributors - Mikko Ohtamaa, 2016/12/6 +- Jure Cerjak, 2016/12/7 + - Martin Frlin, 2016/12/7 - Kirill Kuzminykh, 2017/03/01 -- cgit v1.2.3 From fe0d223ad08bcab724d216b3a877b690c5795f73 Mon Sep 17 00:00:00 2001 From: Matthew Wilkes Date: Fri, 9 Dec 2016 11:25:03 +0100 Subject: Rename implementation to ICSRFStoragePolicy --- docs/api/interfaces.rst | 2 +- docs/narr/security.rst | 2 +- pyramid/config/security.py | 6 +++--- pyramid/config/views.py | 2 +- pyramid/csrf.py | 18 +++++++++--------- pyramid/interfaces.py | 2 +- pyramid/tests/test_csrf.py | 8 ++++---- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/api/interfaces.rst b/docs/api/interfaces.rst index b88209a36..e542a6be0 100644 --- a/docs/api/interfaces.rst +++ b/docs/api/interfaces.rst @@ -44,7 +44,7 @@ Other Interfaces .. autointerface:: IRoutePregenerator :members: - .. autointerface:: ICSRFPolicy + .. autointerface:: ICSRFStoragePolicy :members: .. autointerface:: ISession diff --git a/docs/narr/security.rst b/docs/narr/security.rst index 6962a0fe3..04c236e0b 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -787,7 +787,7 @@ a :term:`session factory` as described in a cookie-based implementation :class:`pyramid.csrf.CookieCSRF` which gives some additional flexibility as it does not require a session for each user. You can also define your own implementation of -:class:`pyramid.interfaces.ICSRFPolicy` and register it with the +:class:`pyramid.interfaces.ICSRFStoragePolicy` and register it with the :meth:`pyramid.config.Configurator.set_default_csrf_options` directive. For example: diff --git a/pyramid/config/security.py b/pyramid/config/security.py index 102a61e0c..c8becce1f 100644 --- a/pyramid/config/security.py +++ b/pyramid/config/security.py @@ -3,7 +3,7 @@ from zope.interface import implementer from pyramid.interfaces import ( IAuthorizationPolicy, IAuthenticationPolicy, - ICSRFPolicy, + ICSRFStoragePolicy, IDefaultCSRFOptions, IDefaultPermission, PHASE1_CONFIG, @@ -181,7 +181,7 @@ class SecurityConfiguratorMixin(object): Set the default CSRF options used by subsequent view registrations. ``implementation`` is a class that implements the - :meth:`pyramid.interfaces.ICSRFPolicy` interface that will be used for all + :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface that will be used for all CSRF functionality. Default: :class:`pyramid.csrf.SessionCSRF`. ``require_csrf`` controls whether CSRF checks will be automatically @@ -220,7 +220,7 @@ class SecurityConfiguratorMixin(object): if implementation is None: implementation = SessionCSRF() def register(): - self.registry.registerUtility(implementation, ICSRFPolicy) + self.registry.registerUtility(implementation, ICSRFStoragePolicy) self.registry.registerUtility(options, IDefaultCSRFOptions) intr = self.introspectable('default csrf view options', None, diff --git a/pyramid/config/views.py b/pyramid/config/views.py index 4ebd014de..e037f7706 100644 --- a/pyramid/config/views.py +++ b/pyramid/config/views.py @@ -644,7 +644,7 @@ class ViewsConfiguratorMixin(object): If CSRF checking is performed, the checked value will be the value of ``request.params[check_name]``. This value will be compared against the value of ``policy.get_csrf_token()`` (where ``policy`` is an - implementation of :meth:`pyramid.interfaces.ICSRFPolicy`), and the + implementation of :meth:`pyramid.interfaces.ICSRFStoragePolicy`), and the check will pass if these two values are the same. If the check passes, the associated view will be permitted to execute. If the check fails, the associated view will not be permitted to execute. diff --git a/pyramid/csrf.py b/pyramid/csrf.py index 7adbc9fee..b2788a764 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -11,7 +11,7 @@ from pyramid.exceptions import ( BadCSRFOrigin, BadCSRFToken, ) -from pyramid.interfaces import ICSRFPolicy +from pyramid.interfaces import ICSRFStoragePolicy from pyramid.settings import aslist from pyramid.util import ( is_same_domain, @@ -19,7 +19,7 @@ from pyramid.util import ( ) -@implementer(ICSRFPolicy) +@implementer(ICSRFStoragePolicy) class SessionCSRF(object): """ The default CSRF implementation, which mimics the behavior from older versions of Pyramid. The ``new_csrf_token`` and ``get_csrf_token`` methods @@ -48,7 +48,7 @@ class SessionCSRF(object): bytes_(supplied_token, 'ascii'), ) -@implementer(ICSRFPolicy) +@implementer(ICSRFStoragePolicy) class CookieCSRF(object): """ An alternative CSRF implementation that stores its information in unauthenticated cookies, known as the 'Double Submit Cookie' method in the @@ -108,7 +108,7 @@ def csrf_token_template_global(event): except AttributeError: return else: - csrf = registry.getUtility(ICSRFPolicy) + csrf = registry.getUtility(ICSRFStoragePolicy) event['get_csrf_token'] = partial(csrf.get_csrf_token, request) @@ -120,7 +120,7 @@ def get_csrf_token(request): .. versionadded :: 1.8a1 """ registry = request.registry - csrf = registry.getUtility(ICSRFPolicy) + csrf = registry.getUtility(ICSRFStoragePolicy) return csrf.get_csrf_token(request) @@ -132,7 +132,7 @@ def new_csrf_token(request): .. versionadded :: 1.8a1 """ registry = request.registry - csrf = registry.getUtility(ICSRFPolicy) + csrf = registry.getUtility(ICSRFStoragePolicy) return csrf.new_csrf_token(request) @@ -141,7 +141,7 @@ def check_csrf_token(request, header='X-CSRF-Token', raises=True): """ Check the CSRF token returned by the - :class:`pyramid.interfaces.ICSRFPolicy` implementation against the value in + :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the value in ``request.POST.get(token)`` (if a POST request) or ``request.headers.get(header)``. If a ``token`` keyword is not supplied to this function, the string ``csrf_token`` will be used to look up the token @@ -151,7 +151,7 @@ def check_csrf_token(request, If the value supplied by post or by header doesn't match the value supplied by ``policy.get_csrf_token()`` (where ``policy`` is an implementation of - :class:`pyramid.interfaces.ICSRFPolicy`), and ``raises`` is ``True``, this + :class:`pyramid.interfaces.ICSRFStoragePolicy`), and ``raises`` is ``True``, this function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ and ``raises`` is ``False``, this function will return ``False``. If the CSRF check is successful, this function will return @@ -184,7 +184,7 @@ def check_csrf_token(request, if supplied_token == "" and token is not None: supplied_token = request.POST.get(token, "") - policy = request.registry.getUtility(ICSRFPolicy) + policy = request.registry.getUtility(ICSRFStoragePolicy) if not policy.check_csrf_token(request, supplied_token): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') diff --git a/pyramid/interfaces.py b/pyramid/interfaces.py index f58ee8b58..aab5647a1 100644 --- a/pyramid/interfaces.py +++ b/pyramid/interfaces.py @@ -982,7 +982,7 @@ class ISession(IDict): """ -class ICSRFPolicy(Interface): +class ICSRFStoragePolicy(Interface): """ An object that offers the ability to verify CSRF tokens and generate new ones""" diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py index 1b3f3fc3b..8866f3601 100644 --- a/pyramid/tests/test_csrf.py +++ b/pyramid/tests/test_csrf.py @@ -117,13 +117,13 @@ class TestSessionCSRF(unittest.TestCase): def test_register_session_csrf_policy(self): from pyramid.csrf import SessionCSRF - from pyramid.interfaces import ICSRFPolicy + from pyramid.interfaces import ICSRFStoragePolicy config = Configurator() config.set_default_csrf_options(implementation=self._makeOne()) config.commit() - policy = config.registry.queryUtility(ICSRFPolicy) + policy = config.registry.queryUtility(ICSRFStoragePolicy) self.assertTrue(isinstance(policy, SessionCSRF)) @@ -163,13 +163,13 @@ class TestCookieCSRF(unittest.TestCase): def test_register_cookie_csrf_policy(self): from pyramid.csrf import CookieCSRF - from pyramid.interfaces import ICSRFPolicy + from pyramid.interfaces import ICSRFStoragePolicy config = Configurator() config.set_default_csrf_options(implementation=self._makeOne()) config.commit() - policy = config.registry.queryUtility(ICSRFPolicy) + policy = config.registry.queryUtility(ICSRFStoragePolicy) self.assertTrue(isinstance(policy, CookieCSRF)) -- cgit v1.2.3 From f6d63a41d37b0647c49e53bb54f009f7da4d5079 Mon Sep 17 00:00:00 2001 From: Matthew Wilkes Date: Fri, 9 Dec 2016 12:00:17 +0100 Subject: Fix a bug where people that didn't configure CSRF protection but did configure a session and set explicit checks would see an exception --- pyramid/csrf.py | 8 +++++++- pyramid/tests/test_csrf.py | 26 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/pyramid/csrf.py b/pyramid/csrf.py index b2788a764..f282eb569 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -184,7 +184,13 @@ def check_csrf_token(request, if supplied_token == "" and token is not None: supplied_token = request.POST.get(token, "") - policy = request.registry.getUtility(ICSRFStoragePolicy) + policy = request.registry.queryUtility(ICSRFStoragePolicy) + if policy is None: + # There is no policy set, but we are trying to validate a CSRF token + # This means explicit validation has been asked for without configuring + # the CSRF implementation. Fall back to SessionCSRF as that is the + # default + policy = SessionCSRF() if not policy.check_csrf_token(request, supplied_token): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py index 8866f3601..3994a31d4 100644 --- a/pyramid/tests/test_csrf.py +++ b/pyramid/tests/test_csrf.py @@ -313,6 +313,32 @@ class Test_check_csrf_token(unittest.TestCase): self.assertEqual(self._callFUT(request, token='csrf_token'), True) +class Test_check_csrf_token_without_defaults_configured(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from ..csrf import check_csrf_token + return check_csrf_token(*args, **kwargs) + + def test_success_token(self): + request = testing.DummyRequest() + request.method = "POST" + request.POST = {'csrf_token': request.session.get_csrf_token()} + self.assertEqual(self._callFUT(request, token='csrf_token'), True) + + def test_failure_raises(self): + from pyramid.exceptions import BadCSRFToken + request = testing.DummyRequest() + self.assertRaises(BadCSRFToken, self._callFUT, request, + 'csrf_token') + + def test_failure_no_raises(self): + request = testing.DummyRequest() + result = self._callFUT(request, 'csrf_token', raises=False) + self.assertEqual(result, False) + + class Test_check_csrf_origin(unittest.TestCase): def _callFUT(self, *args, **kwargs): from ..csrf import check_csrf_origin -- cgit v1.2.3 From 7c0f098641fda4207ea6fa50c58b289926038697 Mon Sep 17 00:00:00 2001 From: Matthew Wilkes Date: Wed, 12 Apr 2017 11:57:56 +0100 Subject: Use the webob CookieProfile in the Cookie implementation, rename some implemenations based on feedback, split CSRF implementation and option configuration and make the csrf token function exposed as a system default rather than a renderer event. --- docs/api/config.rst | 1 + docs/api/csrf.rst | 4 +- docs/narr/extconfig.rst | 1 + docs/narr/security.rst | 8 +-- pyramid/config/__init__.py | 1 + pyramid/config/security.py | 31 ++++++---- pyramid/csrf.py | 52 +++++++---------- pyramid/renderers.py | 4 ++ pyramid/tests/test_csrf.py | 126 +++++++--------------------------------- pyramid/tests/test_renderers.py | 8 +++ 10 files changed, 84 insertions(+), 152 deletions(-) diff --git a/docs/api/config.rst b/docs/api/config.rst index c76d3d5ff..a785b64ad 100644 --- a/docs/api/config.rst +++ b/docs/api/config.rst @@ -37,6 +37,7 @@ .. automethod:: set_authentication_policy .. automethod:: set_authorization_policy .. automethod:: set_default_csrf_options + .. automethod:: set_csrf_storage_policy .. automethod:: set_default_permission .. automethod:: add_permission diff --git a/docs/api/csrf.rst b/docs/api/csrf.rst index 89fb0c4b2..f890ee660 100644 --- a/docs/api/csrf.rst +++ b/docs/api/csrf.rst @@ -5,10 +5,10 @@ .. automodule:: pyramid.csrf - .. autoclass:: SessionCSRF + .. autoclass:: SessionCSRFStoragePolicy :members: - .. autoclass:: CookieCSRF + .. autoclass:: CookieCSRFStoragePolicy :members: .. autofunction:: get_csrf_token diff --git a/docs/narr/extconfig.rst b/docs/narr/extconfig.rst index 4009ec1dc..c20685cbf 100644 --- a/docs/narr/extconfig.rst +++ b/docs/narr/extconfig.rst @@ -263,6 +263,7 @@ Pre-defined Phases - :meth:`pyramid.config.Configurator.override_asset` - :meth:`pyramid.config.Configurator.set_authorization_policy` - :meth:`pyramid.config.Configurator.set_default_csrf_options` +- :meth:`pyramid.config.Configurator.set_csrf_storage_policy` - :meth:`pyramid.config.Configurator.set_default_permission` - :meth:`pyramid.config.Configurator.set_view_mapper` diff --git a/docs/narr/security.rst b/docs/narr/security.rst index 04c236e0b..e67f7b98c 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -780,15 +780,15 @@ and then requiring that it be present in all potentially unsafe requests. :app:`Pyramid` provides facilities to create and check CSRF tokens. By default :app:`Pyramid` comes with a session-based CSRF implementation -:class:`pyramid.csrf.SessionCSRF`. To use it, you must first enable +:class:`pyramid.csrf.SessionCSRFStoragePolicy`. To use it, you must first enable a :term:`session factory` as described in :ref:`using_the_default_session_factory` or :ref:`using_alternate_session_factories`. Alternatively, you can use -a cookie-based implementation :class:`pyramid.csrf.CookieCSRF` which gives +a cookie-based implementation :class:`pyramid.csrf.CookieCSRFStoragePolicy` which gives some additional flexibility as it does not require a session for each user. You can also define your own implementation of :class:`pyramid.interfaces.ICSRFStoragePolicy` and register it with the -:meth:`pyramid.config.Configurator.set_default_csrf_options` directive. +:meth:`pyramid.config.Configurator.set_csrf_storage_policy` directive. For example: @@ -797,7 +797,7 @@ For example: from pyramid.config import Configurator config = Configurator() - config.set_default_csrf_options(implementation=MyCustomCSRFPolicy()) + config.set_csrf_storage_policy(MyCustomCSRFPolicy()) .. index:: single: csrf.get_csrf_token diff --git a/pyramid/config/__init__.py b/pyramid/config/__init__.py index 6c661aa59..b05effbde 100644 --- a/pyramid/config/__init__.py +++ b/pyramid/config/__init__.py @@ -380,6 +380,7 @@ class Configurator( self.add_default_view_derivers() self.add_default_route_predicates() self.add_default_tweens() + self.add_default_security() if exceptionresponse_view is not None: exceptionresponse_view = self.maybe_dotted(exceptionresponse_view) diff --git a/pyramid/config/security.py b/pyramid/config/security.py index c8becce1f..0b565e322 100644 --- a/pyramid/config/security.py +++ b/pyramid/config/security.py @@ -10,15 +10,17 @@ from pyramid.interfaces import ( PHASE2_CONFIG, ) -from pyramid.csrf import csrf_token_template_global -from pyramid.csrf import SessionCSRF -from pyramid.events import BeforeRender +from pyramid.csrf import SessionCSRFStoragePolicy from pyramid.exceptions import ConfigurationError from pyramid.util import action_method from pyramid.util import as_sorted_tuple class SecurityConfiguratorMixin(object): + + def add_default_security(self): + self.set_csrf_storage_policy(SessionCSRFStoragePolicy()) + @action_method def set_authentication_policy(self, policy): """ Override the :app:`Pyramid` :term:`authentication policy` in the @@ -170,7 +172,6 @@ class SecurityConfiguratorMixin(object): @action_method def set_default_csrf_options( self, - implementation=None, require_csrf=True, token='csrf_token', header='X-CSRF-Token', @@ -180,10 +181,6 @@ class SecurityConfiguratorMixin(object): """ Set the default CSRF options used by subsequent view registrations. - ``implementation`` is a class that implements the - :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface that will be used for all - CSRF functionality. Default: :class:`pyramid.csrf.SessionCSRF`. - ``require_csrf`` controls whether CSRF checks will be automatically enabled on each view in the application. This value is used as the fallback when ``require_csrf`` is left at the default of ``None`` on @@ -217,10 +214,7 @@ class SecurityConfiguratorMixin(object): options = DefaultCSRFOptions( require_csrf, token, header, safe_methods, callback, ) - if implementation is None: - implementation = SessionCSRF() def register(): - self.registry.registerUtility(implementation, ICSRFStoragePolicy) self.registry.registerUtility(options, IDefaultCSRFOptions) intr = self.introspectable('default csrf view options', None, @@ -232,10 +226,23 @@ class SecurityConfiguratorMixin(object): intr['safe_methods'] = as_sorted_tuple(safe_methods) intr['callback'] = callback - self.add_subscriber(csrf_token_template_global, [BeforeRender]) self.action(IDefaultCSRFOptions, register, order=PHASE1_CONFIG, introspectables=(intr,)) + @action_method + def set_csrf_storage_policy(self, policy): + """ + Set the CSRF storage policy used by subsequent view registrations. + + ``policy`` is a class that implements the + :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface that will be used for all + CSRF functionality. + """ + def register(): + self.registry.registerUtility(policy, ICSRFStoragePolicy) + + self.action(ICSRFStoragePolicy, register, order=PHASE1_CONFIG) + @implementer(IDefaultCSRFOptions) class DefaultCSRFOptions(object): diff --git a/pyramid/csrf.py b/pyramid/csrf.py index f282eb569..4c5a73940 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -1,8 +1,11 @@ -from functools import partial import uuid +from webob.cookies import CookieProfile from zope.interface import implementer + +from pyramid.authentication import _SimpleSerializer + from pyramid.compat import ( urlparse, bytes_ @@ -20,7 +23,7 @@ from pyramid.util import ( @implementer(ICSRFStoragePolicy) -class SessionCSRF(object): +class SessionCSRFStoragePolicy(object): """ The default CSRF implementation, which mimics the behavior from older versions of Pyramid. The ``new_csrf_token`` and ``get_csrf_token`` methods are indirected to the underlying session implementation. @@ -49,7 +52,7 @@ class SessionCSRF(object): ) @implementer(ICSRFStoragePolicy) -class CookieCSRF(object): +class CookieCSRFStoragePolicy(object): """ An alternative CSRF implementation that stores its information in unauthenticated cookies, known as the 'Double Submit Cookie' method in the OWASP CSRF guidelines. This gives some additional flexibility with regards @@ -60,25 +63,25 @@ class CookieCSRF(object): """ def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, - domain=None, path='/'): - self.cookie_name = cookie_name - self.secure = secure - self.httponly = httponly + domain=None, max_age=None, path='/'): + serializer = _SimpleSerializer() + self.cookie_profile = CookieProfile( + cookie_name=cookie_name, + secure=secure, + max_age=max_age, + httponly=httponly, + path=path, + serializer=serializer + ) self.domain = domain - self.path = path def new_csrf_token(self, request): """ Sets a new CSRF token into the request and returns it. """ token = uuid.uuid4().hex def set_cookie(request, response): - response.set_cookie( - self.cookie_name, + self.cookie_profile.set_cookies( + response, token, - httponly=self.httponly, - secure=self.secure, - domain=self.domain, - path=self.path, - overwrite=True, ) request.add_response_callback(set_cookie) return token @@ -86,7 +89,8 @@ class CookieCSRF(object): def get_csrf_token(self, request): """ Returns the currently active CSRF token by checking the cookies sent with the current request.""" - token = request.cookies.get(self.cookie_name) + bound_cookies = self.cookie_profile.bind(request) + token = bound_cookies.get_value() if not token: token = self.new_csrf_token(request) return token @@ -100,18 +104,6 @@ class CookieCSRF(object): bytes_(supplied_token, 'ascii'), ) - -def csrf_token_template_global(event): - request = event.get('request', None) - try: - registry = request.registry - except AttributeError: - return - else: - csrf = registry.getUtility(ICSRFStoragePolicy) - event['get_csrf_token'] = partial(csrf.get_csrf_token, request) - - def get_csrf_token(request): """ Get the currently active CSRF token for the request passed, generating a new one using ``new_csrf_token(request)`` if one does not exist. This @@ -188,9 +180,9 @@ def check_csrf_token(request, if policy is None: # There is no policy set, but we are trying to validate a CSRF token # This means explicit validation has been asked for without configuring - # the CSRF implementation. Fall back to SessionCSRF as that is the + # the CSRF implementation. Fall back to SessionCSRFStoragePolicy as that is the # default - policy = SessionCSRF() + policy = SessionCSRFStoragePolicy() if not policy.check_csrf_token(request, supplied_token): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') diff --git a/pyramid/renderers.py b/pyramid/renderers.py index 7d667ba7b..6019f50fb 100644 --- a/pyramid/renderers.py +++ b/pyramid/renderers.py @@ -1,3 +1,4 @@ +from functools import partial import json import os import re @@ -19,6 +20,7 @@ from pyramid.compat import ( text_type, ) +from pyramid.csrf import get_csrf_token from pyramid.decorator import reify from pyramid.events import BeforeRender @@ -428,6 +430,7 @@ class RendererHelper(object): 'context':context, 'request':request, 'req':request, + 'get_csrf_token':partial(get_csrf_token, request), } return self.render_to_response(response, system, request=request) @@ -441,6 +444,7 @@ class RendererHelper(object): 'context':getattr(request, 'context', None), 'request':request, 'req':request, + 'get_csrf_token':partial(get_csrf_token, request), } system_values = BeforeRender(system_values, value) diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py index 3994a31d4..e6ae05eec 100644 --- a/pyramid/tests/test_csrf.py +++ b/pyramid/tests/test_csrf.py @@ -22,7 +22,7 @@ class Test_get_csrf_token(unittest.TestCase): self._callFUT(request) def test_success(self): - self.config.set_default_csrf_options(implementation=DummyCSRF()) + self.config.set_csrf_storage_policy(DummyCSRF()) request = testing.DummyRequest() csrf_token = self._callFUT(request) @@ -45,7 +45,7 @@ class Test_new_csrf_token(unittest.TestCase): self._callFUT(request) def test_success(self): - self.config.set_default_csrf_options(implementation=DummyCSRF()) + self.config.set_csrf_storage_policy(DummyCSRF()) request = testing.DummyRequest() csrf_token = self._callFUT(request) @@ -53,57 +53,7 @@ class Test_new_csrf_token(unittest.TestCase): self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b') -class Test_csrf_token_template_global(unittest.TestCase): - def setUp(self): - self.config = testing.setUp() - - def _callFUT(self, *args, **kwargs): - from pyramid.csrf import csrf_token_template_global - return csrf_token_template_global(*args, **kwargs) - - def test_event_is_missing_request(self): - event = BeforeRender({}, {}) - - self._callFUT(event) - - self.assertNotIn('get_csrf_token', event) - - def test_request_is_missing_registry(self): - request = DummyRequest(registry=None) - del request.registry - del request.__class__.registry - event = BeforeRender({'request': request}, {}) - - self._callFUT(event) - - self.assertNotIn('get_csrf_token', event) - - def test_csrf_utility_not_registered(self): - request = testing.DummyRequest() - event = BeforeRender({'request': request}, {}) - - with self.assertRaises(ComponentLookupError): - self._callFUT(event) - - def test_csrf_token_passed_to_template(self): - config = Configurator() - config.set_default_csrf_options(implementation=DummyCSRF()) - config.commit() - - request = testing.DummyRequest() - request.registry = config.registry - - before = BeforeRender({'request': request}, {}) - config.registry.notify(before) - - self.assertIn('get_csrf_token', before) - self.assertEqual( - before['get_csrf_token'](), - '02821185e4c94269bdc38e6eeae0a2f8' - ) - - -class TestSessionCSRF(unittest.TestCase): +class TestSessionCSRFStoragePolicy(unittest.TestCase): class MockSession(object): def new_csrf_token(self): return 'e5e9e30a08b34ff9842ff7d2b958c14b' @@ -112,20 +62,20 @@ class TestSessionCSRF(unittest.TestCase): return '02821185e4c94269bdc38e6eeae0a2f8' def _makeOne(self): - from pyramid.csrf import SessionCSRF - return SessionCSRF() + from pyramid.csrf import SessionCSRFStoragePolicy + return SessionCSRFStoragePolicy() def test_register_session_csrf_policy(self): - from pyramid.csrf import SessionCSRF + from pyramid.csrf import SessionCSRFStoragePolicy from pyramid.interfaces import ICSRFStoragePolicy config = Configurator() - config.set_default_csrf_options(implementation=self._makeOne()) + config.set_csrf_storage_policy(self._makeOne()) config.commit() policy = config.registry.queryUtility(ICSRFStoragePolicy) - self.assertTrue(isinstance(policy, SessionCSRF)) + self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy)) def test_session_csrf_implementation_delegates_to_session(self): policy = self._makeOne() @@ -156,22 +106,22 @@ class TestSessionCSRF(unittest.TestCase): self.assertTrue(result) -class TestCookieCSRF(unittest.TestCase): +class TestCookieCSRFStoragePolicy(unittest.TestCase): def _makeOne(self): - from pyramid.csrf import CookieCSRF - return CookieCSRF() + from pyramid.csrf import CookieCSRFStoragePolicy + return CookieCSRFStoragePolicy() def test_register_cookie_csrf_policy(self): - from pyramid.csrf import CookieCSRF + from pyramid.csrf import CookieCSRFStoragePolicy from pyramid.interfaces import ICSRFStoragePolicy config = Configurator() - config.set_default_csrf_options(implementation=self._makeOne()) + config.set_csrf_storage_policy(self._makeOne()) config.commit() policy = config.registry.queryUtility(ICSRFStoragePolicy) - self.assertTrue(isinstance(policy, CookieCSRF)) + self.assertTrue(isinstance(policy, CookieCSRFStoragePolicy)) def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self): response = MockResponse() @@ -179,20 +129,9 @@ class TestCookieCSRF(unittest.TestCase): policy = self._makeOne() token = policy.get_csrf_token(request) - self.assertEqual( - response.called_args, - ('csrf_token', token), - ) - self.assertEqual( - response.called_kwargs, - { - 'secure': False, - 'httponly': False, - 'domain': None, - 'path': '/', - 'overwrite': True - } + response.headerlist, + [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] ) def test_existing_cookie_csrf_does_not_set_cookie(self): @@ -208,12 +147,8 @@ class TestCookieCSRF(unittest.TestCase): 'e6f325fee5974f3da4315a8ccf4513d2' ) self.assertEqual( - response.called_args, - (), - ) - self.assertEqual( - response.called_kwargs, - {} + response.headerlist, + [], ) def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self): @@ -223,20 +158,9 @@ class TestCookieCSRF(unittest.TestCase): policy = self._makeOne() token = policy.new_csrf_token(request) - - self.assertEqual( - response.called_args, - ('csrf_token', token), - ) self.assertEqual( - response.called_kwargs, - { - 'secure': False, - 'httponly': False, - 'domain': None, - 'path': '/', - 'overwrite': True - } + response.headerlist, + [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] ) def test_verifying_token_invalid_token(self): @@ -264,7 +188,7 @@ class Test_check_csrf_token(unittest.TestCase): def setUp(self): self.config = testing.setUp() - # set up CSRF (this will also register SessionCSRF policy) + # set up CSRF (this will also register SessionCSRFStoragePolicy policy) self.config.set_default_csrf_options(require_csrf=False) def _callFUT(self, *args, **kwargs): @@ -446,13 +370,7 @@ class DummyRequest(object): class MockResponse(object): def __init__(self): - self.called_args = () - self.called_kwargs = {} - - def set_cookie(self, *args, **kwargs): - self.called_args = args - self.called_kwargs = kwargs - return + self.headerlist = [] class DummyCSRF(object): diff --git a/pyramid/tests/test_renderers.py b/pyramid/tests/test_renderers.py index 65bfa5582..86d8b582a 100644 --- a/pyramid/tests/test_renderers.py +++ b/pyramid/tests/test_renderers.py @@ -203,6 +203,7 @@ class TestRendererHelper(unittest.TestCase): self.assertEqual(helper.get_renderer(), factory.respond) def test_render_view(self): + import pyramid.csrf self._registerRendererFactory() self._registerResponseFactory() request = Dummy() @@ -212,6 +213,9 @@ class TestRendererHelper(unittest.TestCase): request = testing.DummyRequest() response = 'response' response = helper.render_view(request, response, view, context) + get_csrf = response.app_iter[1].pop('get_csrf_token') + self.assertEqual(get_csrf.args, (request, )) + self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token) self.assertEqual(response.app_iter[0], 'response') self.assertEqual(response.app_iter[1], {'renderer_info': helper, @@ -242,12 +246,16 @@ class TestRendererHelper(unittest.TestCase): self.assertEqual(reg.event.__class__.__name__, 'BeforeRender') def test_render_system_values_is_None(self): + import pyramid.csrf self._registerRendererFactory() request = Dummy() context = Dummy() request.context = context helper = self._makeOne('loo.foo') result = helper.render('values', None, request=request) + get_csrf = result[1].pop('get_csrf_token') + self.assertEqual(get_csrf.args, (request, )) + self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token) system = {'request':request, 'context':context, 'renderer_name':'loo.foo', -- cgit v1.2.3 From 2ded2fc216b4caaf0d97813413943e0838b6eaaa Mon Sep 17 00:00:00 2001 From: Matthew Wilkes Date: Wed, 26 Apr 2017 15:41:47 +0100 Subject: Apply drafting changes to documentation. --- CHANGES.txt | 2 +- docs/glossary.rst | 5 +++++ docs/narr/security.rst | 4 ++++ docs/narr/sessions.rst | 4 ---- pyramid/config/security.py | 2 +- pyramid/config/views.py | 6 ++++++ pyramid/csrf.py | 43 ++++++++++++++++++++++--------------------- pyramid/interfaces.py | 4 ++-- 8 files changed, 41 insertions(+), 29 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9d6264688..762550053 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -25,7 +25,7 @@ Features appropriately. See https://github.com/Pylons/pyramid/pull/2989 - A new CSRF implementation, :class:`pyramid.csrf.SessionCSRF` has been added, - which deleagates all CSRF generation to the current session, following the + which delegates all CSRF generation to the current session, following the old API for this. A ``get_csrf_token()`` method is now available in template global scope, to make it easy for template developers to get the current CSRF token without adding it to Python code. diff --git a/docs/glossary.rst b/docs/glossary.rst index 0a46fac3b..0cf96f488 100644 --- a/docs/glossary.rst +++ b/docs/glossary.rst @@ -891,6 +891,11 @@ Glossary :meth:`pyramid.config.Configurator.set_session_factory` for more information. + CSRF storage policy + A utility that implements :class:`pyramid.interfaces.ICSRFStoragePolicy` + which is responsible for allocating CSRF tokens to a user and verifying + that a provided token is acceptable. + Mako `Mako `_ is a template language which refines the familiar ideas of componentized layout and inheritance diff --git a/docs/narr/security.rst b/docs/narr/security.rst index e67f7b98c..86e5c1ef4 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -766,6 +766,10 @@ a secret across two different subsystems might drop the security of signing to zero. Keys should not be re-used across different contexts where an attacker has the possibility of providing a chosen plaintext. +.. index:: + single: preventing cross-site request forgery attacks + single: cross-site request forgery attacks, prevention + Preventing Cross-Site Request Forgery Attacks --------------------------------------------- diff --git a/docs/narr/sessions.rst b/docs/narr/sessions.rst index 86fe2a139..7e2469d54 100644 --- a/docs/narr/sessions.rst +++ b/docs/narr/sessions.rst @@ -315,7 +315,3 @@ flash storage. ['info message'] >>> request.session.peek_flash() [] - -.. index:: - single: preventing cross-site request forgery attacks - single: cross-site request forgery attacks, prevention diff --git a/pyramid/config/security.py b/pyramid/config/security.py index 0b565e322..6f5b36d3a 100644 --- a/pyramid/config/security.py +++ b/pyramid/config/security.py @@ -232,7 +232,7 @@ class SecurityConfiguratorMixin(object): @action_method def set_csrf_storage_policy(self, policy): """ - Set the CSRF storage policy used by subsequent view registrations. + Set the :term:`CSRF storage policy` used by subsequent view registrations. ``policy`` is a class that implements the :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface that will be used for all diff --git a/pyramid/config/views.py b/pyramid/config/views.py index e037f7706..2fc243fac 100644 --- a/pyramid/config/views.py +++ b/pyramid/config/views.py @@ -651,6 +651,12 @@ class ViewsConfiguratorMixin(object): .. versionadded:: 1.4a2 + .. versionchanged:: 1.9 + This feature requires either a :term:`session factory` to have been + configured, or a :term:`CSRF storage policy` other than the default + to be in use. + + physical_path If specified, this value should be a string or a tuple representing diff --git a/pyramid/csrf.py b/pyramid/csrf.py index 4c5a73940..ffc7b5fe3 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -31,7 +31,7 @@ class SessionCSRFStoragePolicy(object): Note that using this CSRF implementation requires that a :term:`session factory` is configured. - .. versionadded :: 1.8a1 + .. versionadded :: 1.9 """ def new_csrf_token(self, request): """ Sets a new CSRF token into the session and returns it. """ @@ -43,8 +43,8 @@ class SessionCSRFStoragePolicy(object): return request.session.get_csrf_token() def check_csrf_token(self, request, supplied_token): - """ Returns True if supplied_token is the same value as get_csrf_token - returns for this request. """ + """ Returns ``True`` if ``supplied_token is`` the same value as + ``get_csrf_token(request)``.""" expected = self.get_csrf_token(request) return not strings_differ( bytes_(expected, 'ascii'), @@ -55,11 +55,11 @@ class SessionCSRFStoragePolicy(object): class CookieCSRFStoragePolicy(object): """ An alternative CSRF implementation that stores its information in unauthenticated cookies, known as the 'Double Submit Cookie' method in the - OWASP CSRF guidelines. This gives some additional flexibility with regards - to scaling as the tokens can be generated and verified by a front-end - server. + `OWASP CSRF guidelines `_. + This gives some additional flexibility with regards to scaling as the tokens + can be generated and verified by a front-end server. - .. versionadded :: 1.8a1 + .. versionadded :: 1.9 """ def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, @@ -96,8 +96,8 @@ class CookieCSRFStoragePolicy(object): return token def check_csrf_token(self, request, supplied_token): - """ Returns True if supplied_token is the same value as get_csrf_token - returns for this request. """ + """ Returns ``True`` if ``supplied_token is`` the same value as + ``get_csrf_token(request)``.""" expected = self.get_csrf_token(request) return not strings_differ( bytes_(expected, 'ascii'), @@ -109,7 +109,7 @@ def get_csrf_token(request): a new one using ``new_csrf_token(request)`` if one does not exist. This calls the equivalent method in the chosen CSRF protection implementation. - .. versionadded :: 1.8a1 + .. versionadded :: 1.9 """ registry = request.registry csrf = registry.getUtility(ICSRFStoragePolicy) @@ -121,7 +121,7 @@ def new_csrf_token(request): implementation defined manner. This calls the equivalent method in the chosen CSRF protection implementation. - .. versionadded :: 1.8a1 + .. versionadded :: 1.9 """ registry = request.registry csrf = registry.getUtility(ICSRFStoragePolicy) @@ -159,8 +159,8 @@ def check_csrf_token(request, considered valid. It must be passed in either the request body or a header. - .. versionchanged:: 1.8a1 - Moved from pyramid.session to pyramid.csrf + .. versionchanged:: 1.9 + Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` """ supplied_token = "" # We first check the headers for a csrf token, as that is significantly @@ -192,27 +192,28 @@ def check_csrf_token(request, def check_csrf_origin(request, trusted_origins=None, raises=True): """ - Check the Origin of the request to see if it is a cross site request or + Check the ``Origin`` of the request to see if it is a cross site request or not. - If the value supplied by the Origin or Referer header isn't one of the + If the value supplied by the ``Origin`` or ``Referer`` header isn't one of the trusted origins and ``raises`` is ``True``, this function will raise a - :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is - ``False`` this function will return ``False`` instead. If the CSRF origin + :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if ``raises`` is + ``False``, this function will return ``False`` instead. If the CSRF origin checks are successful this function will return ``True`` unconditionally. Additional trusted origins may be added by passing a list of domain (and - ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in + ports if nonstandard like ``['example.com', 'dev.example.com:8080']``) in with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` (the default) this list of additional domains will be pulled from the ``pyramid.csrf_trusted_origins`` setting. - Note that this function will do nothing if request.scheme is not https. + Note that this function will do nothing if ``request.scheme`` is not + ``https``. .. versionadded:: 1.7 - .. versionchanged:: 1.8a1 - Moved from pyramid.session to pyramid.csrf + .. versionchanged:: 1.9 + Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` """ def _fail(reason): if raises: diff --git a/pyramid/interfaces.py b/pyramid/interfaces.py index aab5647a1..c3b6b164d 100644 --- a/pyramid/interfaces.py +++ b/pyramid/interfaces.py @@ -999,8 +999,8 @@ class ICSRFStoragePolicy(Interface): """ def check_csrf_token(request, supplied_token): - """ Returns a boolean that represents if supplied_token is a valid CSRF - token for this request. Comparing strings for equality must be done + """ Returns a boolean that represents if ``supplied_token`` is a valid + CSRF token for this request. Comparing strings for equality must be done using :func:`pyramid.utils.strings_differ` to avoid timing attacks. """ -- cgit v1.2.3 From 4b3603ad2f4850605c45e1b7bf4f077584303641 Mon Sep 17 00:00:00 2001 From: Matthew Wilkes Date: Wed, 26 Apr 2017 15:43:18 +0100 Subject: Move CSRF storage policy registration out of PHASE_1 config and simplify tests given previous improvements to CSRF. --- docs/narr/extconfig.rst | 1 - pyramid/config/security.py | 2 +- pyramid/csrf.py | 6 ------ pyramid/testing.py | 1 + pyramid/tests/test_csrf.py | 14 +++++--------- 5 files changed, 7 insertions(+), 17 deletions(-) diff --git a/docs/narr/extconfig.rst b/docs/narr/extconfig.rst index c20685cbf..4009ec1dc 100644 --- a/docs/narr/extconfig.rst +++ b/docs/narr/extconfig.rst @@ -263,7 +263,6 @@ Pre-defined Phases - :meth:`pyramid.config.Configurator.override_asset` - :meth:`pyramid.config.Configurator.set_authorization_policy` - :meth:`pyramid.config.Configurator.set_default_csrf_options` -- :meth:`pyramid.config.Configurator.set_csrf_storage_policy` - :meth:`pyramid.config.Configurator.set_default_permission` - :meth:`pyramid.config.Configurator.set_view_mapper` diff --git a/pyramid/config/security.py b/pyramid/config/security.py index 6f5b36d3a..9d59ca78e 100644 --- a/pyramid/config/security.py +++ b/pyramid/config/security.py @@ -241,7 +241,7 @@ class SecurityConfiguratorMixin(object): def register(): self.registry.registerUtility(policy, ICSRFStoragePolicy) - self.action(ICSRFStoragePolicy, register, order=PHASE1_CONFIG) + self.action(ICSRFStoragePolicy, register) @implementer(IDefaultCSRFOptions) diff --git a/pyramid/csrf.py b/pyramid/csrf.py index ffc7b5fe3..5d183bb57 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -177,12 +177,6 @@ def check_csrf_token(request, supplied_token = request.POST.get(token, "") policy = request.registry.queryUtility(ICSRFStoragePolicy) - if policy is None: - # There is no policy set, but we are trying to validate a CSRF token - # This means explicit validation has been asked for without configuring - # the CSRF implementation. Fall back to SessionCSRFStoragePolicy as that is the - # default - policy = SessionCSRFStoragePolicy() if not policy.check_csrf_token(request, supplied_token): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') diff --git a/pyramid/testing.py b/pyramid/testing.py index 877b351db..69b30e83f 100644 --- a/pyramid/testing.py +++ b/pyramid/testing.py @@ -479,6 +479,7 @@ def setUp(registry=None, request=None, hook_zca=True, autocommit=True, config.add_default_view_derivers() config.add_default_route_predicates() config.add_default_tweens() + config.add_default_security() config.commit() global have_zca try: diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py index e6ae05eec..fcb6333ee 100644 --- a/pyramid/tests/test_csrf.py +++ b/pyramid/tests/test_csrf.py @@ -15,11 +15,9 @@ class Test_get_csrf_token(unittest.TestCase): from pyramid.csrf import get_csrf_token return get_csrf_token(*args, **kwargs) - def test_no_csrf_utility_registered(self): + def test_no_override_csrf_utility_registered(self): request = testing.DummyRequest() - - with self.assertRaises(ComponentLookupError): - self._callFUT(request) + self._callFUT(request) def test_success(self): self.config.set_csrf_storage_policy(DummyCSRF()) @@ -38,11 +36,9 @@ class Test_new_csrf_token(unittest.TestCase): from pyramid.csrf import new_csrf_token return new_csrf_token(*args, **kwargs) - def test_no_csrf_utility_registered(self): + def test_no_override_csrf_utility_registered(self): request = testing.DummyRequest() - - with self.assertRaises(ComponentLookupError): - self._callFUT(request) + self._callFUT(request) def test_success(self): self.config.set_csrf_storage_policy(DummyCSRF()) @@ -188,7 +184,7 @@ class Test_check_csrf_token(unittest.TestCase): def setUp(self): self.config = testing.setUp() - # set up CSRF (this will also register SessionCSRFStoragePolicy policy) + # set up CSRF self.config.set_default_csrf_options(require_csrf=False) def _callFUT(self, *args, **kwargs): -- cgit v1.2.3 From 682a9b9df6f42f8261daa077f04b47b65bf00c34 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Sat, 29 Apr 2017 01:43:38 -0500 Subject: final cleanup of csrf decoupling in #2854 - Renamed `SessionCSRFStoragePolicy` to `LegacySessionCSRFStoragePolicy` for the version that uses the legacy `ISession.get_csrf_token` and `ISession.new_csrf_token` apis and set that as the default. - Added new `SessionCSRFStoragePolicy` that stores data in the session similar to how the `SessionAuthenticationPolicy` works. - `CookieCSRFStoragePolicy` did not properly return the newly generated token from `get_csrf_token` after calling `new_csrf_token`. It needed to cache the new value since the response callback does not affect the current request. - `CookieCSRFStoragePolicy` was not forwarding the `domain` value to the `CookieProfile` causing that setting to be ignored. - Removed `check_csrf_token` from the `ICSRFStoragePolicy` interface to simplify implementations of storage policies. - Added an introspectable item for the configured storage policy so that it appears on the debugtoolbar. - Added a change note on `ISession` that it no longer required the csrf methods. - Leave deprecated shims in ``pyramid.session`` for ``check_csrf_origin`` and ``check_csrf_token``. --- CHANGES.txt | 13 +++--- docs/api/csrf.rst | 3 ++ docs/narr/security.rst | 1 + docs/narr/templates.rst | 4 ++ pyramid/config/security.py | 20 ++++++--- pyramid/csrf.py | 109 +++++++++++++++++++++++++++++---------------- pyramid/interfaces.py | 30 ++++++++----- pyramid/session.py | 14 ++++++ pyramid/tests/test_csrf.py | 108 ++++++++++++++++++++++---------------------- pyramid/tests/test_util.py | 6 ++- 10 files changed, 190 insertions(+), 118 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 762550053..7d70abbb8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -24,12 +24,13 @@ Features can be alleviated by invoking ``config.begin()`` and ``config.end()`` appropriately. See https://github.com/Pylons/pyramid/pull/2989 -- A new CSRF implementation, :class:`pyramid.csrf.SessionCSRF` has been added, - which delegates all CSRF generation to the current session, following the - old API for this. A ``get_csrf_token()`` method is now available in template - global scope, to make it easy for template developers to get the current CSRF - token without adding it to Python code. - See https://github.com/Pylons/pyramid/pull/2854 +- A new CSRF implementation, ``pyramid.csrf.SessionCSRFStoragePolicy``, + has been added which delegates all CSRF generation to the current session, + following the old API for this. A ``pyramid.csrf.get_csrf_token()`` api is now + available in template global scope, to make it easy for template developers + to get the current CSRF token without adding it to Python code. + See https://github.com/Pylons/pyramid/pull/2854 and + https://github.com/Pylons/pyramid/pull/3019 Bug Fixes diff --git a/docs/api/csrf.rst b/docs/api/csrf.rst index f890ee660..38501546e 100644 --- a/docs/api/csrf.rst +++ b/docs/api/csrf.rst @@ -5,6 +5,9 @@ .. automodule:: pyramid.csrf + .. autoclass:: LegacySessionCSRFStoragePolicy + :members: + .. autoclass:: SessionCSRFStoragePolicy :members: diff --git a/docs/narr/security.rst b/docs/narr/security.rst index 86e5c1ef4..ddf496b69 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -824,6 +824,7 @@ If no CSRF token previously existed for this user, then a new token will be set into the session and returned. The newly created token will be opaque and randomized. +.. _get_csrf_token_in_templates: Using the ``get_csrf_token`` global in templates ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/narr/templates.rst b/docs/narr/templates.rst index 6b3b5fcce..4eadbd2f0 100644 --- a/docs/narr/templates.rst +++ b/docs/narr/templates.rst @@ -228,6 +228,10 @@ These values are provided to the template: provided if the template is rendered as the result of a ``renderer=`` argument to the view configuration being used. +``get_csrf_token()`` + A convenience function to access the current CSRF token. See + :ref:`get_csrf_token_in_templates` for more information. + ``renderer_name`` The renderer name used to perform the rendering, e.g., ``mypackage:templates/foo.pt``. diff --git a/pyramid/config/security.py b/pyramid/config/security.py index 9d59ca78e..8e4c908d3 100644 --- a/pyramid/config/security.py +++ b/pyramid/config/security.py @@ -10,7 +10,7 @@ from pyramid.interfaces import ( PHASE2_CONFIG, ) -from pyramid.csrf import SessionCSRFStoragePolicy +from pyramid.csrf import LegacySessionCSRFStoragePolicy from pyramid.exceptions import ConfigurationError from pyramid.util import action_method from pyramid.util import as_sorted_tuple @@ -19,7 +19,7 @@ from pyramid.util import as_sorted_tuple class SecurityConfiguratorMixin(object): def add_default_security(self): - self.set_csrf_storage_policy(SessionCSRFStoragePolicy()) + self.set_csrf_storage_policy(LegacySessionCSRFStoragePolicy()) @action_method def set_authentication_policy(self, policy): @@ -232,16 +232,22 @@ class SecurityConfiguratorMixin(object): @action_method def set_csrf_storage_policy(self, policy): """ - Set the :term:`CSRF storage policy` used by subsequent view registrations. + Set the :term:`CSRF storage policy` used by subsequent view + registrations. ``policy`` is a class that implements the - :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface that will be used for all - CSRF functionality. + :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface and defines + how to generate and persist CSRF tokens. + """ def register(): self.registry.registerUtility(policy, ICSRFStoragePolicy) - - self.action(ICSRFStoragePolicy, register) + intr = self.introspectable('csrf storage policy', + None, + policy, + 'csrf storage policy') + intr['policy'] = policy + self.action(ICSRFStoragePolicy, register, introspectables=(intr,)) @implementer(IDefaultCSRFOptions) diff --git a/pyramid/csrf.py b/pyramid/csrf.py index 5d183bb57..1910e4ec8 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -7,8 +7,9 @@ from zope.interface import implementer from pyramid.authentication import _SimpleSerializer from pyramid.compat import ( + bytes_, urlparse, - bytes_ + text_, ) from pyramid.exceptions import ( BadCSRFOrigin, @@ -23,44 +24,79 @@ from pyramid.util import ( @implementer(ICSRFStoragePolicy) -class SessionCSRFStoragePolicy(object): - """ The default CSRF implementation, which mimics the behavior from older - versions of Pyramid. The ``new_csrf_token`` and ``get_csrf_token`` methods - are indirected to the underlying session implementation. +class LegacySessionCSRFStoragePolicy(object): + """ A CSRF storage policy that defers control of CSRF storage to the + session. + + This policy maintains compatibility with legacy ISession implementations + that know how to manage CSRF tokens themselves via + ``ISession.new_csrf_token`` and ``ISession.get_csrf_token``. Note that using this CSRF implementation requires that a :term:`session factory` is configured. - .. versionadded :: 1.9 + .. versionadded:: 1.9 + """ def new_csrf_token(self, request): """ Sets a new CSRF token into the session and returns it. """ return request.session.new_csrf_token() def get_csrf_token(self, request): - """ Returns the currently active CSRF token from the session, generating - a new one if needed.""" + """ Returns the currently active CSRF token from the session, + generating a new one if needed.""" return request.session.get_csrf_token() - def check_csrf_token(self, request, supplied_token): - """ Returns ``True`` if ``supplied_token is`` the same value as - ``get_csrf_token(request)``.""" - expected = self.get_csrf_token(request) - return not strings_differ( - bytes_(expected, 'ascii'), - bytes_(supplied_token, 'ascii'), - ) + +@implementer(ICSRFStoragePolicy) +class SessionCSRFStoragePolicy(object): + """ A CSRF storage policy that persists the CSRF token in the session. + + Note that using this CSRF implementation requires that + a :term:`session factory` is configured. + + ``key`` + + The session key where the CSRF token will be stored. + Default: `_csrft_`. + + .. versionadded:: 1.9 + + """ + _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) + + def __init__(self, key='_csrft_'): + self.key = key + + def new_csrf_token(self, request): + """ Sets a new CSRF token into the session and returns it. """ + token = self._token_factory() + request.session[self.key] = token + return token + + def get_csrf_token(self, request): + """ Returns the currently active CSRF token from the session, + generating a new one if needed.""" + token = request.session.get(self.key, None) + if not token: + token = self.new_csrf_token(request) + return token + @implementer(ICSRFStoragePolicy) class CookieCSRFStoragePolicy(object): """ An alternative CSRF implementation that stores its information in unauthenticated cookies, known as the 'Double Submit Cookie' method in the - `OWASP CSRF guidelines `_. - This gives some additional flexibility with regards to scaling as the tokens - can be generated and verified by a front-end server. + `OWASP CSRF guidelines `_. This gives some additional flexibility with + regards to scaling as the tokens can be generated and verified by a + front-end server. + + .. versionadded:: 1.9 - .. versionadded :: 1.9 """ + _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, domain=None, max_age=None, path='/'): @@ -71,13 +107,15 @@ class CookieCSRFStoragePolicy(object): max_age=max_age, httponly=httponly, path=path, + domains=[domain], serializer=serializer ) - self.domain = domain + self.cookie_name = cookie_name def new_csrf_token(self, request): """ Sets a new CSRF token into the request and returns it. """ - token = uuid.uuid4().hex + token = self._token_factory() + request.cookies[self.cookie_name] = token def set_cookie(request, response): self.cookie_profile.set_cookies( response, @@ -95,14 +133,6 @@ class CookieCSRFStoragePolicy(object): token = self.new_csrf_token(request) return token - def check_csrf_token(self, request, supplied_token): - """ Returns ``True`` if ``supplied_token is`` the same value as - ``get_csrf_token(request)``.""" - expected = self.get_csrf_token(request) - return not strings_differ( - bytes_(expected, 'ascii'), - bytes_(supplied_token, 'ascii'), - ) def get_csrf_token(request): """ Get the currently active CSRF token for the request passed, generating @@ -133,8 +163,8 @@ def check_csrf_token(request, header='X-CSRF-Token', raises=True): """ Check the CSRF token returned by the - :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the value in - ``request.POST.get(token)`` (if a POST request) or + :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the + value in ``request.POST.get(token)`` (if a POST request) or ``request.headers.get(header)``. If a ``token`` keyword is not supplied to this function, the string ``csrf_token`` will be used to look up the token in ``request.POST``. If a ``header`` keyword is not supplied to this @@ -143,11 +173,12 @@ def check_csrf_token(request, If the value supplied by post or by header doesn't match the value supplied by ``policy.get_csrf_token()`` (where ``policy`` is an implementation of - :class:`pyramid.interfaces.ICSRFStoragePolicy`), and ``raises`` is ``True``, this - function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If - the values differ and ``raises`` is ``False``, this function will return - ``False``. If the CSRF check is successful, this function will return - ``True`` unconditionally. + :class:`pyramid.interfaces.ICSRFStoragePolicy`), and ``raises`` is + ``True``, this function will raise an + :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ + and ``raises`` is ``False``, this function will return ``False``. If the + CSRF check is successful, this function will return ``True`` + unconditionally. See :ref:`auto_csrf_checking` for information about how to secure your application automatically against CSRF attacks. @@ -176,8 +207,8 @@ def check_csrf_token(request, if supplied_token == "" and token is not None: supplied_token = request.POST.get(token, "") - policy = request.registry.queryUtility(ICSRFStoragePolicy) - if not policy.check_csrf_token(request, supplied_token): + expected_token = get_csrf_token(request) + if strings_differ(bytes_(expected_token), bytes_(supplied_token)): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') return False diff --git a/pyramid/interfaces.py b/pyramid/interfaces.py index c3b6b164d..853e8fcdd 100644 --- a/pyramid/interfaces.py +++ b/pyramid/interfaces.py @@ -927,6 +927,13 @@ class ISession(IDict): usually accessed via ``request.session``. Keys and values of a session must be pickleable. + + .. versionchanged:: 1.9 + + Sessions are no longer required to implement ``get_csrf_token`` and + ``new_csrf_token``. CSRF token support was moved to the pluggable + :class:`pyramid.interfaces.ICSRFStoragePolicy` configuration hook. + """ # attributes @@ -984,24 +991,23 @@ class ISession(IDict): class ICSRFStoragePolicy(Interface): """ An object that offers the ability to verify CSRF tokens and generate - new ones""" + new ones.""" def new_csrf_token(request): - """ Create and return a new, random cross-site request forgery - protection token. Return the token. It will be a string.""" + """ Create and return a new, random cross-site request forgery + protection token. The token will be an ascii-compatible unicode + string. + + """ def get_csrf_token(request): """ Return a cross-site request forgery protection token. It - will be a string. If a token was previously set for this user via - ``new_csrf_token``, that token will be returned. If no CSRF token - was previously set, ``new_csrf_token`` will be called, which will - create and set a token, and this token will be returned. - """ + will be an ascii-compatible unicode string. If a token was previously + set for this user via ``new_csrf_token``, that token will be returned. + If no CSRF token was previously set, ``new_csrf_token`` will be + called, which will create and set a token, and this token will be + returned. - def check_csrf_token(request, supplied_token): - """ Returns a boolean that represents if ``supplied_token`` is a valid - CSRF token for this request. Comparing strings for equality must be done - using :func:`pyramid.utils.strings_differ` to avoid timing attacks. """ diff --git a/pyramid/session.py b/pyramid/session.py index b1ad25410..33119343b 100644 --- a/pyramid/session.py +++ b/pyramid/session.py @@ -17,6 +17,10 @@ from pyramid.compat import ( bytes_, native_, ) +from pyramid.csrf import ( + check_csrf_origin, + check_csrf_token, +) from pyramid.interfaces import ISession from pyramid.util import strings_differ @@ -608,3 +612,13 @@ def SignedCookieSessionFactory( reissue_time=reissue_time, set_on_exception=set_on_exception, ) + +check_csrf_origin = check_csrf_origin # api +deprecated('check_csrf_origin', + 'pyramid.session.check_csrf_origin is deprecated as of Pyramid ' + '1.9. Use pyramid.csrf.check_csrf_origin instead.') + +check_csrf_token = check_csrf_token # api +deprecated('check_csrf_token', + 'pyramid.session.check_csrf_token is deprecated as of Pyramid ' + '1.9. Use pyramid.csrf.check_csrf_token instead.') diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py index fcb6333ee..cd7ba2951 100644 --- a/pyramid/tests/test_csrf.py +++ b/pyramid/tests/test_csrf.py @@ -49,7 +49,7 @@ class Test_new_csrf_token(unittest.TestCase): self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b') -class TestSessionCSRFStoragePolicy(unittest.TestCase): +class TestLegacySessionCSRFStoragePolicy(unittest.TestCase): class MockSession(object): def new_csrf_token(self): return 'e5e9e30a08b34ff9842ff7d2b958c14b' @@ -58,11 +58,11 @@ class TestSessionCSRFStoragePolicy(unittest.TestCase): return '02821185e4c94269bdc38e6eeae0a2f8' def _makeOne(self): - from pyramid.csrf import SessionCSRFStoragePolicy - return SessionCSRFStoragePolicy() + from pyramid.csrf import LegacySessionCSRFStoragePolicy + return LegacySessionCSRFStoragePolicy() def test_register_session_csrf_policy(self): - from pyramid.csrf import SessionCSRFStoragePolicy + from pyramid.csrf import LegacySessionCSRFStoragePolicy from pyramid.interfaces import ICSRFStoragePolicy config = Configurator() @@ -71,7 +71,7 @@ class TestSessionCSRFStoragePolicy(unittest.TestCase): policy = config.registry.queryUtility(ICSRFStoragePolicy) - self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy)) + self.assertTrue(isinstance(policy, LegacySessionCSRFStoragePolicy)) def test_session_csrf_implementation_delegates_to_session(self): policy = self._makeOne() @@ -86,26 +86,46 @@ class TestSessionCSRFStoragePolicy(unittest.TestCase): 'e5e9e30a08b34ff9842ff7d2b958c14b' ) - def test_verifying_token_invalid(self): + +class TestSessionCSRFStoragePolicy(unittest.TestCase): + def _makeOne(self, **kw): + from pyramid.csrf import SessionCSRFStoragePolicy + return SessionCSRFStoragePolicy(**kw) + + def test_register_session_csrf_policy(self): + from pyramid.csrf import SessionCSRFStoragePolicy + from pyramid.interfaces import ICSRFStoragePolicy + + config = Configurator() + config.set_csrf_storage_policy(self._makeOne()) + config.commit() + + policy = config.registry.queryUtility(ICSRFStoragePolicy) + + self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy)) + + def test_it_creates_a_new_token(self): + request = DummyRequest(session={}) + policy = self._makeOne() - request = DummyRequest(session=self.MockSession()) + policy._token_factory = lambda: 'foo' + self.assertEqual(policy.get_csrf_token(request), 'foo') - result = policy.check_csrf_token(request, 'invalid-token') - self.assertFalse(result) + def test_get_csrf_token_returns_the_new_token(self): + request = DummyRequest(session={'_csrft_': 'foo'}) - def test_verifying_token_valid(self): policy = self._makeOne() - request = DummyRequest(session=self.MockSession()) + self.assertEqual(policy.get_csrf_token(request), 'foo') - result = policy.check_csrf_token( - request, '02821185e4c94269bdc38e6eeae0a2f8') - self.assertTrue(result) + token = policy.new_csrf_token(request) + self.assertNotEqual(token, 'foo') + self.assertEqual(token, policy.get_csrf_token(request)) class TestCookieCSRFStoragePolicy(unittest.TestCase): - def _makeOne(self): + def _makeOne(self, **kw): from pyramid.csrf import CookieCSRFStoragePolicy - return CookieCSRFStoragePolicy() + return CookieCSRFStoragePolicy(**kw) def test_register_cookie_csrf_policy(self): from pyramid.csrf import CookieCSRFStoragePolicy @@ -121,18 +141,18 @@ class TestCookieCSRFStoragePolicy(unittest.TestCase): def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self): response = MockResponse() - request = DummyRequest(response=response) + request = DummyRequest() policy = self._makeOne() token = policy.get_csrf_token(request) + request.response_callback(request, response) self.assertEqual( response.headerlist, [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] ) def test_existing_cookie_csrf_does_not_set_cookie(self): - response = MockResponse() - request = DummyRequest(response=response) + request = DummyRequest() request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} policy = self._makeOne() @@ -142,42 +162,32 @@ class TestCookieCSRFStoragePolicy(unittest.TestCase): token, 'e6f325fee5974f3da4315a8ccf4513d2' ) - self.assertEqual( - response.headerlist, - [], - ) + self.assertIsNone(request.response_callback) def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self): - response = MockResponse() - request = DummyRequest(response=response) + request = DummyRequest() request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} policy = self._makeOne() token = policy.new_csrf_token(request) + + response = MockResponse() + request.response_callback(request, response) self.assertEqual( response.headerlist, [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] ) - def test_verifying_token_invalid_token(self): - response = MockResponse() - request = DummyRequest(response=response) - request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + def test_get_csrf_token_returns_the_new_token(self): + request = DummyRequest() + request.cookies = {'csrf_token': 'foo'} policy = self._makeOne() - self.assertFalse( - policy.check_csrf_token(request, 'invalid-token') - ) - - def test_verifying_token_against_existing_cookie(self): - response = MockResponse() - request = DummyRequest(response=response) - request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} + self.assertEqual(policy.get_csrf_token(request), 'foo') - policy = self._makeOne() - self.assertTrue( - policy.check_csrf_token(request, 'e6f325fee5974f3da4315a8ccf4513d2') - ) + token = policy.new_csrf_token(request) + self.assertNotEqual(token, 'foo') + self.assertEqual(token, policy.get_csrf_token(request)) class Test_check_csrf_token(unittest.TestCase): @@ -224,14 +234,6 @@ class Test_check_csrf_token(unittest.TestCase): result = self._callFUT(request, 'csrf_token', raises=False) self.assertEqual(result, False) - def test_token_differing_types(self): - from pyramid.compat import text_ - request = testing.DummyRequest() - request.method = "POST" - request.session['_csrft_'] = text_('foo') - request.POST = {'csrf_token': b'foo'} - self.assertEqual(self._callFUT(request, token='csrf_token'), True) - class Test_check_csrf_token_without_defaults_configured(unittest.TestCase): def setUp(self): @@ -353,15 +355,15 @@ class Test_check_csrf_origin(unittest.TestCase): class DummyRequest(object): registry = None session = None - cookies = {} + response_callback = None - def __init__(self, registry=None, session=None, response=None): + def __init__(self, registry=None, session=None): self.registry = registry self.session = session - self.response = response + self.cookies = {} def add_response_callback(self, callback): - callback(self, self.response) + self.response_callback = callback class MockResponse(object): diff --git a/pyramid/tests/test_util.py b/pyramid/tests/test_util.py index bbf6103f4..d64f0a73f 100644 --- a/pyramid/tests/test_util.py +++ b/pyramid/tests/test_util.py @@ -369,12 +369,16 @@ class Test_strings_differ(unittest.TestCase): from pyramid.util import strings_differ return strings_differ(*args, **kw) - def test_it(self): + def test_it_bytes(self): self.assertFalse(self._callFUT(b'foo', b'foo')) self.assertTrue(self._callFUT(b'123', b'345')) self.assertTrue(self._callFUT(b'1234', b'123')) self.assertTrue(self._callFUT(b'123', b'1234')) + def test_it_native_str(self): + self.assertFalse(self._callFUT('123', '123')) + self.assertTrue(self._callFUT('123', '1234')) + def test_it_with_internal_comparator(self): result = self._callFUT(b'foo', b'foo', compare_digest=None) self.assertFalse(result) -- cgit v1.2.3 From 3f14d63c009ae7f101b7aeb4525bab2dfe66fa11 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Sun, 30 Apr 2017 02:00:48 -0500 Subject: restore the ``ICSRFStoragePolicy.check_csrf_token`` api --- pyramid/csrf.py | 35 ++++++++++--- pyramid/interfaces.py | 10 ++++ pyramid/tests/test_csrf.py | 121 +++++++++++++++++++++++++++------------------ 3 files changed, 113 insertions(+), 53 deletions(-) diff --git a/pyramid/csrf.py b/pyramid/csrf.py index 1910e4ec8..c8f097777 100644 --- a/pyramid/csrf.py +++ b/pyramid/csrf.py @@ -47,6 +47,12 @@ class LegacySessionCSRFStoragePolicy(object): generating a new one if needed.""" return request.session.get_csrf_token() + def check_csrf_token(self, request, supplied_token): + """ Returns ``True`` if the ``supplied_token`` is valid.""" + expected_token = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected_token), bytes_(supplied_token)) + @implementer(ICSRFStoragePolicy) class SessionCSRFStoragePolicy(object): @@ -82,6 +88,12 @@ class SessionCSRFStoragePolicy(object): token = self.new_csrf_token(request) return token + def check_csrf_token(self, request, supplied_token): + """ Returns ``True`` if the ``supplied_token`` is valid.""" + expected_token = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected_token), bytes_(supplied_token)) + @implementer(ICSRFStoragePolicy) class CookieCSRFStoragePolicy(object): @@ -133,6 +145,12 @@ class CookieCSRFStoragePolicy(object): token = self.new_csrf_token(request) return token + def check_csrf_token(self, request, supplied_token): + """ Returns ``True`` if the ``supplied_token`` is valid.""" + expected_token = self.get_csrf_token(request) + return not strings_differ( + bytes_(expected_token), bytes_(supplied_token)) + def get_csrf_token(request): """ Get the currently active CSRF token for the request passed, generating @@ -140,6 +158,7 @@ def get_csrf_token(request): calls the equivalent method in the chosen CSRF protection implementation. .. versionadded :: 1.9 + """ registry = request.registry csrf = registry.getUtility(ICSRFStoragePolicy) @@ -152,6 +171,7 @@ def new_csrf_token(request): chosen CSRF protection implementation. .. versionadded :: 1.9 + """ registry = request.registry csrf = registry.getUtility(ICSRFStoragePolicy) @@ -171,9 +191,8 @@ def check_csrf_token(request, function, the string ``X-CSRF-Token`` will be used to look up the token in ``request.headers``. - If the value supplied by post or by header doesn't match the value supplied - by ``policy.get_csrf_token()`` (where ``policy`` is an implementation of - :class:`pyramid.interfaces.ICSRFStoragePolicy`), and ``raises`` is + If the value supplied by post or by header cannot be verified by the + :class:`pyramid.interfaces.ICSRFStoragePolicy`, and ``raises`` is ``True``, this function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ and ``raises`` is ``False``, this function will return ``False``. If the @@ -191,7 +210,10 @@ def check_csrf_token(request, a header. .. versionchanged:: 1.9 - Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` + Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated + to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy` to + verify the CSRF token. + """ supplied_token = "" # We first check the headers for a csrf token, as that is significantly @@ -207,8 +229,8 @@ def check_csrf_token(request, if supplied_token == "" and token is not None: supplied_token = request.POST.get(token, "") - expected_token = get_csrf_token(request) - if strings_differ(bytes_(expected_token), bytes_(supplied_token)): + policy = request.registry.getUtility(ICSRFStoragePolicy) + if not policy.check_csrf_token(request, text_(supplied_token)): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') return False @@ -239,6 +261,7 @@ def check_csrf_origin(request, trusted_origins=None, raises=True): .. versionchanged:: 1.9 Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` + """ def _fail(reason): if raises: diff --git a/pyramid/interfaces.py b/pyramid/interfaces.py index 853e8fcdd..ab83813c8 100644 --- a/pyramid/interfaces.py +++ b/pyramid/interfaces.py @@ -1010,6 +1010,16 @@ class ICSRFStoragePolicy(Interface): """ + def check_csrf_token(request, token): + """ Determine if the supplied ``token`` is valid. Most implementations + should simply compare the ``token`` to the current value of + ``get_csrf_token`` but it is possible to verify the token using + any mechanism necessary using this method. + + Returns ``True`` if the ``token`` is valid, otherwise ``False``. + + """ + class IIntrospector(Interface): def get(category_name, discriminator, default=None): diff --git a/pyramid/tests/test_csrf.py b/pyramid/tests/test_csrf.py index cd7ba2951..f01780ad8 100644 --- a/pyramid/tests/test_csrf.py +++ b/pyramid/tests/test_csrf.py @@ -1,61 +1,20 @@ import unittest -from zope.interface.interfaces import ComponentLookupError - from pyramid import testing from pyramid.config import Configurator -from pyramid.events import BeforeRender - - -class Test_get_csrf_token(unittest.TestCase): - def setUp(self): - self.config = testing.setUp() - - def _callFUT(self, *args, **kwargs): - from pyramid.csrf import get_csrf_token - return get_csrf_token(*args, **kwargs) - - def test_no_override_csrf_utility_registered(self): - request = testing.DummyRequest() - self._callFUT(request) - - def test_success(self): - self.config.set_csrf_storage_policy(DummyCSRF()) - request = testing.DummyRequest() - - csrf_token = self._callFUT(request) - - self.assertEquals(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8') - - -class Test_new_csrf_token(unittest.TestCase): - def setUp(self): - self.config = testing.setUp() - - def _callFUT(self, *args, **kwargs): - from pyramid.csrf import new_csrf_token - return new_csrf_token(*args, **kwargs) - - def test_no_override_csrf_utility_registered(self): - request = testing.DummyRequest() - self._callFUT(request) - - def test_success(self): - self.config.set_csrf_storage_policy(DummyCSRF()) - request = testing.DummyRequest() - - csrf_token = self._callFUT(request) - - self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b') class TestLegacySessionCSRFStoragePolicy(unittest.TestCase): class MockSession(object): + def __init__(self, current_token='02821185e4c94269bdc38e6eeae0a2f8'): + self.current_token = current_token + def new_csrf_token(self): - return 'e5e9e30a08b34ff9842ff7d2b958c14b' + self.current_token = 'e5e9e30a08b34ff9842ff7d2b958c14b' + return self.current_token def get_csrf_token(self): - return '02821185e4c94269bdc38e6eeae0a2f8' + return self.current_token def _makeOne(self): from pyramid.csrf import LegacySessionCSRFStoragePolicy @@ -86,6 +45,13 @@ class TestLegacySessionCSRFStoragePolicy(unittest.TestCase): 'e5e9e30a08b34ff9842ff7d2b958c14b' ) + def test_check_csrf_token(self): + request = DummyRequest(session=self.MockSession('foo')) + + policy = self._makeOne() + self.assertTrue(policy.check_csrf_token(request, 'foo')) + self.assertFalse(policy.check_csrf_token(request, 'bar')) + class TestSessionCSRFStoragePolicy(unittest.TestCase): def _makeOne(self, **kw): @@ -121,6 +87,16 @@ class TestSessionCSRFStoragePolicy(unittest.TestCase): self.assertNotEqual(token, 'foo') self.assertEqual(token, policy.get_csrf_token(request)) + def test_check_csrf_token(self): + request = DummyRequest(session={}) + + policy = self._makeOne() + self.assertFalse(policy.check_csrf_token(request, 'foo')) + + request.session = {'_csrft_': 'foo'} + self.assertTrue(policy.check_csrf_token(request, 'foo')) + self.assertFalse(policy.check_csrf_token(request, 'bar')) + class TestCookieCSRFStoragePolicy(unittest.TestCase): def _makeOne(self, **kw): @@ -189,6 +165,57 @@ class TestCookieCSRFStoragePolicy(unittest.TestCase): self.assertNotEqual(token, 'foo') self.assertEqual(token, policy.get_csrf_token(request)) + def test_check_csrf_token(self): + request = DummyRequest() + + policy = self._makeOne() + self.assertFalse(policy.check_csrf_token(request, 'foo')) + + request.cookies = {'csrf_token': 'foo'} + self.assertTrue(policy.check_csrf_token(request, 'foo')) + self.assertFalse(policy.check_csrf_token(request, 'bar')) + +class Test_get_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from pyramid.csrf import get_csrf_token + return get_csrf_token(*args, **kwargs) + + def test_no_override_csrf_utility_registered(self): + request = testing.DummyRequest() + self._callFUT(request) + + def test_success(self): + self.config.set_csrf_storage_policy(DummyCSRF()) + request = testing.DummyRequest() + + csrf_token = self._callFUT(request) + + self.assertEquals(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8') + + +class Test_new_csrf_token(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def _callFUT(self, *args, **kwargs): + from pyramid.csrf import new_csrf_token + return new_csrf_token(*args, **kwargs) + + def test_no_override_csrf_utility_registered(self): + request = testing.DummyRequest() + self._callFUT(request) + + def test_success(self): + self.config.set_csrf_storage_policy(DummyCSRF()) + request = testing.DummyRequest() + + csrf_token = self._callFUT(request) + + self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b') + class Test_check_csrf_token(unittest.TestCase): def setUp(self): -- cgit v1.2.3