diff options
| author | Donald Stufft <donald@stufft.io> | 2016-04-15 20:42:20 -0400 |
|---|---|---|
| committer | Donald Stufft <donald@stufft.io> | 2016-04-16 16:00:45 -0400 |
| commit | 65dee6e4ca0c0c607e97db0c9e55768f10591a58 (patch) | |
| tree | 6185b4704a6de2261d5568773c260d50e209d0aa | |
| parent | 1799be9dd8666d10d6b4a04a9b75fc57f8626c6f (diff) | |
| download | pyramid-65dee6e4ca0c0c607e97db0c9e55768f10591a58.tar.gz pyramid-65dee6e4ca0c0c607e97db0c9e55768f10591a58.tar.bz2 pyramid-65dee6e4ca0c0c607e97db0c9e55768f10591a58.zip | |
In addition to CSRF token, verify the origin too
Add an additional layer of protection against CSRF by verifying the actual
origin of the request in addition to the CSRF token. We only do this check on
sites hosted behind HTTPS because only HTTPS sites have evidence to show that
the Referrer header is not being spuriously removed by random middleware
boxes.
| -rw-r--r-- | CHANGES.txt | 12 | ||||
| -rw-r--r-- | docs/api/exceptions.rst | 2 | ||||
| -rw-r--r-- | docs/api/session.rst | 2 | ||||
| -rw-r--r-- | docs/narr/sessions.rst | 8 | ||||
| -rw-r--r-- | docs/narr/viewconfig.rst | 3 | ||||
| -rw-r--r-- | pyramid/config/settings.py | 4 | ||||
| -rw-r--r-- | pyramid/exceptions.py | 15 | ||||
| -rw-r--r-- | pyramid/session.py | 103 | ||||
| -rw-r--r-- | pyramid/tests/test_config/test_views.py | 3 | ||||
| -rw-r--r-- | pyramid/tests/test_session.py | 93 | ||||
| -rw-r--r-- | pyramid/tests/test_util.py | 21 | ||||
| -rw-r--r-- | pyramid/tests/test_viewderivers.py | 51 | ||||
| -rw-r--r-- | pyramid/util.py | 17 | ||||
| -rw-r--r-- | pyramid/viewderivers.py | 6 |
14 files changed, 337 insertions, 3 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index 0a7bdef1a..0cd2c0c9a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -35,6 +35,18 @@ Features https://github.com/Pylons/pyramid/pull/2413 and https://github.com/Pylons/pyramid/pull/2500 +- Added an additional CSRF validation that checks the origin/referrer of a + request and makes sure it matches the current ``request.domain``. This + particular check is only active when accessing a site over HTTPS as otherwise + browsers don't always send the required information. If this additional CSRF + validation fails a ``BadCSRFOrigin`` exception will be raised and may be + caught by exception views (the default response is ``400 Bad Request``). + Additional allowed origins may be configured by setting + ``pyramid.csrf_trusted_origins`` to a list of domain names (with ports if on + a non standard port) to allow. Subdomains are not allowed unless the domain + name has been prefixed with a ``.``. See: + https://github.com/Pylons/pyramid/pull/2501 + - Pyramid HTTPExceptions will now take into account the best match for the clients Accept header, and depending on what is requested will return text/html, application/json or text/plain. The default for */* is still diff --git a/docs/api/exceptions.rst b/docs/api/exceptions.rst index faca0fbb6..cb411458d 100644 --- a/docs/api/exceptions.rst +++ b/docs/api/exceptions.rst @@ -5,6 +5,8 @@ .. automodule:: pyramid.exceptions + .. autoexception:: BadCSRFOrigin + .. autoexception:: BadCSRFToken .. autoexception:: PredicateMismatch diff --git a/docs/api/session.rst b/docs/api/session.rst index 474e2bb32..56c4f52d7 100644 --- a/docs/api/session.rst +++ b/docs/api/session.rst @@ -9,6 +9,8 @@ .. autofunction:: signed_deserialize + .. autofunction:: check_csrf_origin + .. autofunction:: check_csrf_token .. autofunction:: SignedCookieSessionFactory diff --git a/docs/narr/sessions.rst b/docs/narr/sessions.rst index 0e895ff81..7cf96ac7d 100644 --- a/docs/narr/sessions.rst +++ b/docs/narr/sessions.rst @@ -437,6 +437,14 @@ It is always possible to pass the token in the ``X-CSRF-Token`` header as well. There is currently no way to define an alternate name for this header without performing CSRF checking manually. +In addition to token based CSRF checks, the automatic CSRF checking will also +check the referrer of the request to ensure that it matches one of the trusted +origins. By default the only trusted origin is the current host, however +additional origins may be configured by setting +``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they +are non standard). If a host in the list of domains starts with a ``.`` then +that will allow all subdomains as well as the domain without the ``.``. + If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` exception will be raised. This exception may be caught and handled by an :term:`exception view` but, by default, will result in a ``400 Bad Request`` diff --git a/docs/narr/viewconfig.rst b/docs/narr/viewconfig.rst index 3b8f0353a..cd5b8feb0 100644 --- a/docs/narr/viewconfig.rst +++ b/docs/narr/viewconfig.rst @@ -215,6 +215,9 @@ Non-Predicate Arguments If this option is set to ``False`` then CSRF checks will be disabled regardless of the ``pyramid.require_default_csrf`` setting. + In addition, if this option is set to ``True`` or a string then CSRF origin + checking will be enabled. + See :ref:`auto_csrf_checking` for more information. .. versionadded:: 1.7 diff --git a/pyramid/config/settings.py b/pyramid/config/settings.py index b66986327..9e5c3b62d 100644 --- a/pyramid/config/settings.py +++ b/pyramid/config/settings.py @@ -124,6 +124,8 @@ class Settings(dict): config_prevent_cachebust)) require_default_csrf = self.get('pyramid.require_default_csrf') eff_require_default_csrf = require_default_csrf + csrf_trusted_origins = self.get("pyramid.csrf_trusted_origins", []) + eff_csrf_trusted_origins = csrf_trusted_origins update = { 'debug_authorization': eff_debug_all or eff_debug_auth, @@ -137,6 +139,7 @@ class Settings(dict): 'prevent_http_cache':eff_prevent_http_cache, 'prevent_cachebust':eff_prevent_cachebust, 'require_default_csrf':eff_require_default_csrf, + 'csrf_trusted_origins':eff_csrf_trusted_origins, 'pyramid.debug_authorization': eff_debug_all or eff_debug_auth, 'pyramid.debug_notfound': eff_debug_all or eff_debug_notfound, @@ -149,6 +152,7 @@ class Settings(dict): 'pyramid.prevent_http_cache':eff_prevent_http_cache, 'pyramid.prevent_cachebust':eff_prevent_cachebust, 'pyramid.require_default_csrf':eff_require_default_csrf, + 'pyramid.csrf_trusted_origins':eff_csrf_trusted_origins, } self.update(update) diff --git a/pyramid/exceptions.py b/pyramid/exceptions.py index c1481ce9c..a8a10f927 100644 --- a/pyramid/exceptions.py +++ b/pyramid/exceptions.py @@ -9,6 +9,21 @@ Forbidden = HTTPForbidden # bw compat CR = '\n' + +class BadCSRFOrigin(HTTPBadRequest): + """ + This exception indicates the request has failed cross-site request forgery + origin validation. + """ + title = "Bad CSRF Origin" + explanation = ( + "Access is denied. This server can not verify that the origin or " + "referrer of your request matches the current site. Either your " + "browser supplied the wrong Origin or Referrer or it did not supply " + "one at all." + ) + + class BadCSRFToken(HTTPBadRequest): """ This exception indicates the request has failed cross-site request diff --git a/pyramid/session.py b/pyramid/session.py index 6136e26a0..8e5ee8dd6 100644 --- a/pyramid/session.py +++ b/pyramid/session.py @@ -16,11 +16,19 @@ from pyramid.compat import ( text_, bytes_, native_, + urlparse, ) -from pyramid.exceptions import BadCSRFToken +from pyramid.exceptions import ( + BadCSRFOrigin, + BadCSRFToken, +) from pyramid.interfaces import ISession -from pyramid.util import strings_differ +from pyramid.settings import aslist +from pyramid.util import ( + is_same_domain, + strings_differ, +) def manage_accessed(wrapped): """ Decorator which causes a cookie to be renewed when an accessor @@ -101,6 +109,97 @@ def signed_deserialize(serialized, secret, hmac=hmac): return pickle.loads(pickled) + +def check_csrf_origin(request, trusted_origins=None, raises=True): + """ + Check the Origin of the request to see if it is a cross site request or + not. + + If the value supplied by the Origin or Referer header isn't one of the + trusted origins and ``raises`` is ``True``, this function will raise a + :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is + ``False`` this function will return ``False`` instead. If the CSRF origin + checks are successful this function will return ``True`` unconditionally. + + Additional trusted origins may be added by passing a list of domain (and + ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in + with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` + (the default) this list of additional domains will be pulled from the + ``pyramid.csrf_trusted_origins`` setting. + + Note that this function will do nothing if request.scheme is not https. + + .. versionadded:: 1.7 + """ + def _fail(reason): + if raises: + raise BadCSRFOrigin(reason) + else: + return False + + if request.scheme == "https": + # Suppose user visits http://example.com/ + # An active network attacker (man-in-the-middle, MITM) sends a + # POST form that targets https://example.com/detonate-bomb/ and + # submits it via JavaScript. + # + # The attacker will need to provide a CSRF cookie and token, but + # that's no problem for a MITM when we cannot make any assumptions + # about what kind of session storage is being used. So the MITM can + # circumvent the CSRF protection. This is true for any HTTP connection, + # but anyone using HTTPS expects better! For this reason, for + # https://example.com/ we need additional protection that treats + # http://example.com/ as completely untrusted. Under HTTPS, + # Barth et al. found that the Referer header is missing for + # same-domain requests in only about 0.2% of cases or less, so + # we can use strict Referer checking. + + # Determine the origin of this request + origin = request.headers.get("Origin") + if origin is None: + origin = request.referrer + + # Fail if we were not able to locate an origin at all + if not origin: + return _fail("Origin checking failed - no Origin or Referer.") + + # Parse our origin so we we can extract the required information from + # it. + originp = urlparse.urlparse(origin) + + # Ensure that our Referer is also secure. + if originp.scheme != "https": + return _fail( + "Referer checking failed - Referer is insecure while host is " + "secure." + ) + + # Determine which origins we trust, which by default will include the + # current origin. + if trusted_origins is None: + trusted_origins = aslist( + request.registry.settings.get( + "pyramid.csrf_trusted_origins", []) + ) + + if request.host_port not in {80, 443}: + trusted_origins.append("{0.domain}:{0.host_port}".format(request)) + else: + trusted_origins.append(request.domain) + + # Actually check to see if the request's origin matches any of our + # trusted origins. + if not any(is_same_domain(originp.netloc, host) + for host in trusted_origins): + reason = ( + "Referer checking failed - {} does not match any trusted " + "origins." + ) + return _fail(reason.format(origin)) + + return True + + def check_csrf_token(request, token='csrf_token', header='X-CSRF-Token', diff --git a/pyramid/tests/test_config/test_views.py b/pyramid/tests/test_config/test_views.py index 7be72257d..21ed24f44 100644 --- a/pyramid/tests/test_config/test_views.py +++ b/pyramid/tests/test_config/test_views.py @@ -1595,6 +1595,7 @@ class TestViewsConfigurationMixin(unittest.TestCase): config.add_view(view, require_csrf='st', renderer=null_renderer) view = self._getViewCallable(config) request = self._makeRequest(config) + request.scheme = "http" request.method = 'POST' request.POST = {'st': 'foo'} request.headers = {} @@ -1609,6 +1610,7 @@ class TestViewsConfigurationMixin(unittest.TestCase): config.add_view(view, require_csrf=True, renderer=null_renderer) view = self._getViewCallable(config) request = self._makeRequest(config) + request.scheme = "http" request.method = 'POST' request.POST = {} request.headers = {'X-CSRF-Token': 'foo'} @@ -1623,6 +1625,7 @@ class TestViewsConfigurationMixin(unittest.TestCase): config.add_view(view, require_csrf=True, renderer=null_renderer) view = self._getViewCallable(config) request = self._makeRequest(config) + request.scheme = "http" request.method = 'POST' request.POST = {} request.headers = {} diff --git a/pyramid/tests/test_session.py b/pyramid/tests/test_session.py index 4ec8f94a4..e08f9a919 100644 --- a/pyramid/tests/test_session.py +++ b/pyramid/tests/test_session.py @@ -705,6 +705,99 @@ class Test_check_csrf_token(unittest.TestCase): request.POST = {'csrf_token': b'foo'} self.assertEqual(self._callFUT(request, token='csrf_token'), True) + +class Test_check_csrf_origin(unittest.TestCase): + + def _callFUT(self, *args, **kwargs): + from ..session import check_csrf_origin + return check_csrf_origin(*args, **kwargs) + + def test_success_with_http(self): + request = testing.DummyRequest() + request.scheme = "http" + self.assertTrue(self._callFUT(request)) + + def test_success_with_https_and_referrer(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = 443 + request.referrer = "https://example.com/login/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_success_with_https_and_origin(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = 443 + request.headers = {"Origin": "https://example.com/"} + request.referrer = "https://not-example.com/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_success_with_additional_trusted_host(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = 443 + request.referrer = "https://not-example.com/login/" + request.registry.settings = { + "pyramid.csrf_trusted_origins": ["not-example.com"], + } + self.assertTrue(self._callFUT(request)) + + def test_success_with_nonstandard_port(self): + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com:8080" + request.host_port = 8080 + request.referrer = "https://example.com:8080/login/" + request.registry.settings = {} + self.assertTrue(self._callFUT(request)) + + def test_fails_with_wrong_host(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = 443 + request.referrer = "https://not-example.com/login/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_with_no_origin(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.referrer = None + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_when_http_to_https(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com" + request.host_port = 443 + request.referrer = "http://example.com/evil/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + def test_fails_with_nonstandard_port(self): + from pyramid.exceptions import BadCSRFOrigin + request = testing.DummyRequest() + request.scheme = "https" + request.host = "example.com:8080" + request.host_port = 8080 + request.referrer = "https://example.com/login/" + request.registry.settings = {} + self.assertRaises(BadCSRFOrigin, self._callFUT, request) + self.assertFalse(self._callFUT(request, raises=False)) + + class DummySerializer(object): def dumps(self, value): return base64.b64encode(json.dumps(value).encode('utf-8')) diff --git a/pyramid/tests/test_util.py b/pyramid/tests/test_util.py index c606a4b6b..bbf6103f4 100644 --- a/pyramid/tests/test_util.py +++ b/pyramid/tests/test_util.py @@ -856,3 +856,24 @@ def dummyfunc(): pass class Dummy(object): pass + + +class Test_is_same_domain(unittest.TestCase): + def _callFUT(self, *args, **kw): + from pyramid.util import is_same_domain + return is_same_domain(*args, **kw) + + def test_it(self): + self.assertTrue(self._callFUT("example.com", "example.com")) + self.assertFalse(self._callFUT("evil.com", "example.com")) + self.assertFalse(self._callFUT("evil.example.com", "example.com")) + self.assertFalse(self._callFUT("example.com", "")) + + def test_with_wildcard(self): + self.assertTrue(self._callFUT("example.com", ".example.com")) + self.assertTrue(self._callFUT("good.example.com", ".example.com")) + + def test_with_port(self): + self.assertTrue(self._callFUT("example.com:8080", "example.com:8080")) + self.assertFalse(self._callFUT("example.com:8080", "example.com")) + self.assertFalse(self._callFUT("example.com", "example.com:8080")) diff --git a/pyramid/tests/test_viewderivers.py b/pyramid/tests/test_viewderivers.py index 4613762f5..6bfe353e5 100644 --- a/pyramid/tests/test_viewderivers.py +++ b/pyramid/tests/test_viewderivers.py @@ -1119,6 +1119,7 @@ class TestDeriveView(unittest.TestCase): def inner_view(request): return response request = self._makeRequest() + request.scheme = "http" request.method = 'POST' request.POST = {} request.session = DummySession({'csrf_token': 'foo'}) @@ -1132,6 +1133,23 @@ class TestDeriveView(unittest.TestCase): def inner_view(request): return response request = self._makeRequest() + request.scheme = "http" + request.method = 'POST' + request.session = DummySession({'csrf_token': 'foo'}) + request.POST = {'DUMMY': 'foo'} + view = self.config._derive_view(inner_view, require_csrf='DUMMY') + result = view(None, request) + self.assertTrue(result is response) + + def test_csrf_view_https_domain(self): + response = DummyResponse() + def inner_view(request): + return response + request = self._makeRequest() + request.scheme = "https" + request.domain = "example.com" + request.host_port = 443 + request.referrer = "https://example.com/login/" request.method = 'POST' request.session = DummySession({'csrf_token': 'foo'}) request.POST = {'DUMMY': 'foo'} @@ -1153,6 +1171,7 @@ class TestDeriveView(unittest.TestCase): from pyramid.exceptions import BadCSRFToken def inner_view(request): pass request = self._makeRequest() + request.scheme = "http" request.method = 'POST' request.session = DummySession({'csrf_token': 'foo'}) request.POST = {'DUMMY': 'bar'} @@ -1163,6 +1182,7 @@ class TestDeriveView(unittest.TestCase): from pyramid.exceptions import BadCSRFToken def inner_view(request): pass request = self._makeRequest() + request.scheme = "http" request.method = 'POST' request.POST = {} request.session = DummySession({'csrf_token': 'foo'}) @@ -1174,6 +1194,7 @@ class TestDeriveView(unittest.TestCase): from pyramid.exceptions import BadCSRFToken def inner_view(request): pass request = self._makeRequest() + request.scheme = "http" request.method = 'PUT' request.POST = {} request.session = DummySession({'csrf_token': 'foo'}) @@ -1181,11 +1202,38 @@ class TestDeriveView(unittest.TestCase): view = self.config._derive_view(inner_view, require_csrf='DUMMY') self.assertRaises(BadCSRFToken, lambda: view(None, request)) + def test_csrf_view_fails_on_bad_referrer(self): + from pyramid.exceptions import BadCSRFOrigin + def inner_view(request): pass + request = self._makeRequest() + request.method = "POST" + request.scheme = "https" + request.host_port = 443 + request.domain = "example.com" + request.referrer = "https://not-example.com/evil/" + request.registry.settings = {} + view = self.config._derive_view(inner_view, require_csrf='DUMMY') + self.assertRaises(BadCSRFOrigin, lambda: view(None, request)) + + def test_csrf_view_fails_on_bad_origin(self): + from pyramid.exceptions import BadCSRFOrigin + def inner_view(request): pass + request = self._makeRequest() + request.method = "POST" + request.scheme = "https" + request.host_port = 443 + request.domain = "example.com" + request.headers = {"Origin": "https://not-example.com/evil/"} + request.registry.settings = {} + view = self.config._derive_view(inner_view, require_csrf='DUMMY') + self.assertRaises(BadCSRFOrigin, lambda: view(None, request)) + def test_csrf_view_uses_config_setting_truthy(self): response = DummyResponse() def inner_view(request): return response request = self._makeRequest() + request.scheme = "http" request.method = 'POST' request.session = DummySession({'csrf_token': 'foo'}) request.POST = {'csrf_token': 'foo'} @@ -1199,6 +1247,7 @@ class TestDeriveView(unittest.TestCase): def inner_view(request): return response request = self._makeRequest() + request.scheme = "http" request.method = 'POST' request.session = DummySession({'csrf_token': 'foo'}) request.POST = {'DUMMY': 'foo'} @@ -1225,6 +1274,7 @@ class TestDeriveView(unittest.TestCase): def inner_view(request): return response request = self._makeRequest() + request.scheme = "http" request.method = 'POST' request.session = DummySession({'csrf_token': 'foo'}) request.POST = {'DUMMY': 'foo'} @@ -1238,6 +1288,7 @@ class TestDeriveView(unittest.TestCase): def inner_view(request): return response request = self._makeRequest() + request.scheme = "http" request.method = 'POST' request.session = DummySession({'csrf_token': 'foo'}) request.POST = {'DUMMY': 'foo'} diff --git a/pyramid/util.py b/pyramid/util.py index fc1d52af5..4936dcb24 100644 --- a/pyramid/util.py +++ b/pyramid/util.py @@ -614,3 +614,20 @@ def hide_attrs(obj, *attrs): obj_vals[name] = saved_val elif name in obj_vals: del obj_vals[name] + + +def is_same_domain(host, pattern): + """ + Return ``True`` if the host is either an exact match or a match + to the wildcard pattern. + Any pattern beginning with a period matches a domain and all of its + subdomains. (e.g. ``.example.com`` matches ``example.com`` and + ``foo.example.com``). Anything else is an exact string match. + """ + if not pattern: + return False + + pattern = pattern.lower() + return (pattern[0] == "." and + (host.endswith(pattern) or host == pattern[1:]) or + pattern == host) diff --git a/pyramid/viewderivers.py b/pyramid/viewderivers.py index e9ff09416..c6152e382 100644 --- a/pyramid/viewderivers.py +++ b/pyramid/viewderivers.py @@ -6,7 +6,10 @@ from zope.interface import ( ) from pyramid.security import NO_PERMISSION_REQUIRED -from pyramid.session import check_csrf_token +from pyramid.session import ( + check_csrf_origin, + check_csrf_token, +) from pyramid.response import Response from pyramid.interfaces import ( @@ -491,6 +494,7 @@ def csrf_view(view, info): # Assume that anything not defined as 'safe' by RFC2616 needs # protection if request.method not in {"GET", "HEAD", "OPTIONS", "TRACE"}: + check_csrf_origin(request, raises=True) check_csrf_token(request, val, raises=True) return view(context, request) wrapped_view = csrf_view |
