From edf7ef0c379361f3a056014b068a01657decfb76 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 9 Mar 2019 12:02:24 -0800 Subject: Implement secured view deriver. Some tests still need fixing. --- src/pyramid/viewderivers.py | 36 ++++++------ tests/test_viewderivers.py | 134 ++++++++------------------------------------ 2 files changed, 38 insertions(+), 132 deletions(-) diff --git a/src/pyramid/viewderivers.py b/src/pyramid/viewderivers.py index 181cc9e5c..22659d2a3 100644 --- a/src/pyramid/viewderivers.py +++ b/src/pyramid/viewderivers.py @@ -7,12 +7,11 @@ from pyramid.csrf import check_csrf_origin, check_csrf_token from pyramid.response import Response from pyramid.interfaces import ( - IAuthenticationPolicy, - IAuthorizationPolicy, IDefaultCSRFOptions, IDefaultPermission, IDebugLogger, IResponse, + ISecurityPolicy, IViewMapper, IViewMapperFactory, ) @@ -308,19 +307,17 @@ def _secured_view(view, info): # permission, replacing it with no permission at all permission = None - wrapped_view = view - authn_policy = info.registry.queryUtility(IAuthenticationPolicy) - authz_policy = info.registry.queryUtility(IAuthorizationPolicy) + policy = info.registry.queryUtility(ISecurityPolicy) # no-op on exception-only views without an explicit permission if explicit_val is None and info.exception_only: return view - if authn_policy and authz_policy and (permission is not None): + if policy and (permission is not None): def permitted(context, request): - principals = authn_policy.effective_principals(request) - return authz_policy.permits(context, principals, permission) + identity = policy.identify(request) + return policy.permits(request, context, identity, permission) def secured_view(context, request): result = permitted(context, request) @@ -334,12 +331,12 @@ def _secured_view(view, info): ) raise HTTPForbidden(msg, result=result) - wrapped_view = secured_view - wrapped_view.__call_permissive__ = view - wrapped_view.__permitted__ = permitted - wrapped_view.__permission__ = permission - - return wrapped_view + secured_view.__call_permissive__ = view + secured_view.__permitted__ = permitted + secured_view.__permission__ = permission + return secured_view + else: + return view def _authdebug_view(view, info): @@ -348,8 +345,7 @@ def _authdebug_view(view, info): permission = explicit_val = info.options.get('permission') if permission is None: permission = info.registry.queryUtility(IDefaultPermission) - authn_policy = info.registry.queryUtility(IAuthenticationPolicy) - authz_policy = info.registry.queryUtility(IAuthorizationPolicy) + policy = info.registry.queryUtility(ISecurityPolicy) logger = info.registry.queryUtility(IDebugLogger) # no-op on exception-only views without an explicit permission @@ -361,18 +357,18 @@ def _authdebug_view(view, info): def authdebug_view(context, request): view_name = getattr(request, 'view_name', None) - if authn_policy and authz_policy: + if policy: if permission is NO_PERMISSION_REQUIRED: msg = 'Allowed (NO_PERMISSION_REQUIRED)' elif permission is None: msg = 'Allowed (no permission registered)' else: - principals = authn_policy.effective_principals(request) + identity = policy.identify(request) msg = str( - authz_policy.permits(context, principals, permission) + policy.permits(request, context, identity, permission) ) else: - msg = 'Allowed (no authorization policy in use)' + msg = 'Allowed (no security policy in use)' view_name = getattr(request, 'view_name', None) url = getattr(request, 'url', None) diff --git a/tests/test_viewderivers.py b/tests/test_viewderivers.py index f01cb490e..9a61ea9f1 100644 --- a/tests/test_viewderivers.py +++ b/tests/test_viewderivers.py @@ -28,12 +28,11 @@ class TestDeriveView(unittest.TestCase): return logger def _registerSecurityPolicy(self, permissive): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy + from pyramid.interfaces import ISecurityPolicy policy = DummySecurityPolicy(permissive) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) + self.config.registry.registerUtility(policy, ISecurityPolicy) + return policy def test_function_returns_non_adaptable(self): def view(request): @@ -421,7 +420,7 @@ class TestDeriveView(unittest.TestCase): self.assertFalse(hasattr(result, '__call_permissive__')) self.assertEqual(result(None, None), response) - def test_with_debug_authorization_no_authpol(self): + def test_with_debug_authorization_no_security_policy(self): response = DummyResponse() view = lambda *arg: response self.config.registry.settings = dict( @@ -442,59 +441,7 @@ class TestDeriveView(unittest.TestCase): logger.messages[0], "debug_authorization of url url (view name " "'view_name' against context None): Allowed " - "(no authorization policy in use)", - ) - - def test_with_debug_authorization_authn_policy_no_authz_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict(debug_authorization=True) - from pyramid.interfaces import IAuthenticationPolicy - - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - logger = self._registerLogger() - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual( - logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)", - ) - - def test_with_debug_authorization_authz_policy_no_authn_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict(debug_authorization=True) - from pyramid.interfaces import IAuthorizationPolicy - - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) - logger = self._registerLogger() - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual( - logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)", + "(no security policy in use)", ) def test_with_debug_authorization_no_permission(self): @@ -665,32 +612,11 @@ class TestDeriveView(unittest.TestCase): "'view_name' against context Exception()): True", ) - def test_secured_view_authn_policy_no_authz_policy(self): + def test_secured_view_authn_policy_no_security_policy(self): response = DummyResponse() view = lambda *arg: response self.config.registry.settings = {} - from pyramid.interfaces import IAuthenticationPolicy - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - - def test_secured_view_authz_policy_no_authn_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = {} - from pyramid.interfaces import IAuthorizationPolicy - - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) result = self.config._derive_view(view, permission='view') self.assertEqual(view.__module__, result.__module__) self.assertEqual(view.__doc__, result.__doc__) @@ -702,53 +628,41 @@ class TestDeriveView(unittest.TestCase): self.assertEqual(result(None, request), response) def test_secured_view_raises_forbidden_no_name(self): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy from pyramid.httpexceptions import HTTPForbidden response = DummyResponse() view = lambda *arg: response self.config.registry.settings = {} - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) + self._registerSecurityPolicy(False) result = self.config._derive_view(view, permission='view') request = self._makeRequest() request.view_name = 'view_name' request.url = 'url' - try: + with self.assertRaises(HTTPForbidden) as cm: result(None, request) - except HTTPForbidden as e: - self.assertEqual( - e.message, 'Unauthorized: failed permission check' - ) - else: # pragma: no cover - raise AssertionError + self.assertEqual( + cm.exception.message, + 'Unauthorized: failed permission check', + ) def test_secured_view_raises_forbidden_with_name(self): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy from pyramid.httpexceptions import HTTPForbidden def myview(request): # pragma: no cover pass self.config.registry.settings = {} - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) + self._registerSecurityPolicy(False) result = self.config._derive_view(myview, permission='view') request = self._makeRequest() request.view_name = 'view_name' request.url = 'url' - try: + with self.assertRaises(HTTPForbidden) as cm: result(None, request) - except HTTPForbidden as e: - self.assertEqual( - e.message, 'Unauthorized: myview failed permission check' - ) - else: # pragma: no cover - raise AssertionError + self.assertEqual( + cm.exception.message, + 'Unauthorized: myview failed permission check', + ) def test_secured_view_skipped_by_default_on_exception_view(self): from pyramid.request import Request @@ -794,12 +708,8 @@ class TestDeriveView(unittest.TestCase): app = self.config.make_wsgi_app() request = Request.blank('/foo', base_url='http://example.com') request.method = 'POST' - try: + with self.assertRaises(HTTPForbidden): request.get_response(app) - except HTTPForbidden: - pass - else: # pragma: no cover - raise AssertionError def test_secured_view_passed_on_explicit_exception_view(self): from pyramid.request import Request @@ -2130,10 +2040,10 @@ class DummySecurityPolicy: def __init__(self, permitted=True): self.permitted = permitted - def effective_principals(self, request): - return [] + def identify(self, request): + return 123 - def permits(self, context, principals, permission): + def permits(self, request, context, identity, permission): return self.permitted -- cgit v1.2.3