From ecc9d82ef6bd50a08dcbee7286ba4581d3caa902 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Wed, 6 Apr 2016 21:22:36 -0500 Subject: rename pyramid.config.derivations to pyramid.viewderivers --- pyramid/config/derivations.py | 453 -------- pyramid/config/views.py | 8 +- pyramid/tests/test_config/test_derivations.py | 1376 ------------------------- pyramid/tests/test_viewderivers.py | 1376 +++++++++++++++++++++++++ pyramid/viewderivers.py | 453 ++++++++ 5 files changed, 1833 insertions(+), 1833 deletions(-) delete mode 100644 pyramid/config/derivations.py delete mode 100644 pyramid/tests/test_config/test_derivations.py create mode 100644 pyramid/tests/test_viewderivers.py create mode 100644 pyramid/viewderivers.py diff --git a/pyramid/config/derivations.py b/pyramid/config/derivations.py deleted file mode 100644 index 99baf46f9..000000000 --- a/pyramid/config/derivations.py +++ /dev/null @@ -1,453 +0,0 @@ -import inspect - -from zope.interface import ( - implementer, - provider, - ) - -from pyramid.security import NO_PERMISSION_REQUIRED -from pyramid.response import Response - -from pyramid.interfaces import ( - IAuthenticationPolicy, - IAuthorizationPolicy, - IDebugLogger, - IResponse, - IViewMapper, - IViewMapperFactory, - ) - -from pyramid.compat import ( - is_bound_method, - is_unbound_method, - ) - -from pyramid.config.util import ( - DEFAULT_PHASH, - MAX_ORDER, - takes_one_arg, - ) - -from pyramid.exceptions import ( - ConfigurationError, - PredicateMismatch, - ) -from pyramid.httpexceptions import HTTPForbidden -from pyramid.util import object_description -from pyramid.view import render_view_to_response -from pyramid import renderers - - -def view_description(view): - try: - return view.__text__ - except AttributeError: - # custom view mappers might not add __text__ - return object_description(view) - -def requestonly(view, attr=None): - return takes_one_arg(view, attr=attr, argname='request') - -@implementer(IViewMapper) -@provider(IViewMapperFactory) -class DefaultViewMapper(object): - def __init__(self, **kw): - self.attr = kw.get('attr') - - def __call__(self, view): - if is_unbound_method(view) and self.attr is None: - raise ConfigurationError(( - 'Unbound method calls are not supported, please set the class ' - 'as your `view` and the method as your `attr`' - )) - - if inspect.isclass(view): - view = self.map_class(view) - else: - view = self.map_nonclass(view) - return view - - def map_class(self, view): - ronly = requestonly(view, self.attr) - if ronly: - mapped_view = self.map_class_requestonly(view) - else: - mapped_view = self.map_class_native(view) - mapped_view.__text__ = 'method %s of %s' % ( - self.attr or '__call__', object_description(view)) - return mapped_view - - def map_nonclass(self, view): - # We do more work here than appears necessary to avoid wrapping the - # view unless it actually requires wrapping (to avoid function call - # overhead). - mapped_view = view - ronly = requestonly(view, self.attr) - if ronly: - mapped_view = self.map_nonclass_requestonly(view) - elif self.attr: - mapped_view = self.map_nonclass_attr(view) - if inspect.isroutine(mapped_view): - # This branch will be true if the view is a function or a method. - # We potentially mutate an unwrapped object here if it's a - # function. We do this to avoid function call overhead of - # injecting another wrapper. However, we must wrap if the - # function is a bound method because we can't set attributes on a - # bound method. - if is_bound_method(view): - _mapped_view = mapped_view - def mapped_view(context, request): - return _mapped_view(context, request) - if self.attr is not None: - mapped_view.__text__ = 'attr %s of %s' % ( - self.attr, object_description(view)) - else: - mapped_view.__text__ = object_description(view) - return mapped_view - - def map_class_requestonly(self, view): - # its a class that has an __init__ which only accepts request - attr = self.attr - def _class_requestonly_view(context, request): - inst = view(request) - request.__view__ = inst - if attr is None: - response = inst() - else: - response = getattr(inst, attr)() - return response - return _class_requestonly_view - - def map_class_native(self, view): - # its a class that has an __init__ which accepts both context and - # request - attr = self.attr - def _class_view(context, request): - inst = view(context, request) - request.__view__ = inst - if attr is None: - response = inst() - else: - response = getattr(inst, attr)() - return response - return _class_view - - def map_nonclass_requestonly(self, view): - # its a function that has a __call__ which accepts only a single - # request argument - attr = self.attr - def _requestonly_view(context, request): - if attr is None: - response = view(request) - else: - response = getattr(view, attr)(request) - return response - return _requestonly_view - - def map_nonclass_attr(self, view): - # its a function that has a __call__ which accepts both context and - # request, but still has an attr - def _attr_view(context, request): - response = getattr(view, self.attr)(context, request) - return response - return _attr_view - - -def wraps_view(wrapper): - def inner(view, info): - wrapper_view = wrapper(view, info) - return preserve_view_attrs(view, wrapper_view) - return inner - -def preserve_view_attrs(view, wrapper): - if view is None: - return wrapper - - if wrapper is view: - return view - - original_view = getattr(view, '__original_view__', None) - - if original_view is None: - original_view = view - - wrapper.__wraps__ = view - wrapper.__original_view__ = original_view - wrapper.__module__ = view.__module__ - wrapper.__doc__ = view.__doc__ - - try: - wrapper.__name__ = view.__name__ - except AttributeError: - wrapper.__name__ = repr(view) - - # attrs that may not exist on "view", but, if so, must be attached to - # "wrapped view" - for attr in ('__permitted__', '__call_permissive__', '__permission__', - '__predicated__', '__predicates__', '__accept__', - '__order__', '__text__'): - try: - setattr(wrapper, attr, getattr(view, attr)) - except AttributeError: - pass - - return wrapper - -def mapped_view(view, info): - mapper = info.options.get('mapper') - if mapper is None: - mapper = getattr(view, '__view_mapper__', None) - if mapper is None: - mapper = info.registry.queryUtility(IViewMapperFactory) - if mapper is None: - mapper = DefaultViewMapper - - mapped_view = mapper(**info.options)(view) - return mapped_view - -mapped_view.options = ('mapper', 'attr') - -def owrapped_view(view, info): - wrapper_viewname = info.options.get('wrapper') - viewname = info.options.get('name') - if not wrapper_viewname: - return view - def _owrapped_view(context, request): - response = view(context, request) - request.wrapped_response = response - request.wrapped_body = response.body - request.wrapped_view = view - wrapped_response = render_view_to_response(context, request, - wrapper_viewname) - if wrapped_response is None: - raise ValueError( - 'No wrapper view named %r found when executing view ' - 'named %r' % (wrapper_viewname, viewname)) - return wrapped_response - return _owrapped_view - -owrapped_view.options = ('name', 'wrapper') - -def http_cached_view(view, info): - if info.settings.get('prevent_http_cache', False): - return view - - seconds = info.options.get('http_cache') - - if seconds is None: - return view - - options = {} - - if isinstance(seconds, (tuple, list)): - try: - seconds, options = seconds - except ValueError: - raise ConfigurationError( - 'If http_cache parameter is a tuple or list, it must be ' - 'in the form (seconds, options); not %s' % (seconds,)) - - def wrapper(context, request): - response = view(context, request) - prevent_caching = getattr(response.cache_control, 'prevent_auto', - False) - if not prevent_caching: - response.cache_expires(seconds, **options) - return response - - return wrapper - -http_cached_view.options = ('http_cache',) - -def secured_view(view, info): - permission = info.options.get('permission') - if permission == NO_PERMISSION_REQUIRED: - # allow views registered within configurations that have a - # default permission to explicitly override the default - # permission, replacing it with no permission at all - permission = None - - wrapped_view = view - authn_policy = info.registry.queryUtility(IAuthenticationPolicy) - authz_policy = info.registry.queryUtility(IAuthorizationPolicy) - - if authn_policy and authz_policy and (permission is not None): - def _permitted(context, request): - principals = authn_policy.effective_principals(request) - return authz_policy.permits(context, principals, permission) - def _secured_view(context, request): - result = _permitted(context, request) - if result: - return view(context, request) - view_name = getattr(view, '__name__', view) - msg = getattr( - request, 'authdebug_message', - 'Unauthorized: %s failed permission check' % view_name) - raise HTTPForbidden(msg, result=result) - _secured_view.__call_permissive__ = view - _secured_view.__permitted__ = _permitted - _secured_view.__permission__ = permission - wrapped_view = _secured_view - - return wrapped_view - -secured_view.options = ('permission',) - -def authdebug_view(view, info): - wrapped_view = view - settings = info.settings - permission = info.options.get('permission') - authn_policy = info.registry.queryUtility(IAuthenticationPolicy) - authz_policy = info.registry.queryUtility(IAuthorizationPolicy) - logger = info.registry.queryUtility(IDebugLogger) - if settings and settings.get('debug_authorization', False): - def _authdebug_view(context, request): - view_name = getattr(request, 'view_name', None) - - if authn_policy and authz_policy: - if permission is NO_PERMISSION_REQUIRED: - msg = 'Allowed (NO_PERMISSION_REQUIRED)' - elif permission is None: - msg = 'Allowed (no permission registered)' - else: - principals = authn_policy.effective_principals(request) - msg = str(authz_policy.permits( - context, principals, permission)) - else: - msg = 'Allowed (no authorization policy in use)' - - view_name = getattr(request, 'view_name', None) - url = getattr(request, 'url', None) - msg = ('debug_authorization of url %s (view name %r against ' - 'context %r): %s' % (url, view_name, context, msg)) - if logger: - logger.debug(msg) - if request is not None: - request.authdebug_message = msg - return view(context, request) - - wrapped_view = _authdebug_view - - return wrapped_view - -authdebug_view.options = ('permission',) - -def predicated_view(view, info): - preds = info.predicates - if not preds: - return view - def predicate_wrapper(context, request): - for predicate in preds: - if not predicate(context, request): - view_name = getattr(view, '__name__', view) - raise PredicateMismatch( - 'predicate mismatch for view %s (%s)' % ( - view_name, predicate.text())) - return view(context, request) - def checker(context, request): - return all((predicate(context, request) for predicate in - preds)) - predicate_wrapper.__predicated__ = checker - predicate_wrapper.__predicates__ = preds - return predicate_wrapper - -def attr_wrapped_view(view, info): - accept, order, phash = (info.options.get('accept', None), - getattr(info, 'order', MAX_ORDER), - getattr(info, 'phash', DEFAULT_PHASH)) - # this is a little silly but we don't want to decorate the original - # function with attributes that indicate accept, order, and phash, - # so we use a wrapper - if ( - (accept is None) and - (order == MAX_ORDER) and - (phash == DEFAULT_PHASH) - ): - return view # defaults - def attr_view(context, request): - return view(context, request) - attr_view.__accept__ = accept - attr_view.__order__ = order - attr_view.__phash__ = phash - attr_view.__view_attr__ = info.options.get('attr') - attr_view.__permission__ = info.options.get('permission') - return attr_view - -attr_wrapped_view.options = ('accept', 'attr', 'permission') - -def rendered_view(view, info): - # one way or another this wrapper must produce a Response (unless - # the renderer is a NullRendererHelper) - renderer = info.options.get('renderer') - if renderer is None: - # register a default renderer if you want super-dynamic - # rendering. registering a default renderer will also allow - # override_renderer to work if a renderer is left unspecified for - # a view registration. - def viewresult_to_response(context, request): - result = view(context, request) - if result.__class__ is Response: # common case - response = result - else: - response = info.registry.queryAdapterOrSelf(result, IResponse) - if response is None: - if result is None: - append = (' You may have forgotten to return a value ' - 'from the view callable.') - elif isinstance(result, dict): - append = (' You may have forgotten to define a ' - 'renderer in the view configuration.') - else: - append = '' - - msg = ('Could not convert return value of the view ' - 'callable %s into a response object. ' - 'The value returned was %r.' + append) - - raise ValueError(msg % (view_description(view), result)) - - return response - - return viewresult_to_response - - if renderer is renderers.null_renderer: - return view - - def rendered_view(context, request): - result = view(context, request) - if result.__class__ is Response: # potential common case - response = result - else: - # this must adapt, it can't do a simple interface check - # (avoid trying to render webob responses) - response = info.registry.queryAdapterOrSelf(result, IResponse) - if response is None: - attrs = getattr(request, '__dict__', {}) - if 'override_renderer' in attrs: - # renderer overridden by newrequest event or other - renderer_name = attrs.pop('override_renderer') - view_renderer = renderers.RendererHelper( - name=renderer_name, - package=info.package, - registry=info.registry) - else: - view_renderer = renderer.clone() - if '__view__' in attrs: - view_inst = attrs.pop('__view__') - else: - view_inst = getattr(view, '__original_view__', view) - response = view_renderer.render_view( - request, result, view_inst, context) - return response - - return rendered_view - -rendered_view.options = ('renderer',) - -def decorated_view(view, info): - decorator = info.options.get('decorator') - if decorator is None: - return view - return decorator(view) - -decorated_view.options = ('decorator',) diff --git a/pyramid/config/views.py b/pyramid/config/views.py index 1516743ad..2a019726f 100644 --- a/pyramid/config/views.py +++ b/pyramid/config/views.py @@ -76,9 +76,9 @@ from pyramid.util import ( ) import pyramid.config.predicates -import pyramid.config.derivations +import pyramid.viewderivers -from pyramid.config.derivations import ( +from pyramid.viewderivers import ( preserve_view_attrs, view_description, requestonly, @@ -1028,7 +1028,7 @@ class ViewsConfiguratorMixin(object): raise ConfigurationError('Unknown view options: %s' % (kw,)) def _apply_view_derivers(self, info): - d = pyramid.config.derivations + d = pyramid.viewderivers # These derivations have fixed order outer_derivers = [('attr_wrapped_view', d.attr_wrapped_view), ('predicated_view', d.predicated_view)] @@ -1146,7 +1146,7 @@ class ViewsConfiguratorMixin(object): order=PHASE1_CONFIG) # must be registered before add_view def add_default_view_derivers(self): - d = pyramid.config.derivations + d = pyramid.viewderivers derivers = [ ('authdebug_view', d.authdebug_view), ('secured_view', d.secured_view), diff --git a/pyramid/tests/test_config/test_derivations.py b/pyramid/tests/test_config/test_derivations.py deleted file mode 100644 index d93b37f38..000000000 --- a/pyramid/tests/test_config/test_derivations.py +++ /dev/null @@ -1,1376 +0,0 @@ -import unittest -from zope.interface import implementer - -from pyramid import testing -from pyramid.exceptions import ConfigurationError -from pyramid.interfaces import ( - IResponse, - IRequest, - ) - -class TestDeriveView(unittest.TestCase): - - def setUp(self): - self.config = testing.setUp() - - def tearDown(self): - self.config = None - testing.tearDown() - - def _makeRequest(self): - request = DummyRequest() - request.registry = self.config.registry - return request - - def _registerLogger(self): - from pyramid.interfaces import IDebugLogger - logger = DummyLogger() - self.config.registry.registerUtility(logger, IDebugLogger) - return logger - - def _registerSecurityPolicy(self, permissive): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy - policy = DummySecurityPolicy(permissive) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) - - def test_function_returns_non_adaptable(self): - def view(request): - return None - result = self.config.derive_view(view) - self.assertFalse(result is view) - try: - result(None, None) - except ValueError as e: - self.assertEqual( - e.args[0], - 'Could not convert return value of the view callable function ' - 'pyramid.tests.test_config.test_derivations.view into a response ' - 'object. The value returned was None. You may have forgotten ' - 'to return a value from the view callable.' - ) - else: # pragma: no cover - raise AssertionError - - def test_function_returns_non_adaptable_dict(self): - def view(request): - return {'a':1} - result = self.config.derive_view(view) - self.assertFalse(result is view) - try: - result(None, None) - except ValueError as e: - self.assertEqual( - e.args[0], - "Could not convert return value of the view callable function " - "pyramid.tests.test_config.test_derivations.view into a response " - "object. The value returned was {'a': 1}. You may have " - "forgotten to define a renderer in the view configuration." - ) - else: # pragma: no cover - raise AssertionError - - def test_instance_returns_non_adaptable(self): - class AView(object): - def __call__(self, request): - return None - view = AView() - result = self.config.derive_view(view) - self.assertFalse(result is view) - try: - result(None, None) - except ValueError as e: - msg = e.args[0] - self.assertTrue(msg.startswith( - 'Could not convert return value of the view callable object ' - ' into a response object. The value returned was None. You ' - 'may have forgotten to return a value from the view callable.')) - else: # pragma: no cover - raise AssertionError - - def test_function_returns_true_Response_no_renderer(self): - from pyramid.response import Response - r = Response('Hello') - def view(request): - return r - result = self.config.derive_view(view) - self.assertFalse(result is view) - response = result(None, None) - self.assertEqual(response, r) - - def test_function_returns_true_Response_with_renderer(self): - from pyramid.response import Response - r = Response('Hello') - def view(request): - return r - renderer = object() - result = self.config.derive_view(view) - self.assertFalse(result is view) - response = result(None, None) - self.assertEqual(response, r) - - def test_requestonly_default_method_returns_non_adaptable(self): - request = DummyRequest() - class AView(object): - def __init__(self, request): - pass - def __call__(self): - return None - result = self.config.derive_view(AView) - self.assertFalse(result is AView) - try: - result(None, request) - except ValueError as e: - self.assertEqual( - e.args[0], - 'Could not convert return value of the view callable ' - 'method __call__ of ' - 'class pyramid.tests.test_config.test_derivations.AView into a ' - 'response object. The value returned was None. You may have ' - 'forgotten to return a value from the view callable.' - ) - else: # pragma: no cover - raise AssertionError - - def test_requestonly_nondefault_method_returns_non_adaptable(self): - request = DummyRequest() - class AView(object): - def __init__(self, request): - pass - def theviewmethod(self): - return None - result = self.config.derive_view(AView, attr='theviewmethod') - self.assertFalse(result is AView) - try: - result(None, request) - except ValueError as e: - self.assertEqual( - e.args[0], - 'Could not convert return value of the view callable ' - 'method theviewmethod of ' - 'class pyramid.tests.test_config.test_derivations.AView into a ' - 'response object. The value returned was None. You may have ' - 'forgotten to return a value from the view callable.' - ) - else: # pragma: no cover - raise AssertionError - - def test_requestonly_function(self): - response = DummyResponse() - def view(request): - return response - result = self.config.derive_view(view) - self.assertFalse(result is view) - self.assertEqual(result(None, None), response) - - def test_requestonly_function_with_renderer(self): - response = DummyResponse() - class moo(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, 'OK') - self.assertEqual(view_inst, view) - self.assertEqual(ctx, context) - return response - def clone(self): - return self - def view(request): - return 'OK' - result = self.config.derive_view(view, renderer=moo()) - self.assertFalse(result.__wraps__ is view) - request = self._makeRequest() - context = testing.DummyResource() - self.assertEqual(result(context, request), response) - - def test_requestonly_function_with_renderer_request_override(self): - def moo(info): - def inner(value, system): - self.assertEqual(value, 'OK') - self.assertEqual(system['request'], request) - self.assertEqual(system['context'], context) - return b'moo' - return inner - def view(request): - return 'OK' - self.config.add_renderer('moo', moo) - result = self.config.derive_view(view, renderer='string') - self.assertFalse(result is view) - request = self._makeRequest() - request.override_renderer = 'moo' - context = testing.DummyResource() - self.assertEqual(result(context, request).body, b'moo') - - def test_requestonly_function_with_renderer_request_has_view(self): - response = DummyResponse() - class moo(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, 'OK') - self.assertEqual(view_inst, 'view') - self.assertEqual(ctx, context) - return response - def clone(self): - return self - def view(request): - return 'OK' - result = self.config.derive_view(view, renderer=moo()) - self.assertFalse(result.__wraps__ is view) - request = self._makeRequest() - request.__view__ = 'view' - context = testing.DummyResource() - r = result(context, request) - self.assertEqual(r, response) - self.assertFalse(hasattr(request, '__view__')) - - def test_class_without_attr(self): - response = DummyResponse() - class View(object): - def __init__(self, request): - pass - def __call__(self): - return response - result = self.config.derive_view(View) - request = self._makeRequest() - self.assertEqual(result(None, request), response) - self.assertEqual(request.__view__.__class__, View) - - def test_class_with_attr(self): - response = DummyResponse() - class View(object): - def __init__(self, request): - pass - def another(self): - return response - result = self.config.derive_view(View, attr='another') - request = self._makeRequest() - self.assertEqual(result(None, request), response) - self.assertEqual(request.__view__.__class__, View) - - def test_as_function_context_and_request(self): - def view(context, request): - return 'OK' - result = self.config.derive_view(view) - self.assertTrue(result.__wraps__ is view) - self.assertFalse(hasattr(result, '__call_permissive__')) - self.assertEqual(view(None, None), 'OK') - - def test_as_function_requestonly(self): - response = DummyResponse() - def view(request): - return response - result = self.config.derive_view(view) - self.assertFalse(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), response) - - def test_as_newstyle_class_context_and_request(self): - response = DummyResponse() - class view(object): - def __init__(self, context, request): - pass - def __call__(self): - return response - result = self.config.derive_view(view) - self.assertFalse(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - self.assertEqual(result(None, request), response) - self.assertEqual(request.__view__.__class__, view) - - def test_as_newstyle_class_requestonly(self): - response = DummyResponse() - class view(object): - def __init__(self, context, request): - pass - def __call__(self): - return response - result = self.config.derive_view(view) - self.assertFalse(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - self.assertEqual(result(None, request), response) - self.assertEqual(request.__view__.__class__, view) - - def test_as_oldstyle_class_context_and_request(self): - response = DummyResponse() - class view: - def __init__(self, context, request): - pass - def __call__(self): - return response - result = self.config.derive_view(view) - self.assertFalse(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - self.assertEqual(result(None, request), response) - self.assertEqual(request.__view__.__class__, view) - - def test_as_oldstyle_class_requestonly(self): - response = DummyResponse() - class view: - def __init__(self, context, request): - pass - def __call__(self): - return response - result = self.config.derive_view(view) - self.assertFalse(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - self.assertEqual(result(None, request), response) - self.assertEqual(request.__view__.__class__, view) - - def test_as_instance_context_and_request(self): - response = DummyResponse() - class View: - def __call__(self, context, request): - return response - view = View() - result = self.config.derive_view(view) - self.assertTrue(result.__wraps__ is view) - self.assertFalse(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), response) - - def test_as_instance_requestonly(self): - response = DummyResponse() - class View: - def __call__(self, request): - return response - view = View() - result = self.config.derive_view(view) - self.assertFalse(result is view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertTrue('test_derivations' in result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - self.assertEqual(result(None, None), response) - - def test_with_debug_authorization_no_authpol(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict( - debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)") - - def test_with_debug_authorization_authn_policy_no_authz_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict(debug_authorization=True) - from pyramid.interfaces import IAuthenticationPolicy - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - logger = self._registerLogger() - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)") - - def test_with_debug_authorization_authz_policy_no_authn_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict(debug_authorization=True) - from pyramid.interfaces import IAuthorizationPolicy - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) - logger = self._registerLogger() - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)") - - def test_with_debug_authorization_no_permission(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict( - debug_authorization=True, reload_templates=True) - self._registerSecurityPolicy(True) - logger = self._registerLogger() - result = self.config._derive_view(view) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed (" - "no permission registered)") - - def test_debug_auth_permission_authpol_permitted(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict( - debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - self._registerSecurityPolicy(True) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertEqual(result.__call_permissive__.__wraps__, view) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): True") - - def test_debug_auth_permission_authpol_permitted_no_request(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict( - debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - self._registerSecurityPolicy(True) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertEqual(result.__call_permissive__.__wraps__, view) - self.assertEqual(result(None, None), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url None (view name " - "None against context None): True") - - def test_debug_auth_permission_authpol_denied(self): - from pyramid.httpexceptions import HTTPForbidden - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict( - debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - self._registerSecurityPolicy(False) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertEqual(result.__call_permissive__.__wraps__, view) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertRaises(HTTPForbidden, result, None, request) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): False") - - def test_debug_auth_permission_authpol_denied2(self): - view = lambda *arg: 'OK' - self.config.registry.settings = dict( - debug_authorization=True, reload_templates=True) - self._registerLogger() - self._registerSecurityPolicy(False) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - permitted = result.__permitted__(None, None) - self.assertEqual(permitted, False) - - def test_debug_auth_permission_authpol_overridden(self): - from pyramid.security import NO_PERMISSION_REQUIRED - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict( - debug_authorization=True, reload_templates=True) - logger = self._registerLogger() - self._registerSecurityPolicy(False) - result = self.config._derive_view(view, permission=NO_PERMISSION_REQUIRED) - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual(logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): " - "Allowed (NO_PERMISSION_REQUIRED)") - - def test_secured_view_authn_policy_no_authz_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = {} - from pyramid.interfaces import IAuthenticationPolicy - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - - def test_secured_view_authz_policy_no_authn_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = {} - from pyramid.interfaces import IAuthorizationPolicy - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - - def test_secured_view_raises_forbidden_no_name(self): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy - from pyramid.httpexceptions import HTTPForbidden - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = {} - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) - result = self.config._derive_view(view, permission='view') - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - try: - result(None, request) - except HTTPForbidden as e: - self.assertEqual(e.message, - 'Unauthorized: failed permission check') - else: # pragma: no cover - raise AssertionError - - def test_secured_view_raises_forbidden_with_name(self): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy - from pyramid.httpexceptions import HTTPForbidden - def myview(request): pass - self.config.registry.settings = {} - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) - result = self.config._derive_view(myview, permission='view') - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - try: - result(None, request) - except HTTPForbidden as e: - self.assertEqual(e.message, - 'Unauthorized: myview failed permission check') - else: # pragma: no cover - raise AssertionError - - def test_predicate_mismatch_view_has_no_name(self): - from pyramid.exceptions import PredicateMismatch - response = DummyResponse() - view = lambda *arg: response - def predicate1(context, request): - return False - predicate1.text = lambda *arg: 'text' - result = self.config._derive_view(view, predicates=[predicate1]) - request = self._makeRequest() - request.method = 'POST' - try: - result(None, None) - except PredicateMismatch as e: - self.assertEqual(e.detail, - 'predicate mismatch for view (text)') - else: # pragma: no cover - raise AssertionError - - def test_predicate_mismatch_view_has_name(self): - from pyramid.exceptions import PredicateMismatch - def myview(request): pass - def predicate1(context, request): - return False - predicate1.text = lambda *arg: 'text' - result = self.config._derive_view(myview, predicates=[predicate1]) - request = self._makeRequest() - request.method = 'POST' - try: - result(None, None) - except PredicateMismatch as e: - self.assertEqual(e.detail, - 'predicate mismatch for view myview (text)') - else: # pragma: no cover - raise AssertionError - - def test_predicate_mismatch_exception_has_text_in_detail(self): - from pyramid.exceptions import PredicateMismatch - def myview(request): pass - def predicate1(context, request): - return True - predicate1.text = lambda *arg: 'pred1' - def predicate2(context, request): - return False - predicate2.text = lambda *arg: 'pred2' - result = self.config._derive_view(myview, - predicates=[predicate1, predicate2]) - request = self._makeRequest() - request.method = 'POST' - try: - result(None, None) - except PredicateMismatch as e: - self.assertEqual(e.detail, - 'predicate mismatch for view myview (pred2)') - else: # pragma: no cover - raise AssertionError - - def test_with_predicates_all(self): - response = DummyResponse() - view = lambda *arg: response - predicates = [] - def predicate1(context, request): - predicates.append(True) - return True - def predicate2(context, request): - predicates.append(True) - return True - result = self.config._derive_view(view, - predicates=[predicate1, predicate2]) - request = self._makeRequest() - request.method = 'POST' - next = result(None, None) - self.assertEqual(next, response) - self.assertEqual(predicates, [True, True]) - - def test_with_predicates_checker(self): - view = lambda *arg: 'OK' - predicates = [] - def predicate1(context, request): - predicates.append(True) - return True - def predicate2(context, request): - predicates.append(True) - return True - result = self.config._derive_view(view, - predicates=[predicate1, predicate2]) - request = self._makeRequest() - request.method = 'POST' - next = result.__predicated__(None, None) - self.assertEqual(next, True) - self.assertEqual(predicates, [True, True]) - - def test_with_predicates_notall(self): - from pyramid.httpexceptions import HTTPNotFound - view = lambda *arg: 'OK' - predicates = [] - def predicate1(context, request): - predicates.append(True) - return True - predicate1.text = lambda *arg: 'text' - def predicate2(context, request): - predicates.append(True) - return False - predicate2.text = lambda *arg: 'text' - result = self.config._derive_view(view, - predicates=[predicate1, predicate2]) - request = self._makeRequest() - request.method = 'POST' - self.assertRaises(HTTPNotFound, result, None, None) - self.assertEqual(predicates, [True, True]) - - def test_with_wrapper_viewname(self): - from pyramid.response import Response - from pyramid.interfaces import IView - from pyramid.interfaces import IViewClassifier - inner_response = Response('OK') - def inner_view(context, request): - return inner_response - def outer_view(context, request): - self.assertEqual(request.wrapped_response, inner_response) - self.assertEqual(request.wrapped_body, inner_response.body) - self.assertEqual(request.wrapped_view.__original_view__, - inner_view) - return Response(b'outer ' + request.wrapped_body) - self.config.registry.registerAdapter( - outer_view, (IViewClassifier, None, None), IView, 'owrap') - result = self.config._derive_view(inner_view, viewname='inner', - wrapper_viewname='owrap') - self.assertFalse(result is inner_view) - self.assertEqual(inner_view.__module__, result.__module__) - self.assertEqual(inner_view.__doc__, result.__doc__) - request = self._makeRequest() - response = result(None, request) - self.assertEqual(response.body, b'outer OK') - - def test_with_wrapper_viewname_notfound(self): - from pyramid.response import Response - inner_response = Response('OK') - def inner_view(context, request): - return inner_response - wrapped = self.config._derive_view(inner_view, viewname='inner', - wrapper_viewname='owrap') - request = self._makeRequest() - self.assertRaises(ValueError, wrapped, None, request) - - def test_as_newstyle_class_context_and_request_attr_and_renderer(self): - response = DummyResponse() - class renderer(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, {'a':'1'}) - self.assertEqual(view_inst.__class__, View) - self.assertEqual(ctx, context) - return response - def clone(self): - return self - class View(object): - def __init__(self, context, request): - pass - def index(self): - return {'a':'1'} - result = self.config._derive_view(View, - renderer=renderer(), attr='index') - self.assertFalse(result is View) - self.assertEqual(result.__module__, View.__module__) - self.assertEqual(result.__doc__, View.__doc__) - self.assertEqual(result.__name__, View.__name__) - request = self._makeRequest() - context = testing.DummyResource() - self.assertEqual(result(context, request), response) - - def test_as_newstyle_class_requestonly_attr_and_renderer(self): - response = DummyResponse() - class renderer(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, {'a':'1'}) - self.assertEqual(view_inst.__class__, View) - self.assertEqual(ctx, context) - return response - def clone(self): - return self - class View(object): - def __init__(self, request): - pass - def index(self): - return {'a':'1'} - result = self.config.derive_view(View, - renderer=renderer(), attr='index') - self.assertFalse(result is View) - self.assertEqual(result.__module__, View.__module__) - self.assertEqual(result.__doc__, View.__doc__) - self.assertEqual(result.__name__, View.__name__) - request = self._makeRequest() - context = testing.DummyResource() - self.assertEqual(result(context, request), response) - - def test_as_oldstyle_cls_context_request_attr_and_renderer(self): - response = DummyResponse() - class renderer(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, {'a':'1'}) - self.assertEqual(view_inst.__class__, View) - self.assertEqual(ctx, context) - return response - def clone(self): - return self - class View: - def __init__(self, context, request): - pass - def index(self): - return {'a':'1'} - result = self.config.derive_view(View, - renderer=renderer(), attr='index') - self.assertFalse(result is View) - self.assertEqual(result.__module__, View.__module__) - self.assertEqual(result.__doc__, View.__doc__) - self.assertEqual(result.__name__, View.__name__) - request = self._makeRequest() - context = testing.DummyResource() - self.assertEqual(result(context, request), response) - - def test_as_oldstyle_cls_requestonly_attr_and_renderer(self): - response = DummyResponse() - class renderer(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, {'a':'1'}) - self.assertEqual(view_inst.__class__, View) - self.assertEqual(ctx, context) - return response - def clone(self): - return self - class View: - def __init__(self, request): - pass - def index(self): - return {'a':'1'} - result = self.config.derive_view(View, - renderer=renderer(), attr='index') - self.assertFalse(result is View) - self.assertEqual(result.__module__, View.__module__) - self.assertEqual(result.__doc__, View.__doc__) - self.assertEqual(result.__name__, View.__name__) - request = self._makeRequest() - context = testing.DummyResource() - self.assertEqual(result(context, request), response) - - def test_as_instance_context_and_request_attr_and_renderer(self): - response = DummyResponse() - class renderer(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, {'a':'1'}) - self.assertEqual(view_inst, view) - self.assertEqual(ctx, context) - return response - def clone(self): - return self - class View: - def index(self, context, request): - return {'a':'1'} - view = View() - result = self.config.derive_view(view, - renderer=renderer(), attr='index') - self.assertFalse(result is view) - self.assertEqual(result.__module__, view.__module__) - self.assertEqual(result.__doc__, view.__doc__) - request = self._makeRequest() - context = testing.DummyResource() - self.assertEqual(result(context, request), response) - - def test_as_instance_requestonly_attr_and_renderer(self): - response = DummyResponse() - class renderer(object): - def render_view(inself, req, resp, view_inst, ctx): - self.assertEqual(req, request) - self.assertEqual(resp, {'a':'1'}) - self.assertEqual(view_inst, view) - self.assertEqual(ctx, context) - return response - def clone(self): - return self - class View: - def index(self, request): - return {'a':'1'} - view = View() - result = self.config.derive_view(view, - renderer=renderer(), attr='index') - self.assertFalse(result is view) - self.assertEqual(result.__module__, view.__module__) - self.assertEqual(result.__doc__, view.__doc__) - request = self._makeRequest() - context = testing.DummyResource() - self.assertEqual(result(context, request), response) - - def test_with_view_mapper_config_specified(self): - response = DummyResponse() - class mapper(object): - def __init__(self, **kw): - self.kw = kw - def __call__(self, view): - def wrapped(context, request): - return response - return wrapped - def view(context, request): return 'NOTOK' - result = self.config._derive_view(view, mapper=mapper) - self.assertFalse(result.__wraps__ is view) - self.assertEqual(result(None, None), response) - - def test_with_view_mapper_view_specified(self): - from pyramid.response import Response - response = Response() - def mapper(**kw): - def inner(view): - def superinner(context, request): - self.assertEqual(request, None) - return response - return superinner - return inner - def view(context, request): return 'NOTOK' - view.__view_mapper__ = mapper - result = self.config.derive_view(view) - self.assertFalse(result.__wraps__ is view) - self.assertEqual(result(None, None), response) - - def test_with_view_mapper_default_mapper_specified(self): - from pyramid.response import Response - response = Response() - def mapper(**kw): - def inner(view): - def superinner(context, request): - self.assertEqual(request, None) - return response - return superinner - return inner - self.config.set_view_mapper(mapper) - def view(context, request): return 'NOTOK' - result = self.config.derive_view(view) - self.assertFalse(result.__wraps__ is view) - self.assertEqual(result(None, None), response) - - def test_attr_wrapped_view_branching_default_phash(self): - from pyramid.config.util import DEFAULT_PHASH - def view(context, request): pass - result = self.config._derive_view(view, phash=DEFAULT_PHASH) - self.assertEqual(result.__wraps__, view) - - def test_attr_wrapped_view_branching_nondefault_phash(self): - def view(context, request): pass - result = self.config._derive_view(view, phash='nondefault') - self.assertNotEqual(result, view) - - def test_http_cached_view_integer(self): - import datetime - from pyramid.response import Response - response = Response('OK') - def inner_view(context, request): - return response - result = self.config._derive_view(inner_view, http_cache=3600) - self.assertFalse(result is inner_view) - self.assertEqual(inner_view.__module__, result.__module__) - self.assertEqual(inner_view.__doc__, result.__doc__) - request = self._makeRequest() - when = datetime.datetime.utcnow() + datetime.timedelta(hours=1) - result = result(None, request) - self.assertEqual(result, response) - headers = dict(result.headerlist) - expires = parse_httpdate(headers['Expires']) - assert_similar_datetime(expires, when) - self.assertEqual(headers['Cache-Control'], 'max-age=3600') - - def test_http_cached_view_timedelta(self): - import datetime - from pyramid.response import Response - response = Response('OK') - def inner_view(context, request): - return response - result = self.config._derive_view(inner_view, - http_cache=datetime.timedelta(hours=1)) - self.assertFalse(result is inner_view) - self.assertEqual(inner_view.__module__, result.__module__) - self.assertEqual(inner_view.__doc__, result.__doc__) - request = self._makeRequest() - when = datetime.datetime.utcnow() + datetime.timedelta(hours=1) - result = result(None, request) - self.assertEqual(result, response) - headers = dict(result.headerlist) - expires = parse_httpdate(headers['Expires']) - assert_similar_datetime(expires, when) - self.assertEqual(headers['Cache-Control'], 'max-age=3600') - - def test_http_cached_view_tuple(self): - import datetime - from pyramid.response import Response - response = Response('OK') - def inner_view(context, request): - return response - result = self.config._derive_view(inner_view, - http_cache=(3600, {'public':True})) - self.assertFalse(result is inner_view) - self.assertEqual(inner_view.__module__, result.__module__) - self.assertEqual(inner_view.__doc__, result.__doc__) - request = self._makeRequest() - when = datetime.datetime.utcnow() + datetime.timedelta(hours=1) - result = result(None, request) - self.assertEqual(result, response) - headers = dict(result.headerlist) - expires = parse_httpdate(headers['Expires']) - assert_similar_datetime(expires, when) - self.assertEqual(headers['Cache-Control'], 'max-age=3600, public') - - def test_http_cached_view_tuple_seconds_None(self): - from pyramid.response import Response - response = Response('OK') - def inner_view(context, request): - return response - result = self.config._derive_view(inner_view, - http_cache=(None, {'public':True})) - self.assertFalse(result is inner_view) - self.assertEqual(inner_view.__module__, result.__module__) - self.assertEqual(inner_view.__doc__, result.__doc__) - request = self._makeRequest() - result = result(None, request) - self.assertEqual(result, response) - headers = dict(result.headerlist) - self.assertFalse('Expires' in headers) - self.assertEqual(headers['Cache-Control'], 'public') - - def test_http_cached_view_prevent_auto_set(self): - from pyramid.response import Response - response = Response() - response.cache_control.prevent_auto = True - def inner_view(context, request): - return response - result = self.config._derive_view(inner_view, http_cache=3600) - request = self._makeRequest() - result = result(None, request) - self.assertEqual(result, response) # doesn't blow up - headers = dict(result.headerlist) - self.assertFalse('Expires' in headers) - self.assertFalse('Cache-Control' in headers) - - def test_http_cached_prevent_http_cache_in_settings(self): - self.config.registry.settings['prevent_http_cache'] = True - from pyramid.response import Response - response = Response() - def inner_view(context, request): - return response - result = self.config._derive_view(inner_view, http_cache=3600) - request = self._makeRequest() - result = result(None, request) - self.assertEqual(result, response) - headers = dict(result.headerlist) - self.assertFalse('Expires' in headers) - self.assertFalse('Cache-Control' in headers) - - def test_http_cached_view_bad_tuple(self): - def view(request): pass - self.assertRaises(ConfigurationError, self.config._derive_view, - view, http_cache=(None,)) - - -class TestDerivationOrder(unittest.TestCase): - def setUp(self): - self.config = testing.setUp() - - def tearDown(self): - self.config = None - testing.tearDown() - - def test_right_order_user_sorted(self): - from pyramid.interfaces import IViewDerivers - - self.config.add_view_deriver(None, 'deriv1') - self.config.add_view_deriver(None, 'deriv2', over='deriv1') - self.config.add_view_deriver(None, 'deriv3', under='deriv2') - - derivers = self.config.registry.getUtility(IViewDerivers) - derivers_sorted = derivers.sorted() - dlist = [d for (d, _) in derivers_sorted] - self.assertEqual([ - 'authdebug_view', - 'secured_view', - 'owrapped_view', - 'http_cached_view', - 'decorated_view', - 'deriv2', - 'deriv3', - 'deriv1', - 'rendered_view', - ], dlist) - - def test_right_order_implicit(self): - from pyramid.interfaces import IViewDerivers - - self.config.add_view_deriver(None, 'deriv1') - self.config.add_view_deriver(None, 'deriv2') - self.config.add_view_deriver(None, 'deriv3') - - derivers = self.config.registry.getUtility(IViewDerivers) - derivers_sorted = derivers.sorted() - dlist = [d for (d, _) in derivers_sorted] - self.assertEqual([ - 'authdebug_view', - 'secured_view', - 'owrapped_view', - 'http_cached_view', - 'decorated_view', - 'deriv3', - 'deriv2', - 'deriv1', - 'rendered_view', - ], dlist) - - def test_right_order_under_rendered_view(self): - from pyramid.interfaces import IViewDerivers - - self.config.add_view_deriver(None, 'deriv1', under='rendered_view') - - derivers = self.config.registry.getUtility(IViewDerivers) - derivers_sorted = derivers.sorted() - dlist = [d for (d, _) in derivers_sorted] - self.assertEqual([ - 'authdebug_view', - 'secured_view', - 'owrapped_view', - 'http_cached_view', - 'decorated_view', - 'rendered_view', - 'deriv1', - ], dlist) - - - def test_right_order_under_rendered_view_others(self): - from pyramid.interfaces import IViewDerivers - - self.config.add_view_deriver(None, 'deriv1', under='rendered_view') - self.config.add_view_deriver(None, 'deriv2') - self.config.add_view_deriver(None, 'deriv3') - - derivers = self.config.registry.getUtility(IViewDerivers) - derivers_sorted = derivers.sorted() - dlist = [d for (d, _) in derivers_sorted] - self.assertEqual([ - 'authdebug_view', - 'secured_view', - 'owrapped_view', - 'http_cached_view', - 'decorated_view', - 'deriv3', - 'deriv2', - 'rendered_view', - 'deriv1', - ], dlist) - - -class TestAddDeriver(unittest.TestCase): - - def setUp(self): - self.config = testing.setUp() - - def tearDown(self): - self.config = None - testing.tearDown() - - def test_add_single_deriver(self): - response = DummyResponse() - response.deriv = False - view = lambda *arg: response - - def deriv(view, info): - self.assertFalse(response.deriv) - response.deriv = True - return view - - result = self.config._derive_view(view) - self.assertFalse(response.deriv) - self.config.add_view_deriver(deriv, 'test_deriv') - - result = self.config._derive_view(view) - self.assertTrue(response.deriv) - - def test_override_deriver(self): - flags = {} - - class AView: - def __init__(self): - self.response = DummyResponse() - - def deriv1(view, value, **kw): - flags['deriv1'] = True - return view - - def deriv2(view, value, **kw): - flags['deriv2'] = True - return view - - view1 = AView() - self.config.add_view_deriver(deriv1, 'test_deriv') - result = self.config._derive_view(view1) - self.assertTrue(flags.get('deriv1')) - self.assertFalse(flags.get('deriv2')) - - flags.clear() - view2 = AView() - self.config.add_view_deriver(deriv2, 'test_deriv') - result = self.config._derive_view(view2) - self.assertFalse(flags.get('deriv1')) - self.assertTrue(flags.get('deriv2')) - - def test_add_multi_derivers_ordered(self): - response = DummyResponse() - view = lambda *arg: response - response.deriv = [] - - def deriv1(view, value, **kw): - response.deriv.append('deriv1') - return view - - def deriv2(view, value, **kw): - response.deriv.append('deriv2') - return view - - def deriv3(view, value, **kw): - response.deriv.append('deriv3') - return view - - self.config.add_view_deriver(deriv1, 'deriv1') - self.config.add_view_deriver(deriv2, 'deriv2', under='deriv1') - self.config.add_view_deriver(deriv3, 'deriv3', over='deriv2') - result = self.config._derive_view(view) - self.assertEqual(response.deriv, ['deriv2', 'deriv3', 'deriv1']) - - -class TestDeriverIntegration(unittest.TestCase): - def setUp(self): - self.config = testing.setUp() - - def tearDown(self): - self.config = None - testing.tearDown() - - def _getViewCallable(self, config, ctx_iface=None, request_iface=None, - name=''): - from zope.interface import Interface - from pyramid.interfaces import IRequest - from pyramid.interfaces import IView - from pyramid.interfaces import IViewClassifier - from pyramid.interfaces import IExceptionViewClassifier - classifier = IViewClassifier - if ctx_iface is None: - ctx_iface = Interface - if request_iface is None: - request_iface = IRequest - return config.registry.adapters.lookup( - (classifier, request_iface, ctx_iface), IView, name=name, - default=None) - - def _makeRequest(self, config): - request = DummyRequest() - request.registry = config.registry - return request - - def test_view_options(self): - response = DummyResponse() - view = lambda *arg: response - response.deriv = [] - - def deriv1(view, info): - response.deriv.append(info.options['deriv1']) - return view - deriv1.options = ('deriv1',) - - def deriv2(view, info): - response.deriv.append(info.options['deriv2']) - return view - deriv2.options = ('deriv2',) - - self.config.add_view_deriver(deriv1, 'deriv1') - self.config.add_view_deriver(deriv2, 'deriv2') - self.config.add_view(view, deriv1='test1', deriv2='test2') - - wrapper = self._getViewCallable(self.config) - request = self._makeRequest(self.config) - request.method = 'GET' - self.assertEqual(wrapper(None, request), response) - self.assertEqual(['test1', 'test2'], response.deriv) - - def test_unexpected_view_options(self): - from pyramid.exceptions import ConfigurationError - def deriv1(view, info): pass - self.config.add_view_deriver(deriv1, 'deriv1') - self.assertRaises( - ConfigurationError, - lambda: self.config.add_view(lambda r: {}, deriv1='test1')) - -@implementer(IResponse) -class DummyResponse(object): - content_type = None - default_content_type = None - body = None - -class DummyRequest: - subpath = () - matchdict = None - request_iface = IRequest - - def __init__(self, environ=None): - if environ is None: - environ = {} - self.environ = environ - self.params = {} - self.cookies = {} - self.response = DummyResponse() - -class DummyLogger: - def __init__(self): - self.messages = [] - def info(self, msg): - self.messages.append(msg) - warn = info - debug = info - -class DummySecurityPolicy: - def __init__(self, permitted=True): - self.permitted = permitted - - def effective_principals(self, request): - return [] - - def permits(self, context, principals, permission): - return self.permitted - -def parse_httpdate(s): - import datetime - # cannot use %Z, must use literal GMT; Jython honors timezone - # but CPython does not - return datetime.datetime.strptime(s, "%a, %d %b %Y %H:%M:%S GMT") - -def assert_similar_datetime(one, two): - for attr in ('year', 'month', 'day', 'hour', 'minute'): - one_attr = getattr(one, attr) - two_attr = getattr(two, attr) - if not one_attr == two_attr: # pragma: no cover - raise AssertionError('%r != %r in %s' % (one_attr, two_attr, attr)) diff --git a/pyramid/tests/test_viewderivers.py b/pyramid/tests/test_viewderivers.py new file mode 100644 index 000000000..dfac827dc --- /dev/null +++ b/pyramid/tests/test_viewderivers.py @@ -0,0 +1,1376 @@ +import unittest +from zope.interface import implementer + +from pyramid import testing +from pyramid.exceptions import ConfigurationError +from pyramid.interfaces import ( + IResponse, + IRequest, + ) + +class TestDeriveView(unittest.TestCase): + + def setUp(self): + self.config = testing.setUp() + + def tearDown(self): + self.config = None + testing.tearDown() + + def _makeRequest(self): + request = DummyRequest() + request.registry = self.config.registry + return request + + def _registerLogger(self): + from pyramid.interfaces import IDebugLogger + logger = DummyLogger() + self.config.registry.registerUtility(logger, IDebugLogger) + return logger + + def _registerSecurityPolicy(self, permissive): + from pyramid.interfaces import IAuthenticationPolicy + from pyramid.interfaces import IAuthorizationPolicy + policy = DummySecurityPolicy(permissive) + self.config.registry.registerUtility(policy, IAuthenticationPolicy) + self.config.registry.registerUtility(policy, IAuthorizationPolicy) + + def test_function_returns_non_adaptable(self): + def view(request): + return None + result = self.config.derive_view(view) + self.assertFalse(result is view) + try: + result(None, None) + except ValueError as e: + self.assertEqual( + e.args[0], + 'Could not convert return value of the view callable function ' + 'pyramid.tests.test_viewderivers.view into a response ' + 'object. The value returned was None. You may have forgotten ' + 'to return a value from the view callable.' + ) + else: # pragma: no cover + raise AssertionError + + def test_function_returns_non_adaptable_dict(self): + def view(request): + return {'a':1} + result = self.config.derive_view(view) + self.assertFalse(result is view) + try: + result(None, None) + except ValueError as e: + self.assertEqual( + e.args[0], + "Could not convert return value of the view callable function " + "pyramid.tests.test_viewderivers.view into a response " + "object. The value returned was {'a': 1}. You may have " + "forgotten to define a renderer in the view configuration." + ) + else: # pragma: no cover + raise AssertionError + + def test_instance_returns_non_adaptable(self): + class AView(object): + def __call__(self, request): + return None + view = AView() + result = self.config.derive_view(view) + self.assertFalse(result is view) + try: + result(None, None) + except ValueError as e: + msg = e.args[0] + self.assertTrue(msg.startswith( + 'Could not convert return value of the view callable object ' + ' into a response object. The value returned was None. You ' + 'may have forgotten to return a value from the view callable.')) + else: # pragma: no cover + raise AssertionError + + def test_function_returns_true_Response_no_renderer(self): + from pyramid.response import Response + r = Response('Hello') + def view(request): + return r + result = self.config.derive_view(view) + self.assertFalse(result is view) + response = result(None, None) + self.assertEqual(response, r) + + def test_function_returns_true_Response_with_renderer(self): + from pyramid.response import Response + r = Response('Hello') + def view(request): + return r + renderer = object() + result = self.config.derive_view(view) + self.assertFalse(result is view) + response = result(None, None) + self.assertEqual(response, r) + + def test_requestonly_default_method_returns_non_adaptable(self): + request = DummyRequest() + class AView(object): + def __init__(self, request): + pass + def __call__(self): + return None + result = self.config.derive_view(AView) + self.assertFalse(result is AView) + try: + result(None, request) + except ValueError as e: + self.assertEqual( + e.args[0], + 'Could not convert return value of the view callable ' + 'method __call__ of ' + 'class pyramid.tests.test_viewderivers.AView into a ' + 'response object. The value returned was None. You may have ' + 'forgotten to return a value from the view callable.' + ) + else: # pragma: no cover + raise AssertionError + + def test_requestonly_nondefault_method_returns_non_adaptable(self): + request = DummyRequest() + class AView(object): + def __init__(self, request): + pass + def theviewmethod(self): + return None + result = self.config.derive_view(AView, attr='theviewmethod') + self.assertFalse(result is AView) + try: + result(None, request) + except ValueError as e: + self.assertEqual( + e.args[0], + 'Could not convert return value of the view callable ' + 'method theviewmethod of ' + 'class pyramid.tests.test_viewderivers.AView into a ' + 'response object. The value returned was None. You may have ' + 'forgotten to return a value from the view callable.' + ) + else: # pragma: no cover + raise AssertionError + + def test_requestonly_function(self): + response = DummyResponse() + def view(request): + return response + result = self.config.derive_view(view) + self.assertFalse(result is view) + self.assertEqual(result(None, None), response) + + def test_requestonly_function_with_renderer(self): + response = DummyResponse() + class moo(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, 'OK') + self.assertEqual(view_inst, view) + self.assertEqual(ctx, context) + return response + def clone(self): + return self + def view(request): + return 'OK' + result = self.config.derive_view(view, renderer=moo()) + self.assertFalse(result.__wraps__ is view) + request = self._makeRequest() + context = testing.DummyResource() + self.assertEqual(result(context, request), response) + + def test_requestonly_function_with_renderer_request_override(self): + def moo(info): + def inner(value, system): + self.assertEqual(value, 'OK') + self.assertEqual(system['request'], request) + self.assertEqual(system['context'], context) + return b'moo' + return inner + def view(request): + return 'OK' + self.config.add_renderer('moo', moo) + result = self.config.derive_view(view, renderer='string') + self.assertFalse(result is view) + request = self._makeRequest() + request.override_renderer = 'moo' + context = testing.DummyResource() + self.assertEqual(result(context, request).body, b'moo') + + def test_requestonly_function_with_renderer_request_has_view(self): + response = DummyResponse() + class moo(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, 'OK') + self.assertEqual(view_inst, 'view') + self.assertEqual(ctx, context) + return response + def clone(self): + return self + def view(request): + return 'OK' + result = self.config.derive_view(view, renderer=moo()) + self.assertFalse(result.__wraps__ is view) + request = self._makeRequest() + request.__view__ = 'view' + context = testing.DummyResource() + r = result(context, request) + self.assertEqual(r, response) + self.assertFalse(hasattr(request, '__view__')) + + def test_class_without_attr(self): + response = DummyResponse() + class View(object): + def __init__(self, request): + pass + def __call__(self): + return response + result = self.config.derive_view(View) + request = self._makeRequest() + self.assertEqual(result(None, request), response) + self.assertEqual(request.__view__.__class__, View) + + def test_class_with_attr(self): + response = DummyResponse() + class View(object): + def __init__(self, request): + pass + def another(self): + return response + result = self.config.derive_view(View, attr='another') + request = self._makeRequest() + self.assertEqual(result(None, request), response) + self.assertEqual(request.__view__.__class__, View) + + def test_as_function_context_and_request(self): + def view(context, request): + return 'OK' + result = self.config.derive_view(view) + self.assertTrue(result.__wraps__ is view) + self.assertFalse(hasattr(result, '__call_permissive__')) + self.assertEqual(view(None, None), 'OK') + + def test_as_function_requestonly(self): + response = DummyResponse() + def view(request): + return response + result = self.config.derive_view(view) + self.assertFalse(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), response) + + def test_as_newstyle_class_context_and_request(self): + response = DummyResponse() + class view(object): + def __init__(self, context, request): + pass + def __call__(self): + return response + result = self.config.derive_view(view) + self.assertFalse(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + self.assertEqual(result(None, request), response) + self.assertEqual(request.__view__.__class__, view) + + def test_as_newstyle_class_requestonly(self): + response = DummyResponse() + class view(object): + def __init__(self, context, request): + pass + def __call__(self): + return response + result = self.config.derive_view(view) + self.assertFalse(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + self.assertEqual(result(None, request), response) + self.assertEqual(request.__view__.__class__, view) + + def test_as_oldstyle_class_context_and_request(self): + response = DummyResponse() + class view: + def __init__(self, context, request): + pass + def __call__(self): + return response + result = self.config.derive_view(view) + self.assertFalse(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + self.assertEqual(result(None, request), response) + self.assertEqual(request.__view__.__class__, view) + + def test_as_oldstyle_class_requestonly(self): + response = DummyResponse() + class view: + def __init__(self, context, request): + pass + def __call__(self): + return response + result = self.config.derive_view(view) + self.assertFalse(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + self.assertEqual(result(None, request), response) + self.assertEqual(request.__view__.__class__, view) + + def test_as_instance_context_and_request(self): + response = DummyResponse() + class View: + def __call__(self, context, request): + return response + view = View() + result = self.config.derive_view(view) + self.assertTrue(result.__wraps__ is view) + self.assertFalse(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), response) + + def test_as_instance_requestonly(self): + response = DummyResponse() + class View: + def __call__(self, request): + return response + view = View() + result = self.config.derive_view(view) + self.assertFalse(result is view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertTrue('test_viewderivers' in result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + self.assertEqual(result(None, None), response) + + def test_with_debug_authorization_no_authpol(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict( + debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): Allowed " + "(no authorization policy in use)") + + def test_with_debug_authorization_authn_policy_no_authz_policy(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict(debug_authorization=True) + from pyramid.interfaces import IAuthenticationPolicy + policy = DummySecurityPolicy(False) + self.config.registry.registerUtility(policy, IAuthenticationPolicy) + logger = self._registerLogger() + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): Allowed " + "(no authorization policy in use)") + + def test_with_debug_authorization_authz_policy_no_authn_policy(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict(debug_authorization=True) + from pyramid.interfaces import IAuthorizationPolicy + policy = DummySecurityPolicy(False) + self.config.registry.registerUtility(policy, IAuthorizationPolicy) + logger = self._registerLogger() + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): Allowed " + "(no authorization policy in use)") + + def test_with_debug_authorization_no_permission(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict( + debug_authorization=True, reload_templates=True) + self._registerSecurityPolicy(True) + logger = self._registerLogger() + result = self.config._derive_view(view) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): Allowed (" + "no permission registered)") + + def test_debug_auth_permission_authpol_permitted(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict( + debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + self._registerSecurityPolicy(True) + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertEqual(result.__call_permissive__.__wraps__, view) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): True") + + def test_debug_auth_permission_authpol_permitted_no_request(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict( + debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + self._registerSecurityPolicy(True) + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertEqual(result.__call_permissive__.__wraps__, view) + self.assertEqual(result(None, None), response) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url None (view name " + "None against context None): True") + + def test_debug_auth_permission_authpol_denied(self): + from pyramid.httpexceptions import HTTPForbidden + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict( + debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + self._registerSecurityPolicy(False) + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertEqual(result.__call_permissive__.__wraps__, view) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertRaises(HTTPForbidden, result, None, request) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): False") + + def test_debug_auth_permission_authpol_denied2(self): + view = lambda *arg: 'OK' + self.config.registry.settings = dict( + debug_authorization=True, reload_templates=True) + self._registerLogger() + self._registerSecurityPolicy(False) + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + permitted = result.__permitted__(None, None) + self.assertEqual(permitted, False) + + def test_debug_auth_permission_authpol_overridden(self): + from pyramid.security import NO_PERMISSION_REQUIRED + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = dict( + debug_authorization=True, reload_templates=True) + logger = self._registerLogger() + self._registerSecurityPolicy(False) + result = self.config._derive_view(view, permission=NO_PERMISSION_REQUIRED) + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + self.assertEqual(len(logger.messages), 1) + self.assertEqual(logger.messages[0], + "debug_authorization of url url (view name " + "'view_name' against context None): " + "Allowed (NO_PERMISSION_REQUIRED)") + + def test_secured_view_authn_policy_no_authz_policy(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = {} + from pyramid.interfaces import IAuthenticationPolicy + policy = DummySecurityPolicy(False) + self.config.registry.registerUtility(policy, IAuthenticationPolicy) + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + + def test_secured_view_authz_policy_no_authn_policy(self): + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = {} + from pyramid.interfaces import IAuthorizationPolicy + policy = DummySecurityPolicy(False) + self.config.registry.registerUtility(policy, IAuthorizationPolicy) + result = self.config._derive_view(view, permission='view') + self.assertEqual(view.__module__, result.__module__) + self.assertEqual(view.__doc__, result.__doc__) + self.assertEqual(view.__name__, result.__name__) + self.assertFalse(hasattr(result, '__call_permissive__')) + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + self.assertEqual(result(None, request), response) + + def test_secured_view_raises_forbidden_no_name(self): + from pyramid.interfaces import IAuthenticationPolicy + from pyramid.interfaces import IAuthorizationPolicy + from pyramid.httpexceptions import HTTPForbidden + response = DummyResponse() + view = lambda *arg: response + self.config.registry.settings = {} + policy = DummySecurityPolicy(False) + self.config.registry.registerUtility(policy, IAuthenticationPolicy) + self.config.registry.registerUtility(policy, IAuthorizationPolicy) + result = self.config._derive_view(view, permission='view') + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + try: + result(None, request) + except HTTPForbidden as e: + self.assertEqual(e.message, + 'Unauthorized: failed permission check') + else: # pragma: no cover + raise AssertionError + + def test_secured_view_raises_forbidden_with_name(self): + from pyramid.interfaces import IAuthenticationPolicy + from pyramid.interfaces import IAuthorizationPolicy + from pyramid.httpexceptions import HTTPForbidden + def myview(request): pass + self.config.registry.settings = {} + policy = DummySecurityPolicy(False) + self.config.registry.registerUtility(policy, IAuthenticationPolicy) + self.config.registry.registerUtility(policy, IAuthorizationPolicy) + result = self.config._derive_view(myview, permission='view') + request = self._makeRequest() + request.view_name = 'view_name' + request.url = 'url' + try: + result(None, request) + except HTTPForbidden as e: + self.assertEqual(e.message, + 'Unauthorized: myview failed permission check') + else: # pragma: no cover + raise AssertionError + + def test_predicate_mismatch_view_has_no_name(self): + from pyramid.exceptions import PredicateMismatch + response = DummyResponse() + view = lambda *arg: response + def predicate1(context, request): + return False + predicate1.text = lambda *arg: 'text' + result = self.config._derive_view(view, predicates=[predicate1]) + request = self._makeRequest() + request.method = 'POST' + try: + result(None, None) + except PredicateMismatch as e: + self.assertEqual(e.detail, + 'predicate mismatch for view (text)') + else: # pragma: no cover + raise AssertionError + + def test_predicate_mismatch_view_has_name(self): + from pyramid.exceptions import PredicateMismatch + def myview(request): pass + def predicate1(context, request): + return False + predicate1.text = lambda *arg: 'text' + result = self.config._derive_view(myview, predicates=[predicate1]) + request = self._makeRequest() + request.method = 'POST' + try: + result(None, None) + except PredicateMismatch as e: + self.assertEqual(e.detail, + 'predicate mismatch for view myview (text)') + else: # pragma: no cover + raise AssertionError + + def test_predicate_mismatch_exception_has_text_in_detail(self): + from pyramid.exceptions import PredicateMismatch + def myview(request): pass + def predicate1(context, request): + return True + predicate1.text = lambda *arg: 'pred1' + def predicate2(context, request): + return False + predicate2.text = lambda *arg: 'pred2' + result = self.config._derive_view(myview, + predicates=[predicate1, predicate2]) + request = self._makeRequest() + request.method = 'POST' + try: + result(None, None) + except PredicateMismatch as e: + self.assertEqual(e.detail, + 'predicate mismatch for view myview (pred2)') + else: # pragma: no cover + raise AssertionError + + def test_with_predicates_all(self): + response = DummyResponse() + view = lambda *arg: response + predicates = [] + def predicate1(context, request): + predicates.append(True) + return True + def predicate2(context, request): + predicates.append(True) + return True + result = self.config._derive_view(view, + predicates=[predicate1, predicate2]) + request = self._makeRequest() + request.method = 'POST' + next = result(None, None) + self.assertEqual(next, response) + self.assertEqual(predicates, [True, True]) + + def test_with_predicates_checker(self): + view = lambda *arg: 'OK' + predicates = [] + def predicate1(context, request): + predicates.append(True) + return True + def predicate2(context, request): + predicates.append(True) + return True + result = self.config._derive_view(view, + predicates=[predicate1, predicate2]) + request = self._makeRequest() + request.method = 'POST' + next = result.__predicated__(None, None) + self.assertEqual(next, True) + self.assertEqual(predicates, [True, True]) + + def test_with_predicates_notall(self): + from pyramid.httpexceptions import HTTPNotFound + view = lambda *arg: 'OK' + predicates = [] + def predicate1(context, request): + predicates.append(True) + return True + predicate1.text = lambda *arg: 'text' + def predicate2(context, request): + predicates.append(True) + return False + predicate2.text = lambda *arg: 'text' + result = self.config._derive_view(view, + predicates=[predicate1, predicate2]) + request = self._makeRequest() + request.method = 'POST' + self.assertRaises(HTTPNotFound, result, None, None) + self.assertEqual(predicates, [True, True]) + + def test_with_wrapper_viewname(self): + from pyramid.response import Response + from pyramid.interfaces import IView + from pyramid.interfaces import IViewClassifier + inner_response = Response('OK') + def inner_view(context, request): + return inner_response + def outer_view(context, request): + self.assertEqual(request.wrapped_response, inner_response) + self.assertEqual(request.wrapped_body, inner_response.body) + self.assertEqual(request.wrapped_view.__original_view__, + inner_view) + return Response(b'outer ' + request.wrapped_body) + self.config.registry.registerAdapter( + outer_view, (IViewClassifier, None, None), IView, 'owrap') + result = self.config._derive_view(inner_view, viewname='inner', + wrapper_viewname='owrap') + self.assertFalse(result is inner_view) + self.assertEqual(inner_view.__module__, result.__module__) + self.assertEqual(inner_view.__doc__, result.__doc__) + request = self._makeRequest() + response = result(None, request) + self.assertEqual(response.body, b'outer OK') + + def test_with_wrapper_viewname_notfound(self): + from pyramid.response import Response + inner_response = Response('OK') + def inner_view(context, request): + return inner_response + wrapped = self.config._derive_view(inner_view, viewname='inner', + wrapper_viewname='owrap') + request = self._makeRequest() + self.assertRaises(ValueError, wrapped, None, request) + + def test_as_newstyle_class_context_and_request_attr_and_renderer(self): + response = DummyResponse() + class renderer(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, {'a':'1'}) + self.assertEqual(view_inst.__class__, View) + self.assertEqual(ctx, context) + return response + def clone(self): + return self + class View(object): + def __init__(self, context, request): + pass + def index(self): + return {'a':'1'} + result = self.config._derive_view(View, + renderer=renderer(), attr='index') + self.assertFalse(result is View) + self.assertEqual(result.__module__, View.__module__) + self.assertEqual(result.__doc__, View.__doc__) + self.assertEqual(result.__name__, View.__name__) + request = self._makeRequest() + context = testing.DummyResource() + self.assertEqual(result(context, request), response) + + def test_as_newstyle_class_requestonly_attr_and_renderer(self): + response = DummyResponse() + class renderer(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, {'a':'1'}) + self.assertEqual(view_inst.__class__, View) + self.assertEqual(ctx, context) + return response + def clone(self): + return self + class View(object): + def __init__(self, request): + pass + def index(self): + return {'a':'1'} + result = self.config.derive_view(View, + renderer=renderer(), attr='index') + self.assertFalse(result is View) + self.assertEqual(result.__module__, View.__module__) + self.assertEqual(result.__doc__, View.__doc__) + self.assertEqual(result.__name__, View.__name__) + request = self._makeRequest() + context = testing.DummyResource() + self.assertEqual(result(context, request), response) + + def test_as_oldstyle_cls_context_request_attr_and_renderer(self): + response = DummyResponse() + class renderer(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, {'a':'1'}) + self.assertEqual(view_inst.__class__, View) + self.assertEqual(ctx, context) + return response + def clone(self): + return self + class View: + def __init__(self, context, request): + pass + def index(self): + return {'a':'1'} + result = self.config.derive_view(View, + renderer=renderer(), attr='index') + self.assertFalse(result is View) + self.assertEqual(result.__module__, View.__module__) + self.assertEqual(result.__doc__, View.__doc__) + self.assertEqual(result.__name__, View.__name__) + request = self._makeRequest() + context = testing.DummyResource() + self.assertEqual(result(context, request), response) + + def test_as_oldstyle_cls_requestonly_attr_and_renderer(self): + response = DummyResponse() + class renderer(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, {'a':'1'}) + self.assertEqual(view_inst.__class__, View) + self.assertEqual(ctx, context) + return response + def clone(self): + return self + class View: + def __init__(self, request): + pass + def index(self): + return {'a':'1'} + result = self.config.derive_view(View, + renderer=renderer(), attr='index') + self.assertFalse(result is View) + self.assertEqual(result.__module__, View.__module__) + self.assertEqual(result.__doc__, View.__doc__) + self.assertEqual(result.__name__, View.__name__) + request = self._makeRequest() + context = testing.DummyResource() + self.assertEqual(result(context, request), response) + + def test_as_instance_context_and_request_attr_and_renderer(self): + response = DummyResponse() + class renderer(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, {'a':'1'}) + self.assertEqual(view_inst, view) + self.assertEqual(ctx, context) + return response + def clone(self): + return self + class View: + def index(self, context, request): + return {'a':'1'} + view = View() + result = self.config.derive_view(view, + renderer=renderer(), attr='index') + self.assertFalse(result is view) + self.assertEqual(result.__module__, view.__module__) + self.assertEqual(result.__doc__, view.__doc__) + request = self._makeRequest() + context = testing.DummyResource() + self.assertEqual(result(context, request), response) + + def test_as_instance_requestonly_attr_and_renderer(self): + response = DummyResponse() + class renderer(object): + def render_view(inself, req, resp, view_inst, ctx): + self.assertEqual(req, request) + self.assertEqual(resp, {'a':'1'}) + self.assertEqual(view_inst, view) + self.assertEqual(ctx, context) + return response + def clone(self): + return self + class View: + def index(self, request): + return {'a':'1'} + view = View() + result = self.config.derive_view(view, + renderer=renderer(), attr='index') + self.assertFalse(result is view) + self.assertEqual(result.__module__, view.__module__) + self.assertEqual(result.__doc__, view.__doc__) + request = self._makeRequest() + context = testing.DummyResource() + self.assertEqual(result(context, request), response) + + def test_with_view_mapper_config_specified(self): + response = DummyResponse() + class mapper(object): + def __init__(self, **kw): + self.kw = kw + def __call__(self, view): + def wrapped(context, request): + return response + return wrapped + def view(context, request): return 'NOTOK' + result = self.config._derive_view(view, mapper=mapper) + self.assertFalse(result.__wraps__ is view) + self.assertEqual(result(None, None), response) + + def test_with_view_mapper_view_specified(self): + from pyramid.response import Response + response = Response() + def mapper(**kw): + def inner(view): + def superinner(context, request): + self.assertEqual(request, None) + return response + return superinner + return inner + def view(context, request): return 'NOTOK' + view.__view_mapper__ = mapper + result = self.config.derive_view(view) + self.assertFalse(result.__wraps__ is view) + self.assertEqual(result(None, None), response) + + def test_with_view_mapper_default_mapper_specified(self): + from pyramid.response import Response + response = Response() + def mapper(**kw): + def inner(view): + def superinner(context, request): + self.assertEqual(request, None) + return response + return superinner + return inner + self.config.set_view_mapper(mapper) + def view(context, request): return 'NOTOK' + result = self.config.derive_view(view) + self.assertFalse(result.__wraps__ is view) + self.assertEqual(result(None, None), response) + + def test_attr_wrapped_view_branching_default_phash(self): + from pyramid.config.util import DEFAULT_PHASH + def view(context, request): pass + result = self.config._derive_view(view, phash=DEFAULT_PHASH) + self.assertEqual(result.__wraps__, view) + + def test_attr_wrapped_view_branching_nondefault_phash(self): + def view(context, request): pass + result = self.config._derive_view(view, phash='nondefault') + self.assertNotEqual(result, view) + + def test_http_cached_view_integer(self): + import datetime + from pyramid.response import Response + response = Response('OK') + def inner_view(context, request): + return response + result = self.config._derive_view(inner_view, http_cache=3600) + self.assertFalse(result is inner_view) + self.assertEqual(inner_view.__module__, result.__module__) + self.assertEqual(inner_view.__doc__, result.__doc__) + request = self._makeRequest() + when = datetime.datetime.utcnow() + datetime.timedelta(hours=1) + result = result(None, request) + self.assertEqual(result, response) + headers = dict(result.headerlist) + expires = parse_httpdate(headers['Expires']) + assert_similar_datetime(expires, when) + self.assertEqual(headers['Cache-Control'], 'max-age=3600') + + def test_http_cached_view_timedelta(self): + import datetime + from pyramid.response import Response + response = Response('OK') + def inner_view(context, request): + return response + result = self.config._derive_view(inner_view, + http_cache=datetime.timedelta(hours=1)) + self.assertFalse(result is inner_view) + self.assertEqual(inner_view.__module__, result.__module__) + self.assertEqual(inner_view.__doc__, result.__doc__) + request = self._makeRequest() + when = datetime.datetime.utcnow() + datetime.timedelta(hours=1) + result = result(None, request) + self.assertEqual(result, response) + headers = dict(result.headerlist) + expires = parse_httpdate(headers['Expires']) + assert_similar_datetime(expires, when) + self.assertEqual(headers['Cache-Control'], 'max-age=3600') + + def test_http_cached_view_tuple(self): + import datetime + from pyramid.response import Response + response = Response('OK') + def inner_view(context, request): + return response + result = self.config._derive_view(inner_view, + http_cache=(3600, {'public':True})) + self.assertFalse(result is inner_view) + self.assertEqual(inner_view.__module__, result.__module__) + self.assertEqual(inner_view.__doc__, result.__doc__) + request = self._makeRequest() + when = datetime.datetime.utcnow() + datetime.timedelta(hours=1) + result = result(None, request) + self.assertEqual(result, response) + headers = dict(result.headerlist) + expires = parse_httpdate(headers['Expires']) + assert_similar_datetime(expires, when) + self.assertEqual(headers['Cache-Control'], 'max-age=3600, public') + + def test_http_cached_view_tuple_seconds_None(self): + from pyramid.response import Response + response = Response('OK') + def inner_view(context, request): + return response + result = self.config._derive_view(inner_view, + http_cache=(None, {'public':True})) + self.assertFalse(result is inner_view) + self.assertEqual(inner_view.__module__, result.__module__) + self.assertEqual(inner_view.__doc__, result.__doc__) + request = self._makeRequest() + result = result(None, request) + self.assertEqual(result, response) + headers = dict(result.headerlist) + self.assertFalse('Expires' in headers) + self.assertEqual(headers['Cache-Control'], 'public') + + def test_http_cached_view_prevent_auto_set(self): + from pyramid.response import Response + response = Response() + response.cache_control.prevent_auto = True + def inner_view(context, request): + return response + result = self.config._derive_view(inner_view, http_cache=3600) + request = self._makeRequest() + result = result(None, request) + self.assertEqual(result, response) # doesn't blow up + headers = dict(result.headerlist) + self.assertFalse('Expires' in headers) + self.assertFalse('Cache-Control' in headers) + + def test_http_cached_prevent_http_cache_in_settings(self): + self.config.registry.settings['prevent_http_cache'] = True + from pyramid.response import Response + response = Response() + def inner_view(context, request): + return response + result = self.config._derive_view(inner_view, http_cache=3600) + request = self._makeRequest() + result = result(None, request) + self.assertEqual(result, response) + headers = dict(result.headerlist) + self.assertFalse('Expires' in headers) + self.assertFalse('Cache-Control' in headers) + + def test_http_cached_view_bad_tuple(self): + def view(request): pass + self.assertRaises(ConfigurationError, self.config._derive_view, + view, http_cache=(None,)) + + +class TestDerivationOrder(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def tearDown(self): + self.config = None + testing.tearDown() + + def test_right_order_user_sorted(self): + from pyramid.interfaces import IViewDerivers + + self.config.add_view_deriver(None, 'deriv1') + self.config.add_view_deriver(None, 'deriv2', over='deriv1') + self.config.add_view_deriver(None, 'deriv3', under='deriv2') + + derivers = self.config.registry.getUtility(IViewDerivers) + derivers_sorted = derivers.sorted() + dlist = [d for (d, _) in derivers_sorted] + self.assertEqual([ + 'authdebug_view', + 'secured_view', + 'owrapped_view', + 'http_cached_view', + 'decorated_view', + 'deriv2', + 'deriv3', + 'deriv1', + 'rendered_view', + ], dlist) + + def test_right_order_implicit(self): + from pyramid.interfaces import IViewDerivers + + self.config.add_view_deriver(None, 'deriv1') + self.config.add_view_deriver(None, 'deriv2') + self.config.add_view_deriver(None, 'deriv3') + + derivers = self.config.registry.getUtility(IViewDerivers) + derivers_sorted = derivers.sorted() + dlist = [d for (d, _) in derivers_sorted] + self.assertEqual([ + 'authdebug_view', + 'secured_view', + 'owrapped_view', + 'http_cached_view', + 'decorated_view', + 'deriv3', + 'deriv2', + 'deriv1', + 'rendered_view', + ], dlist) + + def test_right_order_under_rendered_view(self): + from pyramid.interfaces import IViewDerivers + + self.config.add_view_deriver(None, 'deriv1', under='rendered_view') + + derivers = self.config.registry.getUtility(IViewDerivers) + derivers_sorted = derivers.sorted() + dlist = [d for (d, _) in derivers_sorted] + self.assertEqual([ + 'authdebug_view', + 'secured_view', + 'owrapped_view', + 'http_cached_view', + 'decorated_view', + 'rendered_view', + 'deriv1', + ], dlist) + + + def test_right_order_under_rendered_view_others(self): + from pyramid.interfaces import IViewDerivers + + self.config.add_view_deriver(None, 'deriv1', under='rendered_view') + self.config.add_view_deriver(None, 'deriv2') + self.config.add_view_deriver(None, 'deriv3') + + derivers = self.config.registry.getUtility(IViewDerivers) + derivers_sorted = derivers.sorted() + dlist = [d for (d, _) in derivers_sorted] + self.assertEqual([ + 'authdebug_view', + 'secured_view', + 'owrapped_view', + 'http_cached_view', + 'decorated_view', + 'deriv3', + 'deriv2', + 'rendered_view', + 'deriv1', + ], dlist) + + +class TestAddDeriver(unittest.TestCase): + + def setUp(self): + self.config = testing.setUp() + + def tearDown(self): + self.config = None + testing.tearDown() + + def test_add_single_deriver(self): + response = DummyResponse() + response.deriv = False + view = lambda *arg: response + + def deriv(view, info): + self.assertFalse(response.deriv) + response.deriv = True + return view + + result = self.config._derive_view(view) + self.assertFalse(response.deriv) + self.config.add_view_deriver(deriv, 'test_deriv') + + result = self.config._derive_view(view) + self.assertTrue(response.deriv) + + def test_override_deriver(self): + flags = {} + + class AView: + def __init__(self): + self.response = DummyResponse() + + def deriv1(view, value, **kw): + flags['deriv1'] = True + return view + + def deriv2(view, value, **kw): + flags['deriv2'] = True + return view + + view1 = AView() + self.config.add_view_deriver(deriv1, 'test_deriv') + result = self.config._derive_view(view1) + self.assertTrue(flags.get('deriv1')) + self.assertFalse(flags.get('deriv2')) + + flags.clear() + view2 = AView() + self.config.add_view_deriver(deriv2, 'test_deriv') + result = self.config._derive_view(view2) + self.assertFalse(flags.get('deriv1')) + self.assertTrue(flags.get('deriv2')) + + def test_add_multi_derivers_ordered(self): + response = DummyResponse() + view = lambda *arg: response + response.deriv = [] + + def deriv1(view, value, **kw): + response.deriv.append('deriv1') + return view + + def deriv2(view, value, **kw): + response.deriv.append('deriv2') + return view + + def deriv3(view, value, **kw): + response.deriv.append('deriv3') + return view + + self.config.add_view_deriver(deriv1, 'deriv1') + self.config.add_view_deriver(deriv2, 'deriv2', under='deriv1') + self.config.add_view_deriver(deriv3, 'deriv3', over='deriv2') + result = self.config._derive_view(view) + self.assertEqual(response.deriv, ['deriv2', 'deriv3', 'deriv1']) + + +class TestDeriverIntegration(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def tearDown(self): + self.config = None + testing.tearDown() + + def _getViewCallable(self, config, ctx_iface=None, request_iface=None, + name=''): + from zope.interface import Interface + from pyramid.interfaces import IRequest + from pyramid.interfaces import IView + from pyramid.interfaces import IViewClassifier + from pyramid.interfaces import IExceptionViewClassifier + classifier = IViewClassifier + if ctx_iface is None: + ctx_iface = Interface + if request_iface is None: + request_iface = IRequest + return config.registry.adapters.lookup( + (classifier, request_iface, ctx_iface), IView, name=name, + default=None) + + def _makeRequest(self, config): + request = DummyRequest() + request.registry = config.registry + return request + + def test_view_options(self): + response = DummyResponse() + view = lambda *arg: response + response.deriv = [] + + def deriv1(view, info): + response.deriv.append(info.options['deriv1']) + return view + deriv1.options = ('deriv1',) + + def deriv2(view, info): + response.deriv.append(info.options['deriv2']) + return view + deriv2.options = ('deriv2',) + + self.config.add_view_deriver(deriv1, 'deriv1') + self.config.add_view_deriver(deriv2, 'deriv2') + self.config.add_view(view, deriv1='test1', deriv2='test2') + + wrapper = self._getViewCallable(self.config) + request = self._makeRequest(self.config) + request.method = 'GET' + self.assertEqual(wrapper(None, request), response) + self.assertEqual(['test1', 'test2'], response.deriv) + + def test_unexpected_view_options(self): + from pyramid.exceptions import ConfigurationError + def deriv1(view, info): pass + self.config.add_view_deriver(deriv1, 'deriv1') + self.assertRaises( + ConfigurationError, + lambda: self.config.add_view(lambda r: {}, deriv1='test1')) + +@implementer(IResponse) +class DummyResponse(object): + content_type = None + default_content_type = None + body = None + +class DummyRequest: + subpath = () + matchdict = None + request_iface = IRequest + + def __init__(self, environ=None): + if environ is None: + environ = {} + self.environ = environ + self.params = {} + self.cookies = {} + self.response = DummyResponse() + +class DummyLogger: + def __init__(self): + self.messages = [] + def info(self, msg): + self.messages.append(msg) + warn = info + debug = info + +class DummySecurityPolicy: + def __init__(self, permitted=True): + self.permitted = permitted + + def effective_principals(self, request): + return [] + + def permits(self, context, principals, permission): + return self.permitted + +def parse_httpdate(s): + import datetime + # cannot use %Z, must use literal GMT; Jython honors timezone + # but CPython does not + return datetime.datetime.strptime(s, "%a, %d %b %Y %H:%M:%S GMT") + +def assert_similar_datetime(one, two): + for attr in ('year', 'month', 'day', 'hour', 'minute'): + one_attr = getattr(one, attr) + two_attr = getattr(two, attr) + if not one_attr == two_attr: # pragma: no cover + raise AssertionError('%r != %r in %s' % (one_attr, two_attr, attr)) diff --git a/pyramid/viewderivers.py b/pyramid/viewderivers.py new file mode 100644 index 000000000..99baf46f9 --- /dev/null +++ b/pyramid/viewderivers.py @@ -0,0 +1,453 @@ +import inspect + +from zope.interface import ( + implementer, + provider, + ) + +from pyramid.security import NO_PERMISSION_REQUIRED +from pyramid.response import Response + +from pyramid.interfaces import ( + IAuthenticationPolicy, + IAuthorizationPolicy, + IDebugLogger, + IResponse, + IViewMapper, + IViewMapperFactory, + ) + +from pyramid.compat import ( + is_bound_method, + is_unbound_method, + ) + +from pyramid.config.util import ( + DEFAULT_PHASH, + MAX_ORDER, + takes_one_arg, + ) + +from pyramid.exceptions import ( + ConfigurationError, + PredicateMismatch, + ) +from pyramid.httpexceptions import HTTPForbidden +from pyramid.util import object_description +from pyramid.view import render_view_to_response +from pyramid import renderers + + +def view_description(view): + try: + return view.__text__ + except AttributeError: + # custom view mappers might not add __text__ + return object_description(view) + +def requestonly(view, attr=None): + return takes_one_arg(view, attr=attr, argname='request') + +@implementer(IViewMapper) +@provider(IViewMapperFactory) +class DefaultViewMapper(object): + def __init__(self, **kw): + self.attr = kw.get('attr') + + def __call__(self, view): + if is_unbound_method(view) and self.attr is None: + raise ConfigurationError(( + 'Unbound method calls are not supported, please set the class ' + 'as your `view` and the method as your `attr`' + )) + + if inspect.isclass(view): + view = self.map_class(view) + else: + view = self.map_nonclass(view) + return view + + def map_class(self, view): + ronly = requestonly(view, self.attr) + if ronly: + mapped_view = self.map_class_requestonly(view) + else: + mapped_view = self.map_class_native(view) + mapped_view.__text__ = 'method %s of %s' % ( + self.attr or '__call__', object_description(view)) + return mapped_view + + def map_nonclass(self, view): + # We do more work here than appears necessary to avoid wrapping the + # view unless it actually requires wrapping (to avoid function call + # overhead). + mapped_view = view + ronly = requestonly(view, self.attr) + if ronly: + mapped_view = self.map_nonclass_requestonly(view) + elif self.attr: + mapped_view = self.map_nonclass_attr(view) + if inspect.isroutine(mapped_view): + # This branch will be true if the view is a function or a method. + # We potentially mutate an unwrapped object here if it's a + # function. We do this to avoid function call overhead of + # injecting another wrapper. However, we must wrap if the + # function is a bound method because we can't set attributes on a + # bound method. + if is_bound_method(view): + _mapped_view = mapped_view + def mapped_view(context, request): + return _mapped_view(context, request) + if self.attr is not None: + mapped_view.__text__ = 'attr %s of %s' % ( + self.attr, object_description(view)) + else: + mapped_view.__text__ = object_description(view) + return mapped_view + + def map_class_requestonly(self, view): + # its a class that has an __init__ which only accepts request + attr = self.attr + def _class_requestonly_view(context, request): + inst = view(request) + request.__view__ = inst + if attr is None: + response = inst() + else: + response = getattr(inst, attr)() + return response + return _class_requestonly_view + + def map_class_native(self, view): + # its a class that has an __init__ which accepts both context and + # request + attr = self.attr + def _class_view(context, request): + inst = view(context, request) + request.__view__ = inst + if attr is None: + response = inst() + else: + response = getattr(inst, attr)() + return response + return _class_view + + def map_nonclass_requestonly(self, view): + # its a function that has a __call__ which accepts only a single + # request argument + attr = self.attr + def _requestonly_view(context, request): + if attr is None: + response = view(request) + else: + response = getattr(view, attr)(request) + return response + return _requestonly_view + + def map_nonclass_attr(self, view): + # its a function that has a __call__ which accepts both context and + # request, but still has an attr + def _attr_view(context, request): + response = getattr(view, self.attr)(context, request) + return response + return _attr_view + + +def wraps_view(wrapper): + def inner(view, info): + wrapper_view = wrapper(view, info) + return preserve_view_attrs(view, wrapper_view) + return inner + +def preserve_view_attrs(view, wrapper): + if view is None: + return wrapper + + if wrapper is view: + return view + + original_view = getattr(view, '__original_view__', None) + + if original_view is None: + original_view = view + + wrapper.__wraps__ = view + wrapper.__original_view__ = original_view + wrapper.__module__ = view.__module__ + wrapper.__doc__ = view.__doc__ + + try: + wrapper.__name__ = view.__name__ + except AttributeError: + wrapper.__name__ = repr(view) + + # attrs that may not exist on "view", but, if so, must be attached to + # "wrapped view" + for attr in ('__permitted__', '__call_permissive__', '__permission__', + '__predicated__', '__predicates__', '__accept__', + '__order__', '__text__'): + try: + setattr(wrapper, attr, getattr(view, attr)) + except AttributeError: + pass + + return wrapper + +def mapped_view(view, info): + mapper = info.options.get('mapper') + if mapper is None: + mapper = getattr(view, '__view_mapper__', None) + if mapper is None: + mapper = info.registry.queryUtility(IViewMapperFactory) + if mapper is None: + mapper = DefaultViewMapper + + mapped_view = mapper(**info.options)(view) + return mapped_view + +mapped_view.options = ('mapper', 'attr') + +def owrapped_view(view, info): + wrapper_viewname = info.options.get('wrapper') + viewname = info.options.get('name') + if not wrapper_viewname: + return view + def _owrapped_view(context, request): + response = view(context, request) + request.wrapped_response = response + request.wrapped_body = response.body + request.wrapped_view = view + wrapped_response = render_view_to_response(context, request, + wrapper_viewname) + if wrapped_response is None: + raise ValueError( + 'No wrapper view named %r found when executing view ' + 'named %r' % (wrapper_viewname, viewname)) + return wrapped_response + return _owrapped_view + +owrapped_view.options = ('name', 'wrapper') + +def http_cached_view(view, info): + if info.settings.get('prevent_http_cache', False): + return view + + seconds = info.options.get('http_cache') + + if seconds is None: + return view + + options = {} + + if isinstance(seconds, (tuple, list)): + try: + seconds, options = seconds + except ValueError: + raise ConfigurationError( + 'If http_cache parameter is a tuple or list, it must be ' + 'in the form (seconds, options); not %s' % (seconds,)) + + def wrapper(context, request): + response = view(context, request) + prevent_caching = getattr(response.cache_control, 'prevent_auto', + False) + if not prevent_caching: + response.cache_expires(seconds, **options) + return response + + return wrapper + +http_cached_view.options = ('http_cache',) + +def secured_view(view, info): + permission = info.options.get('permission') + if permission == NO_PERMISSION_REQUIRED: + # allow views registered within configurations that have a + # default permission to explicitly override the default + # permission, replacing it with no permission at all + permission = None + + wrapped_view = view + authn_policy = info.registry.queryUtility(IAuthenticationPolicy) + authz_policy = info.registry.queryUtility(IAuthorizationPolicy) + + if authn_policy and authz_policy and (permission is not None): + def _permitted(context, request): + principals = authn_policy.effective_principals(request) + return authz_policy.permits(context, principals, permission) + def _secured_view(context, request): + result = _permitted(context, request) + if result: + return view(context, request) + view_name = getattr(view, '__name__', view) + msg = getattr( + request, 'authdebug_message', + 'Unauthorized: %s failed permission check' % view_name) + raise HTTPForbidden(msg, result=result) + _secured_view.__call_permissive__ = view + _secured_view.__permitted__ = _permitted + _secured_view.__permission__ = permission + wrapped_view = _secured_view + + return wrapped_view + +secured_view.options = ('permission',) + +def authdebug_view(view, info): + wrapped_view = view + settings = info.settings + permission = info.options.get('permission') + authn_policy = info.registry.queryUtility(IAuthenticationPolicy) + authz_policy = info.registry.queryUtility(IAuthorizationPolicy) + logger = info.registry.queryUtility(IDebugLogger) + if settings and settings.get('debug_authorization', False): + def _authdebug_view(context, request): + view_name = getattr(request, 'view_name', None) + + if authn_policy and authz_policy: + if permission is NO_PERMISSION_REQUIRED: + msg = 'Allowed (NO_PERMISSION_REQUIRED)' + elif permission is None: + msg = 'Allowed (no permission registered)' + else: + principals = authn_policy.effective_principals(request) + msg = str(authz_policy.permits( + context, principals, permission)) + else: + msg = 'Allowed (no authorization policy in use)' + + view_name = getattr(request, 'view_name', None) + url = getattr(request, 'url', None) + msg = ('debug_authorization of url %s (view name %r against ' + 'context %r): %s' % (url, view_name, context, msg)) + if logger: + logger.debug(msg) + if request is not None: + request.authdebug_message = msg + return view(context, request) + + wrapped_view = _authdebug_view + + return wrapped_view + +authdebug_view.options = ('permission',) + +def predicated_view(view, info): + preds = info.predicates + if not preds: + return view + def predicate_wrapper(context, request): + for predicate in preds: + if not predicate(context, request): + view_name = getattr(view, '__name__', view) + raise PredicateMismatch( + 'predicate mismatch for view %s (%s)' % ( + view_name, predicate.text())) + return view(context, request) + def checker(context, request): + return all((predicate(context, request) for predicate in + preds)) + predicate_wrapper.__predicated__ = checker + predicate_wrapper.__predicates__ = preds + return predicate_wrapper + +def attr_wrapped_view(view, info): + accept, order, phash = (info.options.get('accept', None), + getattr(info, 'order', MAX_ORDER), + getattr(info, 'phash', DEFAULT_PHASH)) + # this is a little silly but we don't want to decorate the original + # function with attributes that indicate accept, order, and phash, + # so we use a wrapper + if ( + (accept is None) and + (order == MAX_ORDER) and + (phash == DEFAULT_PHASH) + ): + return view # defaults + def attr_view(context, request): + return view(context, request) + attr_view.__accept__ = accept + attr_view.__order__ = order + attr_view.__phash__ = phash + attr_view.__view_attr__ = info.options.get('attr') + attr_view.__permission__ = info.options.get('permission') + return attr_view + +attr_wrapped_view.options = ('accept', 'attr', 'permission') + +def rendered_view(view, info): + # one way or another this wrapper must produce a Response (unless + # the renderer is a NullRendererHelper) + renderer = info.options.get('renderer') + if renderer is None: + # register a default renderer if you want super-dynamic + # rendering. registering a default renderer will also allow + # override_renderer to work if a renderer is left unspecified for + # a view registration. + def viewresult_to_response(context, request): + result = view(context, request) + if result.__class__ is Response: # common case + response = result + else: + response = info.registry.queryAdapterOrSelf(result, IResponse) + if response is None: + if result is None: + append = (' You may have forgotten to return a value ' + 'from the view callable.') + elif isinstance(result, dict): + append = (' You may have forgotten to define a ' + 'renderer in the view configuration.') + else: + append = '' + + msg = ('Could not convert return value of the view ' + 'callable %s into a response object. ' + 'The value returned was %r.' + append) + + raise ValueError(msg % (view_description(view), result)) + + return response + + return viewresult_to_response + + if renderer is renderers.null_renderer: + return view + + def rendered_view(context, request): + result = view(context, request) + if result.__class__ is Response: # potential common case + response = result + else: + # this must adapt, it can't do a simple interface check + # (avoid trying to render webob responses) + response = info.registry.queryAdapterOrSelf(result, IResponse) + if response is None: + attrs = getattr(request, '__dict__', {}) + if 'override_renderer' in attrs: + # renderer overridden by newrequest event or other + renderer_name = attrs.pop('override_renderer') + view_renderer = renderers.RendererHelper( + name=renderer_name, + package=info.package, + registry=info.registry) + else: + view_renderer = renderer.clone() + if '__view__' in attrs: + view_inst = attrs.pop('__view__') + else: + view_inst = getattr(view, '__original_view__', view) + response = view_renderer.render_view( + request, result, view_inst, context) + return response + + return rendered_view + +rendered_view.options = ('renderer',) + +def decorated_view(view, info): + decorator = info.options.get('decorator') + if decorator is None: + return view + return decorator(view) + +decorated_view.options = ('decorator',) -- cgit v1.2.3