diff options
| author | Chris McDonough <chrism@agendaless.com> | 2009-11-21 03:57:56 +0000 |
|---|---|---|
| committer | Chris McDonough <chrism@agendaless.com> | 2009-11-21 03:57:56 +0000 |
| commit | 2a264f946dd98ab423b458ba8e2c1ec3b55052e1 (patch) | |
| tree | a62e2118769ab8a5cf18a05d691c85c0f4f9101f /repoze/bfg/configuration.py | |
| parent | 75d5a60251c9d7bdb06cdfdc01e93241ffa29cba (diff) | |
| download | pyramid-2a264f946dd98ab423b458ba8e2c1ec3b55052e1.tar.gz pyramid-2a264f946dd98ab423b458ba8e2c1ec3b55052e1.tar.bz2 pyramid-2a264f946dd98ab423b458ba8e2c1ec3b55052e1.zip | |
Turn wrapper methods back into functions to avoid keeping around references
to Configurator in closures.
Diffstat (limited to 'repoze/bfg/configuration.py')
| -rw-r--r-- | repoze/bfg/configuration.py | 409 |
1 files changed, 201 insertions, 208 deletions
diff --git a/repoze/bfg/configuration.py b/repoze/bfg/configuration.py index b272bee52..3d4f477e9 100644 --- a/repoze/bfg/configuration.py +++ b/repoze/bfg/configuration.py @@ -67,26 +67,20 @@ class Configurator(object): """ A wrapper around the registry that performs configuration tasks """ def __init__(self, registry=None): self.package = caller_package() - if registry is None: - registry = self.make_default_registry() self.reg = registry - try: - # yes, a cycle; see get_configurator - self.reg['bfg_configurator'] = self - except TypeError: - pass + if registry is None: + registry = Registry(self.package.__name__) + self.reg = registry + self._default_configuration() - def make_default_registry(self): - self.reg = Registry() + def _default_configuration(self): self.renderer(chameleon_zpt.renderer_factory, '.pt') self.renderer(chameleon_text.renderer_factory, '.txt') self.renderer(renderers.json_renderer_factory, 'json') self.renderer(renderers.string_renderer_factory, 'string') - settings = Settings({}) - self.settings(settings) + self.settings(Settings({})) self.root_factory(DefaultRootFactory) self.debug_logger(None) - return self.reg def make_wsgi_app(self, manager=manager, getSiteManager=getSiteManager): # manager and getSiteManager in arglist for testing dep injection only @@ -109,7 +103,7 @@ class Configurator(object): settings=None, debug_logger=None, os=os, lock=threading.Lock()): - self.make_default_registry() + self._default_configuration() # debug_logger, os and lock *only* for unittests if settings is None: @@ -243,213 +237,38 @@ class Configurator(object): self.reg.registerAdapter(multiview, (for_, request_type), IMultiView, name, info=_info) + def derive_view(self, view, permission=None, predicates=(), + attr=None, renderer_name=None, wrapper_viewname=None, + viewname=None): + renderer = self.renderer_from_name(renderer_name) + reg = self.reg + mapped_view = _map_view(reg, view, attr, renderer, renderer_name) + owrapped_view = _owrap_view(reg, mapped_view, viewname,wrapper_viewname) + secured_view = _secure_view(reg, owrapped_view, permission) + debug_view = _authdebug_view(reg, secured_view, permission) + derived_view = _predicate_wrap(reg, debug_view, predicates) + return derived_view + def renderer_from_name(self, path_or_spec): + if path_or_spec is None: + # check for global default renderer + factory = self.reg.queryUtility(IRendererFactory) + if factory is not None: + return factory(path_or_spec) + return None + if '.' in path_or_spec: name = os.path.splitext(path_or_spec)[1] spec = self.make_spec(path_or_spec) else: name = path_or_spec spec = path_or_spec + factory = self.reg.queryUtility(IRendererFactory, name=name) if factory is None: raise ValueError('No renderer for renderer name %r' % name) return factory(spec) - def derive_view(self, original_view, permission=None, predicates=(), - attr=None, renderer_name=None, wrapper_viewname=None, - viewname=None): - mapped_view = self._map_view(original_view, attr, renderer_name) - owrapped_view = self._owrap_view(mapped_view, viewname,wrapper_viewname) - secured_view = self._secure_view(owrapped_view, permission) - debug_view = self._authdebug_view(secured_view, permission) - derived_view = self._predicate_wrap(debug_view, predicates) - return derived_view - - def _map_view(self, view, attr=None, renderer_name=None): - wrapped_view = view - - renderer = None - - if renderer_name is None: - # global default renderer - factory = self.reg.queryUtility(IRendererFactory) - if factory is not None: - renderer_name = '' - renderer = factory(renderer_name) - else: - renderer = self.renderer_from_name(renderer_name) - - if inspect.isclass(view): - # If the object we've located is a class, turn it into a - # function that operates like a Zope view (when it's invoked, - # construct an instance using 'context' and 'request' as - # position arguments, then immediately invoke the __call__ - # method of the instance with no arguments; __call__ should - # return an IResponse). - if requestonly(view, attr): - # its __init__ accepts only a single request argument, - # instead of both context and request - def _bfg_class_requestonly_view(context, request): - inst = view(request) - if attr is None: - response = inst() - else: - response = getattr(inst, attr)() - if renderer is not None: - response = rendered_response(renderer, - response, inst, - context, request, - renderer_name) - return response - wrapped_view = _bfg_class_requestonly_view - else: - # its __init__ accepts both context and request - def _bfg_class_view(context, request): - inst = view(context, request) - if attr is None: - response = inst() - else: - response = getattr(inst, attr)() - if renderer is not None: - response = rendered_response(renderer, - response, inst, - context, request, - renderer_name) - return response - wrapped_view = _bfg_class_view - - elif requestonly(view, attr): - # its __call__ accepts only a single request argument, - # instead of both context and request - def _bfg_requestonly_view(context, request): - if attr is None: - response = view(request) - else: - response = getattr(view, attr)(request) - - if renderer is not None: - response = rendered_response(renderer, - response, view, - context, request, - renderer_name) - return response - wrapped_view = _bfg_requestonly_view - - elif attr: - def _bfg_attr_view(context, request): - response = getattr(view, attr)(context, request) - if renderer is not None: - response = rendered_response(renderer, - response, view, - context, request, - renderer_name) - return response - wrapped_view = _bfg_attr_view - - elif renderer is not None: - def _rendered_view(context, request): - response = view(context, request) - response = rendered_response(renderer, - response, view, - context, request, - renderer_name) - return response - wrapped_view = _rendered_view - - decorate_view(wrapped_view, view) - return wrapped_view - - def _owrap_view(self, view, viewname, wrapper_viewname): - if not wrapper_viewname: - return view - def _owrapped_view(context, request): - response = view(context, request) - request.wrapped_response = response - request.wrapped_body = response.body - request.wrapped_view = view - wrapped_response = render_view_to_response(context, request, - wrapper_viewname) - if wrapped_response is None: - raise ValueError( - 'No wrapper view named %r found when executing view ' - 'named %r' % (wrapper_viewname, viewname)) - return wrapped_response - decorate_view(_owrapped_view, view) - return _owrapped_view - - def _predicate_wrap(self, view, predicates): - if not predicates: - return view - def _wrapped(context, request): - if all((predicate(context, request) for predicate in predicates)): - return view(context, request) - raise NotFound('predicate mismatch for view %s' % view) - def checker(context, request): - return all((predicate(context, request) for predicate in - predicates)) - _wrapped.__predicated__ = checker - decorate_view(_wrapped, view) - return _wrapped - - def _secure_view(self, view, permission): - wrapped_view = view - authn_policy = self.reg.queryUtility(IAuthenticationPolicy) - authz_policy = self.reg.queryUtility(IAuthorizationPolicy) - if authn_policy and authz_policy and (permission is not None): - def _secured_view(context, request): - principals = authn_policy.effective_principals(request) - if authz_policy.permits(context, principals, permission): - return view(context, request) - msg = getattr(request, 'authdebug_message', - 'Unauthorized: %s failed permission check' % view) - raise Forbidden(msg) - _secured_view.__call_permissive__ = view - def _permitted(context, request): - principals = authn_policy.effective_principals(request) - return authz_policy.permits(context, principals, permission) - _secured_view.__permitted__ = _permitted - wrapped_view = _secured_view - decorate_view(wrapped_view, view) - - return wrapped_view - - def _authdebug_view(self, view, permission): - wrapped_view = view - authn_policy = self.reg.queryUtility(IAuthenticationPolicy) - authz_policy = self.reg.queryUtility(IAuthorizationPolicy) - settings = self.reg.queryUtility(ISettings) - debug_authorization = False - if settings is not None: - debug_authorization = settings.get('debug_authorization', False) - if debug_authorization: - def _authdebug_view(context, request): - view_name = getattr(request, 'view_name', None) - - if authn_policy and authz_policy: - if permission is None: - msg = 'Allowed (no permission registered)' - else: - principals = authn_policy.effective_principals(request) - msg = str(authz_policy.permits(context, principals, - permission)) - else: - msg = 'Allowed (no authorization policy in use)' - - view_name = getattr(request, 'view_name', None) - url = getattr(request, 'url', None) - msg = ('debug_authorization of url %s (view name %r against ' - 'context %r): %s' % (url, view_name, context, msg)) - logger = self.reg.queryUtility(ILogger, 'repoze.bfg.debug') - logger and logger.debug(msg) - if request is not None: - request.authdebug_message = msg - return view(context, request) - - wrapped_view = _authdebug_view - decorate_view(wrapped_view, view) - - return wrapped_view - def route(self, name, path, view=None, view_for=None, permission=None, factory=None, for_=None, header=None, xhr=False, accept=None, path_info=None, @@ -869,3 +688,177 @@ def requestonly(class_or_callable, attr=None): return False +def _map_view(registry, view, attr=None, renderer=None, renderer_name=None): + wrapped_view = view + + if inspect.isclass(view): + # If the object we've located is a class, turn it into a + # function that operates like a Zope view (when it's invoked, + # construct an instance using 'context' and 'request' as + # position arguments, then immediately invoke the __call__ + # method of the instance with no arguments; __call__ should + # return an IResponse). + if requestonly(view, attr): + # its __init__ accepts only a single request argument, + # instead of both context and request + def _bfg_class_requestonly_view(context, request): + inst = view(request) + if attr is None: + response = inst() + else: + response = getattr(inst, attr)() + if renderer is not None: + response = rendered_response(renderer, + response, inst, + context, request, + renderer_name) + return response + wrapped_view = _bfg_class_requestonly_view + else: + # its __init__ accepts both context and request + def _bfg_class_view(context, request): + inst = view(context, request) + if attr is None: + response = inst() + else: + response = getattr(inst, attr)() + if renderer is not None: + response = rendered_response(renderer, + response, inst, + context, request, + renderer_name) + return response + wrapped_view = _bfg_class_view + + elif requestonly(view, attr): + # its __call__ accepts only a single request argument, + # instead of both context and request + def _bfg_requestonly_view(context, request): + if attr is None: + response = view(request) + else: + response = getattr(view, attr)(request) + + if renderer is not None: + response = rendered_response(renderer, + response, view, + context, request, + renderer_name) + return response + wrapped_view = _bfg_requestonly_view + + elif attr: + def _bfg_attr_view(context, request): + response = getattr(view, attr)(context, request) + if renderer is not None: + response = rendered_response(renderer, + response, view, + context, request, + renderer_name) + return response + wrapped_view = _bfg_attr_view + + elif renderer is not None: + def _rendered_view(context, request): + response = view(context, request) + response = rendered_response(renderer, + response, view, + context, request, + renderer_name) + return response + wrapped_view = _rendered_view + + decorate_view(wrapped_view, view) + return wrapped_view + +def _owrap_view(registry, view, viewname, wrapper_viewname): + if not wrapper_viewname: + return view + def _owrapped_view(context, request): + response = view(context, request) + request.wrapped_response = response + request.wrapped_body = response.body + request.wrapped_view = view + wrapped_response = render_view_to_response(context, request, + wrapper_viewname) + if wrapped_response is None: + raise ValueError( + 'No wrapper view named %r found when executing view ' + 'named %r' % (wrapper_viewname, viewname)) + return wrapped_response + decorate_view(_owrapped_view, view) + return _owrapped_view + +def _predicate_wrap(registry, view, predicates): + if not predicates: + return view + def _wrapped(context, request): + if all((predicate(context, request) for predicate in predicates)): + return view(context, request) + raise NotFound('predicate mismatch for view %s' % view) + def checker(context, request): + return all((predicate(context, request) for predicate in + predicates)) + _wrapped.__predicated__ = checker + decorate_view(_wrapped, view) + return _wrapped + +def _secure_view(registry, view, permission): + wrapped_view = view + authn_policy = registry.queryUtility(IAuthenticationPolicy) + authz_policy = registry.queryUtility(IAuthorizationPolicy) + if authn_policy and authz_policy and (permission is not None): + def _secured_view(context, request): + principals = authn_policy.effective_principals(request) + if authz_policy.permits(context, principals, permission): + return view(context, request) + msg = getattr(request, 'authdebug_message', + 'Unauthorized: %s failed permission check' % view) + raise Forbidden(msg) + _secured_view.__call_permissive__ = view + def _permitted(context, request): + principals = authn_policy.effective_principals(request) + return authz_policy.permits(context, principals, permission) + _secured_view.__permitted__ = _permitted + wrapped_view = _secured_view + decorate_view(wrapped_view, view) + + return wrapped_view + +def _authdebug_view(registry, view, permission): + wrapped_view = view + authn_policy = registry.queryUtility(IAuthenticationPolicy) + authz_policy = registry.queryUtility(IAuthorizationPolicy) + settings = registry.queryUtility(ISettings) + debug_authorization = False + if settings is not None: + debug_authorization = settings.get('debug_authorization', False) + if debug_authorization: + def _authdebug_view(context, request): + view_name = getattr(request, 'view_name', None) + + if authn_policy and authz_policy: + if permission is None: + msg = 'Allowed (no permission registered)' + else: + principals = authn_policy.effective_principals(request) + msg = str(authz_policy.permits(context, principals, + permission)) + else: + msg = 'Allowed (no authorization policy in use)' + + view_name = getattr(request, 'view_name', None) + url = getattr(request, 'url', None) + msg = ('debug_authorization of url %s (view name %r against ' + 'context %r): %s' % (url, view_name, context, msg)) + logger =registry.queryUtility(ILogger, 'repoze.bfg.debug') + logger and logger.debug(msg) + if request is not None: + request.authdebug_message = msg + return view(context, request) + + wrapped_view = _authdebug_view + decorate_view(wrapped_view, view) + + return wrapped_view + |
