From 7df825dcb19d03608ded3c5c84d1552d1232647c Mon Sep 17 00:00:00 2001 From: Chris McDonough Date: Thu, 17 Sep 2009 20:42:01 +0000 Subject: Move view-related helper functions from zcml.py to view.py. --- repoze/bfg/tests/test_view.py | 344 ++++++++++++++++++++++++++++++++++++++++++ repoze/bfg/tests/test_zcml.py | 344 ------------------------------------------ repoze/bfg/view.py | 114 +++++++++++++- repoze/bfg/zcml.py | 116 +------------- 4 files changed, 457 insertions(+), 461 deletions(-) diff --git a/repoze/bfg/tests/test_view.py b/repoze/bfg/tests/test_view.py index c4bbb719c..959ad939b 100644 --- a/repoze/bfg/tests/test_view.py +++ b/repoze/bfg/tests/test_view.py @@ -1266,6 +1266,329 @@ class Test_rendered_response(unittest.TestCase): 'repoze.bfg.tests:fixtures/minimal.txt', response, request=request) self.assertEqual(result.cache_control.max_age, 100) +class TestDeriveView(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, view, *arg, **kw): + from repoze.bfg.view import derive_view + return derive_view(view, *arg, **kw) + + def _registerLogger(self): + from repoze.bfg.interfaces import ILogger + from zope.component import getSiteManager + logger = DummyLogger() + sm = getSiteManager() + sm.registerUtility(logger, ILogger, 'repoze.bfg.debug') + return logger + + def _registerSettings(self, **d): + from repoze.bfg.interfaces import ISettings + from zope.component import getSiteManager + settings = DummySettings(d) + sm = getSiteManager() + sm.registerUtility(settings, ISettings) + + def _registerSecurityPolicy(self, permissive): + from repoze.bfg.interfaces import IAuthenticationPolicy + from repoze.bfg.interfaces import IAuthorizationPolicy + from zope.component import getSiteManager + policy = DummySecurityPolicy(permissive) + sm = getSiteManager() + sm.registerUtility(policy, IAuthenticationPolicy) + sm.registerUtility(policy, IAuthorizationPolicy) + + def test_view_as_function_context_and_request(self): + def view(context, request): + return 'OK' + result = self._callFUT(view) + self.failUnless(result is view) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(view(None, None), 'OK') + + def test_view_as_function_requestonly(self): + def view(request): + return 'OK' + result = self._callFUT(view) + self.failIf(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), 'OK') + + def test_view_as_newstyle_class_context_and_request(self): + class view(object): + def __init__(self, context, request): + pass + def __call__(self): + return 'OK' + result = self._callFUT(view) + self.failIf(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), 'OK') + + def test_view_as_newstyle_class_requestonly(self): + class view(object): + def __init__(self, context, request): + pass + def __call__(self): + return 'OK' + result = self._callFUT(view) + self.failIf(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), 'OK') + + def test_view_as_oldstyle_class_context_and_request(self): + class view: + def __init__(self, context, request): + pass + def __call__(self): + return 'OK' + result = self._callFUT(view) + self.failIf(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), 'OK') + + def test_view_as_oldstyle_class_requestonly(self): + class view: + def __init__(self, context, request): + pass + def __call__(self): + return 'OK' + result = self._callFUT(view) + self.failIf(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), 'OK') + + def test_view_as_instance_context_and_request(self): + class View: + def __call__(self, context, request): + return 'OK' + view = View() + result = self._callFUT(view) + self.failUnless(result is view) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), 'OK') + + def test_view_as_instance_requestonly(self): + class View: + def __call__(self, request): + return 'OK' + view = View() + result = self._callFUT(view) + self.failIf(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.failUnless('instance' in result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), 'OK') + + def test_view_with_debug_authorization_no_authpol(self): + def view(context, request): + return 'OK' + self._registerSettings(debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + result = self._callFUT(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + request = DummyRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), 'OK') + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): Allowed " + "(no authorization policy in use)") + + def test_view_with_debug_authorization_no_permission(self): + def view(context, request): + return 'OK' + self._registerSettings(debug_authorization=True, reload_templates=True) + self._registerSecurityPolicy(True) + logger = self._registerLogger() + result = self._callFUT(view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.failIf(hasattr(result, '__call_permissive__')) + request = DummyRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), 'OK') + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): Allowed (" + "no permission registered)") + + def test_view_with_debug_authorization_permission_authpol_permitted(self): + def view(context, request): + return 'OK' + self._registerSettings(debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + self._registerSecurityPolicy(True) + result = self._callFUT(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertEqual(result.__call_permissive__, view) + request = DummyRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), 'OK') + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): True") + + def test_view_with_debug_authorization_permission_authpol_denied(self): + from repoze.bfg.security import Unauthorized + def view(context, request): + """ """ + self._registerSettings(debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + self._registerSecurityPolicy(False) + result = self._callFUT(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertEqual(result.__call_permissive__, view) + request = DummyRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertRaises(Unauthorized, result, None, request) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): False") + + def test_view_with_debug_authorization_permission_authpol_denied2(self): + def view(context, request): + """ """ + self._registerSettings(debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + self._registerSecurityPolicy(False) + result = self._callFUT(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + request = DummyRequest() + request.view_name = 'view_name' + request.url = 'url' + permitted = result.__permitted__(None, None) + self.assertEqual(permitted, False) + + def test_view_with_predicates_all(self): + def view(context, request): + return '123' + predicates = [] + def predicate1(context, request): + predicates.append(True) + return True + def predicate2(context, request): + predicates.append(True) + return True + result = self._callFUT(view, predicates=[predicate1, predicate2]) + request = DummyRequest() + request.method = 'POST' + next = result(None, None) + self.assertEqual(next, '123') + self.assertEqual(predicates, [True, True]) + + def test_view_with_predicates_notall(self): + from repoze.bfg.view import NotFound + def view(context, request): + """ """ + predicates = [] + def predicate1(context, request): + predicates.append(True) + return True + def predicate2(context, request): + predicates.append(True) + return False + result = self._callFUT(view, predicates=[predicate1, predicate2]) + request = DummyRequest() + request.method = 'POST' + self.assertRaises(NotFound, result, None, None) + self.assertEqual(predicates, [True, True]) + + def test_view_with_predicates_checker(self): + def view(context, request): + """ """ + predicates = [] + def predicate1(context, request): + predicates.append(True) + return True + def predicate2(context, request): + predicates.append(True) + return True + result = self._callFUT(view, predicates=[predicate1, predicate2]) + request = DummyRequest() + request.method = 'POST' + next = result.__predicated__(None, None) + self.assertEqual(next, True) + self.assertEqual(predicates, [True, True]) + + def test_view_with_wrapper_viewname(self): + from webob import Response + from zope.component import getSiteManager + from repoze.bfg.interfaces import IView + inner_response = Response('OK') + def inner_view(context, request): + return inner_response + def outer_view(context, request): + self.assertEqual(request.wrapped_response, inner_response) + self.assertEqual(request.wrapped_body, inner_response.body) + self.assertEqual(request.wrapped_view, inner_view) + return Response('outer ' + request.wrapped_body) + sm = getSiteManager() + sm.registerAdapter(outer_view, (None, None), IView, 'owrap') + result = self._callFUT(inner_view, viewname='inner', + wrapper_viewname='owrap') + self.failIf(result is inner_view) + self.assertEqual(inner_view.__module__, result.__module__) + self.assertEqual(inner_view.__doc__, result.__doc__) + request = DummyRequest() + response = result(None, request) + self.assertEqual(response.body, 'outer OK') + + def test_view_with_wrapper_viewname_notfound(self): + from webob import Response + inner_response = Response('OK') + def inner_view(context, request): + return inner_response + request = DummyRequest() + wrapped = self._callFUT( + inner_view, viewname='inner', wrapper_viewname='owrap') + result = self.assertRaises(ValueError, wrapped, None, request) + +class TestAll(unittest.TestCase): + def test_it(self): + from repoze.bfg.view import all + self.assertEqual(all([True, True]), True) + self.assertEqual(all([False, False]), False) + self.assertEqual(all([False, True]), False) + + class DummyContext: pass @@ -1296,3 +1619,24 @@ class DummyResponse: else: self.app_iter = [body] +class DummyLogger: + def __init__(self): + self.messages = [] + def info(self, msg): + self.messages.append(msg) + warn = info + debug = info + +class DummySettings(dict): + def __getattr__(self, name): + return self[name] + +class DummySecurityPolicy: + def __init__(self, permitted=True): + self.permitted = permitted + + def effective_principals(self, request): + return [] + + def permits(self, context, principals, permission): + return self.permitted diff --git a/repoze/bfg/tests/test_zcml.py b/repoze/bfg/tests/test_zcml.py index 6161f6567..0dcda485f 100644 --- a/repoze/bfg/tests/test_zcml.py +++ b/repoze/bfg/tests/test_zcml.py @@ -1080,321 +1080,6 @@ class TestACLAuthorizationPolicyDirective(unittest.TestCase): policy = getUtility(IAuthorizationPolicy) self.assertEqual(policy.__class__, ACLAuthorizationPolicy) -class TestDeriveView(unittest.TestCase): - def setUp(self): - cleanUp() - - def tearDown(self): - cleanUp() - - def _callFUT(self, view, *arg, **kw): - from repoze.bfg.zcml import derive_view - return derive_view(view, *arg, **kw) - - def _registerLogger(self): - from repoze.bfg.interfaces import ILogger - from zope.component import getSiteManager - logger = DummyLogger() - sm = getSiteManager() - sm.registerUtility(logger, ILogger, 'repoze.bfg.debug') - return logger - - def _registerSettings(self, **d): - from repoze.bfg.interfaces import ISettings - from zope.component import getSiteManager - settings = DummySettings(d) - sm = getSiteManager() - sm.registerUtility(settings, ISettings) - - def _registerSecurityPolicy(self, permissive): - from repoze.bfg.interfaces import IAuthenticationPolicy - from repoze.bfg.interfaces import IAuthorizationPolicy - from zope.component import getSiteManager - policy = DummySecurityPolicy(permissive) - sm = getSiteManager() - sm.registerUtility(policy, IAuthenticationPolicy) - sm.registerUtility(policy, IAuthorizationPolicy) - - def test_view_as_function_context_and_request(self): - def view(context, request): - return 'OK' - result = self._callFUT(view) - self.failUnless(result is view) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(view(None, None), 'OK') - - def test_view_as_function_requestonly(self): - def view(request): - return 'OK' - result = self._callFUT(view) - self.failIf(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), 'OK') - - def test_view_as_newstyle_class_context_and_request(self): - class view(object): - def __init__(self, context, request): - pass - def __call__(self): - return 'OK' - result = self._callFUT(view) - self.failIf(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), 'OK') - - def test_view_as_newstyle_class_requestonly(self): - class view(object): - def __init__(self, context, request): - pass - def __call__(self): - return 'OK' - result = self._callFUT(view) - self.failIf(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), 'OK') - - def test_view_as_oldstyle_class_context_and_request(self): - class view: - def __init__(self, context, request): - pass - def __call__(self): - return 'OK' - result = self._callFUT(view) - self.failIf(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), 'OK') - - def test_view_as_oldstyle_class_requestonly(self): - class view: - def __init__(self, context, request): - pass - def __call__(self): - return 'OK' - result = self._callFUT(view) - self.failIf(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), 'OK') - - def test_view_as_instance_context_and_request(self): - class View: - def __call__(self, context, request): - return 'OK' - view = View() - result = self._callFUT(view) - self.failUnless(result is view) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), 'OK') - - def test_view_as_instance_requestonly(self): - class View: - def __call__(self, request): - return 'OK' - view = View() - result = self._callFUT(view) - self.failIf(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.failUnless('instance' in result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), 'OK') - - def test_view_with_debug_authorization_no_authpol(self): - def view(context, request): - return 'OK' - self._registerSettings(debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - result = self._callFUT(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - request = DummyRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), 'OK') - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)") - - def test_view_with_debug_authorization_no_permission(self): - def view(context, request): - return 'OK' - self._registerSettings(debug_authorization=True, reload_templates=True) - self._registerSecurityPolicy(True) - logger = self._registerLogger() - result = self._callFUT(view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.failIf(hasattr(result, '__call_permissive__')) - request = DummyRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), 'OK') - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed (" - "no permission registered)") - - def test_view_with_debug_authorization_permission_authpol_permitted(self): - def view(context, request): - return 'OK' - self._registerSettings(debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - self._registerSecurityPolicy(True) - result = self._callFUT(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertEqual(result.__call_permissive__, view) - request = DummyRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), 'OK') - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): True") - - def test_view_with_debug_authorization_permission_authpol_denied(self): - from repoze.bfg.security import Unauthorized - def view(context, request): - """ """ - self._registerSettings(debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - self._registerSecurityPolicy(False) - result = self._callFUT(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertEqual(result.__call_permissive__, view) - request = DummyRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertRaises(Unauthorized, result, None, request) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): False") - - def test_view_with_debug_authorization_permission_authpol_denied2(self): - def view(context, request): - """ """ - self._registerSettings(debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - self._registerSecurityPolicy(False) - result = self._callFUT(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - request = DummyRequest() - request.view_name = 'view_name' - request.url = 'url' - permitted = result.__permitted__(None, None) - self.assertEqual(permitted, False) - - def test_view_with_predicates_all(self): - def view(context, request): - return '123' - predicates = [] - def predicate1(context, request): - predicates.append(True) - return True - def predicate2(context, request): - predicates.append(True) - return True - result = self._callFUT(view, predicates=[predicate1, predicate2]) - request = DummyRequest() - request.method = 'POST' - next = result(None, None) - self.assertEqual(next, '123') - self.assertEqual(predicates, [True, True]) - - def test_view_with_predicates_notall(self): - from repoze.bfg.view import NotFound - def view(context, request): - """ """ - predicates = [] - def predicate1(context, request): - predicates.append(True) - return True - def predicate2(context, request): - predicates.append(True) - return False - result = self._callFUT(view, predicates=[predicate1, predicate2]) - request = DummyRequest() - request.method = 'POST' - self.assertRaises(NotFound, result, None, None) - self.assertEqual(predicates, [True, True]) - - def test_view_with_predicates_checker(self): - def view(context, request): - """ """ - predicates = [] - def predicate1(context, request): - predicates.append(True) - return True - def predicate2(context, request): - predicates.append(True) - return True - result = self._callFUT(view, predicates=[predicate1, predicate2]) - request = DummyRequest() - request.method = 'POST' - next = result.__predicated__(None, None) - self.assertEqual(next, True) - self.assertEqual(predicates, [True, True]) - - def test_view_with_wrapper_viewname(self): - from webob import Response - from zope.component import getSiteManager - from repoze.bfg.interfaces import IView - inner_response = Response('OK') - def inner_view(context, request): - return inner_response - def outer_view(context, request): - self.assertEqual(request.wrapped_response, inner_response) - self.assertEqual(request.wrapped_body, inner_response.body) - self.assertEqual(request.wrapped_view, inner_view) - return Response('outer ' + request.wrapped_body) - sm = getSiteManager() - sm.registerAdapter(outer_view, (None, None), IView, 'owrap') - result = self._callFUT(inner_view, viewname='inner', - wrapper_viewname='owrap') - self.failIf(result is inner_view) - self.assertEqual(inner_view.__module__, result.__module__) - self.assertEqual(inner_view.__doc__, result.__doc__) - request = DummyRequest() - response = result(None, request) - self.assertEqual(response.body, 'outer OK') - - def test_view_with_wrapper_viewname_notfound(self): - from webob import Response - inner_response = Response('OK') - def inner_view(context, request): - return inner_response - request = DummyRequest() - wrapped = self._callFUT( - inner_view, viewname='inner', wrapper_viewname='owrap') - result = self.assertRaises(ValueError, wrapped, None, request) - class TestConnectRouteFunction(unittest.TestCase): def setUp(self): cleanUp() @@ -2219,13 +1904,6 @@ class TestExcludeFunction(unittest.TestCase): self.assertEqual(self._callFUT('.foo'), True) self.assertEqual(self._callFUT('foo'), False) -class TestAll(unittest.TestCase): - def test_it(self): - from repoze.bfg.zcml import all - self.assertEqual(all([True, True]), True) - self.assertEqual(all([False, False]), False) - self.assertEqual(all([False, True]), False) - class DummyModule: __path__ = "foo" __name__ = "dummy" @@ -2282,14 +1960,6 @@ from zope.interface import Interface class IDummy(Interface): pass -class DummyLogger: - def __init__(self): - self.messages = [] - def info(self, msg): - self.messages.append(msg) - warn = info - debug = info - class DummyRequest: subpath = () def __init__(self, environ=None): @@ -2315,17 +1985,3 @@ class DummyPackage: def __init__(self, name): self.__name__ = name -class DummySettings(dict): - def __getattr__(self, name): - return self[name] - -class DummySecurityPolicy: - def __init__(self, permitted=True): - self.permitted = permitted - - def effective_principals(self, request): - return [] - - def permits(self, context, principals, permission): - return self.permitted - diff --git a/repoze/bfg/view.py b/repoze/bfg/view.py index e5a6e5398..12c8b6f46 100644 --- a/repoze/bfg/view.py +++ b/repoze/bfg/view.py @@ -25,6 +25,9 @@ from zope.interface import implements from zope.deprecation import deprecated +from repoze.bfg.interfaces import IAuthenticationPolicy +from repoze.bfg.interfaces import IAuthorizationPolicy +from repoze.bfg.interfaces import ILogger from repoze.bfg.interfaces import IResponseFactory from repoze.bfg.interfaces import IRendererFactory from repoze.bfg.interfaces import IView @@ -32,11 +35,20 @@ from repoze.bfg.interfaces import IMultiView from repoze.bfg.interfaces import ITemplateRenderer from repoze.bfg.path import caller_package - +from repoze.bfg.security import Unauthorized +from repoze.bfg.settings import get_settings from repoze.bfg.static import PackageURLParser - from repoze.bfg.renderers import renderer_from_name +try: + all = all +except NameError: # pragma: no cover + def all(iterable): + for element in iterable: + if not element: + return False + return True + deprecated('view_execution_permitted', "('from repoze.bfg.view import view_execution_permitted' was " "deprecated as of repoze.bfg 1.0; instead use 'from " @@ -608,3 +620,101 @@ def decorate_view(wrapped_view, original_view): pass return True return False + +def derive_view(original_view, permission=None, predicates=(), attr=None, + renderer=None, wrapper_viewname=None, viewname=None): + mapped_view = map_view(original_view, attr, renderer) + owrapped_view = owrap_view(mapped_view, viewname, wrapper_viewname) + secured_view = secure_view(owrapped_view, permission) + debug_view = authdebug_view(secured_view, permission) + derived_view = predicate_wrap(debug_view, predicates) + return derived_view + +def owrap_view(view, viewname, wrapper_viewname): + if not wrapper_viewname: + return view + def _owrapped_view(context, request): + response = view(context, request) + request.wrapped_response = response + request.wrapped_body = response.body + request.wrapped_view = view + wrapped_response = render_view_to_response(context, request, + wrapper_viewname) + if wrapped_response is None: + raise ValueError( + 'No wrapper view named %r found when executing view named %r' % + (wrapper_viewname, viewname)) + return wrapped_response + decorate_view(_owrapped_view, view) + return _owrapped_view + +def predicate_wrap(view, predicates): + if not predicates: + return view + def _wrapped(context, request): + if all((predicate(context, request) for predicate in predicates)): + return view(context, request) + raise NotFound('predicate mismatch for view %s' % view) + def checker(context, request): + return all((predicate(context, request) for predicate in predicates)) + _wrapped.__predicated__ = checker + decorate_view(_wrapped, view) + return _wrapped + +def secure_view(view, permission): + wrapped_view = view + authn_policy = queryUtility(IAuthenticationPolicy) + authz_policy = queryUtility(IAuthorizationPolicy) + if authn_policy and authz_policy and (permission is not None): + def _secured_view(context, request): + principals = authn_policy.effective_principals(request) + if authz_policy.permits(context, principals, permission): + return view(context, request) + msg = getattr(request, 'authdebug_message', + 'Unauthorized: %s failed permission check' % view) + raise Unauthorized(msg) + _secured_view.__call_permissive__ = view + def _permitted(context, request): + principals = authn_policy.effective_principals(request) + return authz_policy.permits(context, principals, permission) + _secured_view.__permitted__ = _permitted + wrapped_view = _secured_view + decorate_view(wrapped_view, view) + + return wrapped_view + +def authdebug_view(view, permission): + wrapped_view = view + authn_policy = queryUtility(IAuthenticationPolicy) + authz_policy = queryUtility(IAuthorizationPolicy) + settings = get_settings() + debug_authorization = getattr(settings, 'debug_authorization', False) + if debug_authorization: + def _authdebug_view(context, request): + view_name = getattr(request, 'view_name', None) + + if authn_policy and authz_policy: + if permission is None: + msg = 'Allowed (no permission registered)' + else: + principals = authn_policy.effective_principals(request) + msg = str(authz_policy.permits(context, principals, + permission)) + else: + msg = 'Allowed (no authorization policy in use)' + + view_name = getattr(request, 'view_name', None) + url = getattr(request, 'url', None) + msg = ('debug_authorization of url %s (view name %r against ' + 'context %r): %s' % (url, view_name, context, msg)) + logger = queryUtility(ILogger, 'repoze.bfg.debug') + logger and logger.debug(msg) + if request is not None: + request.authdebug_message = msg + return view(context, request) + + wrapped_view = _authdebug_view + decorate_view(wrapped_view, view) + + return wrapped_view + diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py index 8ad0c3ec9..bbfc8164e 100644 --- a/repoze/bfg/zcml.py +++ b/repoze/bfg/zcml.py @@ -35,7 +35,6 @@ from repoze.bfg.interfaces import IAuthorizationPolicy from repoze.bfg.interfaces import ISecuredView from repoze.bfg.interfaces import IMultiView from repoze.bfg.interfaces import IView -from repoze.bfg.interfaces import ILogger from repoze.bfg.interfaces import IPackageOverrides from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IRouteRequest @@ -47,32 +46,16 @@ from repoze.bfg.resource import PackageOverrides from repoze.bfg.request import create_route_request_factory -from repoze.bfg.security import Unauthorized - -from repoze.bfg.settings import get_settings - from repoze.bfg.static import StaticRootFactory from repoze.bfg.traversal import find_interface from repoze.bfg.view import static as static_view -from repoze.bfg.view import NotFound from repoze.bfg.view import MultiView -from repoze.bfg.view import map_view -from repoze.bfg.view import decorate_view -from repoze.bfg.view import render_view_to_response +from repoze.bfg.view import derive_view import martian -try: - all = all -except NameError: # pragma: no cover - def all(iterable): - for element in iterable: - if not element: - return False - return True - def view( _context, permission=None, @@ -253,103 +236,6 @@ def notfound(_context, view): def forbidden(_context, view): view_utility(_context, view, IForbiddenView) -def derive_view(original_view, permission=None, predicates=(), attr=None, - renderer=None, wrapper_viewname=None, viewname=None): - mapped_view = map_view(original_view, attr, renderer) - owrapped_view = owrap_view(mapped_view, viewname, wrapper_viewname) - secured_view = secure_view(owrapped_view, permission) - debug_view = authdebug_view(secured_view, permission) - derived_view = predicate_wrap(debug_view, predicates) - return derived_view - -def owrap_view(view, viewname, wrapper_viewname): - if not wrapper_viewname: - return view - def _owrapped_view(context, request): - response = view(context, request) - request.wrapped_response = response - request.wrapped_body = response.body - request.wrapped_view = view - wrapped_response = render_view_to_response(context, request, - wrapper_viewname) - if wrapped_response is None: - raise ValueError( - 'No wrapper view named %r found when executing view named %r' % - (wrapper_viewname, viewname)) - return wrapped_response - decorate_view(_owrapped_view, view) - return _owrapped_view - -def predicate_wrap(view, predicates): - if not predicates: - return view - def _wrapped(context, request): - if all((predicate(context, request) for predicate in predicates)): - return view(context, request) - raise NotFound('predicate mismatch for view %s' % view) - def checker(context, request): - return all((predicate(context, request) for predicate in predicates)) - _wrapped.__predicated__ = checker - decorate_view(_wrapped, view) - return _wrapped - -def secure_view(view, permission): - wrapped_view = view - authn_policy = queryUtility(IAuthenticationPolicy) - authz_policy = queryUtility(IAuthorizationPolicy) - if authn_policy and authz_policy and (permission is not None): - def _secured_view(context, request): - principals = authn_policy.effective_principals(request) - if authz_policy.permits(context, principals, permission): - return view(context, request) - msg = getattr(request, 'authdebug_message', - 'Unauthorized: %s failed permission check' % view) - raise Unauthorized(msg) - _secured_view.__call_permissive__ = view - def _permitted(context, request): - principals = authn_policy.effective_principals(request) - return authz_policy.permits(context, principals, permission) - _secured_view.__permitted__ = _permitted - wrapped_view = _secured_view - decorate_view(wrapped_view, view) - - return wrapped_view - -def authdebug_view(view, permission): - wrapped_view = view - authn_policy = queryUtility(IAuthenticationPolicy) - authz_policy = queryUtility(IAuthorizationPolicy) - settings = get_settings() - debug_authorization = getattr(settings, 'debug_authorization', False) - if debug_authorization: - def _authdebug_view(context, request): - view_name = getattr(request, 'view_name', None) - - if authn_policy and authz_policy: - if permission is None: - msg = 'Allowed (no permission registered)' - else: - principals = authn_policy.effective_principals(request) - msg = str(authz_policy.permits(context, principals, - permission)) - else: - msg = 'Allowed (no authorization policy in use)' - - view_name = getattr(request, 'view_name', None) - url = getattr(request, 'url', None) - msg = ('debug_authorization of url %s (view name %r against ' - 'context %r): %s' % (url, view_name, context, msg)) - logger = getUtility(ILogger, 'repoze.bfg.debug') - logger and logger.debug(msg) - if request is not None: - request.authdebug_message = msg - return view(context, request) - - wrapped_view = _authdebug_view - decorate_view(wrapped_view, view) - - return wrapped_view - def scan(_context, package, martian=martian): # martian overrideable only for unit tests module_grokker = martian.ModuleGrokker() -- cgit v1.2.3