summaryrefslogtreecommitdiff
path: root/repoze/bfg/configuration.py
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2009-11-21 03:57:56 +0000
committerChris McDonough <chrism@agendaless.com>2009-11-21 03:57:56 +0000
commit2a264f946dd98ab423b458ba8e2c1ec3b55052e1 (patch)
treea62e2118769ab8a5cf18a05d691c85c0f4f9101f /repoze/bfg/configuration.py
parent75d5a60251c9d7bdb06cdfdc01e93241ffa29cba (diff)
downloadpyramid-2a264f946dd98ab423b458ba8e2c1ec3b55052e1.tar.gz
pyramid-2a264f946dd98ab423b458ba8e2c1ec3b55052e1.tar.bz2
pyramid-2a264f946dd98ab423b458ba8e2c1ec3b55052e1.zip
Turn wrapper methods back into functions to avoid keeping around references
to Configurator in closures.
Diffstat (limited to 'repoze/bfg/configuration.py')
-rw-r--r--repoze/bfg/configuration.py409
1 files changed, 201 insertions, 208 deletions
diff --git a/repoze/bfg/configuration.py b/repoze/bfg/configuration.py
index b272bee52..3d4f477e9 100644
--- a/repoze/bfg/configuration.py
+++ b/repoze/bfg/configuration.py
@@ -67,26 +67,20 @@ class Configurator(object):
""" A wrapper around the registry that performs configuration tasks """
def __init__(self, registry=None):
self.package = caller_package()
- if registry is None:
- registry = self.make_default_registry()
self.reg = registry
- try:
- # yes, a cycle; see get_configurator
- self.reg['bfg_configurator'] = self
- except TypeError:
- pass
+ if registry is None:
+ registry = Registry(self.package.__name__)
+ self.reg = registry
+ self._default_configuration()
- def make_default_registry(self):
- self.reg = Registry()
+ def _default_configuration(self):
self.renderer(chameleon_zpt.renderer_factory, '.pt')
self.renderer(chameleon_text.renderer_factory, '.txt')
self.renderer(renderers.json_renderer_factory, 'json')
self.renderer(renderers.string_renderer_factory, 'string')
- settings = Settings({})
- self.settings(settings)
+ self.settings(Settings({}))
self.root_factory(DefaultRootFactory)
self.debug_logger(None)
- return self.reg
def make_wsgi_app(self, manager=manager, getSiteManager=getSiteManager):
# manager and getSiteManager in arglist for testing dep injection only
@@ -109,7 +103,7 @@ class Configurator(object):
settings=None, debug_logger=None, os=os,
lock=threading.Lock()):
- self.make_default_registry()
+ self._default_configuration()
# debug_logger, os and lock *only* for unittests
if settings is None:
@@ -243,213 +237,38 @@ class Configurator(object):
self.reg.registerAdapter(multiview, (for_, request_type),
IMultiView, name, info=_info)
+ def derive_view(self, view, permission=None, predicates=(),
+ attr=None, renderer_name=None, wrapper_viewname=None,
+ viewname=None):
+ renderer = self.renderer_from_name(renderer_name)
+ reg = self.reg
+ mapped_view = _map_view(reg, view, attr, renderer, renderer_name)
+ owrapped_view = _owrap_view(reg, mapped_view, viewname,wrapper_viewname)
+ secured_view = _secure_view(reg, owrapped_view, permission)
+ debug_view = _authdebug_view(reg, secured_view, permission)
+ derived_view = _predicate_wrap(reg, debug_view, predicates)
+ return derived_view
+
def renderer_from_name(self, path_or_spec):
+ if path_or_spec is None:
+ # check for global default renderer
+ factory = self.reg.queryUtility(IRendererFactory)
+ if factory is not None:
+ return factory(path_or_spec)
+ return None
+
if '.' in path_or_spec:
name = os.path.splitext(path_or_spec)[1]
spec = self.make_spec(path_or_spec)
else:
name = path_or_spec
spec = path_or_spec
+
factory = self.reg.queryUtility(IRendererFactory, name=name)
if factory is None:
raise ValueError('No renderer for renderer name %r' % name)
return factory(spec)
- def derive_view(self, original_view, permission=None, predicates=(),
- attr=None, renderer_name=None, wrapper_viewname=None,
- viewname=None):
- mapped_view = self._map_view(original_view, attr, renderer_name)
- owrapped_view = self._owrap_view(mapped_view, viewname,wrapper_viewname)
- secured_view = self._secure_view(owrapped_view, permission)
- debug_view = self._authdebug_view(secured_view, permission)
- derived_view = self._predicate_wrap(debug_view, predicates)
- return derived_view
-
- def _map_view(self, view, attr=None, renderer_name=None):
- wrapped_view = view
-
- renderer = None
-
- if renderer_name is None:
- # global default renderer
- factory = self.reg.queryUtility(IRendererFactory)
- if factory is not None:
- renderer_name = ''
- renderer = factory(renderer_name)
- else:
- renderer = self.renderer_from_name(renderer_name)
-
- if inspect.isclass(view):
- # If the object we've located is a class, turn it into a
- # function that operates like a Zope view (when it's invoked,
- # construct an instance using 'context' and 'request' as
- # position arguments, then immediately invoke the __call__
- # method of the instance with no arguments; __call__ should
- # return an IResponse).
- if requestonly(view, attr):
- # its __init__ accepts only a single request argument,
- # instead of both context and request
- def _bfg_class_requestonly_view(context, request):
- inst = view(request)
- if attr is None:
- response = inst()
- else:
- response = getattr(inst, attr)()
- if renderer is not None:
- response = rendered_response(renderer,
- response, inst,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_class_requestonly_view
- else:
- # its __init__ accepts both context and request
- def _bfg_class_view(context, request):
- inst = view(context, request)
- if attr is None:
- response = inst()
- else:
- response = getattr(inst, attr)()
- if renderer is not None:
- response = rendered_response(renderer,
- response, inst,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_class_view
-
- elif requestonly(view, attr):
- # its __call__ accepts only a single request argument,
- # instead of both context and request
- def _bfg_requestonly_view(context, request):
- if attr is None:
- response = view(request)
- else:
- response = getattr(view, attr)(request)
-
- if renderer is not None:
- response = rendered_response(renderer,
- response, view,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_requestonly_view
-
- elif attr:
- def _bfg_attr_view(context, request):
- response = getattr(view, attr)(context, request)
- if renderer is not None:
- response = rendered_response(renderer,
- response, view,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_attr_view
-
- elif renderer is not None:
- def _rendered_view(context, request):
- response = view(context, request)
- response = rendered_response(renderer,
- response, view,
- context, request,
- renderer_name)
- return response
- wrapped_view = _rendered_view
-
- decorate_view(wrapped_view, view)
- return wrapped_view
-
- def _owrap_view(self, view, viewname, wrapper_viewname):
- if not wrapper_viewname:
- return view
- def _owrapped_view(context, request):
- response = view(context, request)
- request.wrapped_response = response
- request.wrapped_body = response.body
- request.wrapped_view = view
- wrapped_response = render_view_to_response(context, request,
- wrapper_viewname)
- if wrapped_response is None:
- raise ValueError(
- 'No wrapper view named %r found when executing view '
- 'named %r' % (wrapper_viewname, viewname))
- return wrapped_response
- decorate_view(_owrapped_view, view)
- return _owrapped_view
-
- def _predicate_wrap(self, view, predicates):
- if not predicates:
- return view
- def _wrapped(context, request):
- if all((predicate(context, request) for predicate in predicates)):
- return view(context, request)
- raise NotFound('predicate mismatch for view %s' % view)
- def checker(context, request):
- return all((predicate(context, request) for predicate in
- predicates))
- _wrapped.__predicated__ = checker
- decorate_view(_wrapped, view)
- return _wrapped
-
- def _secure_view(self, view, permission):
- wrapped_view = view
- authn_policy = self.reg.queryUtility(IAuthenticationPolicy)
- authz_policy = self.reg.queryUtility(IAuthorizationPolicy)
- if authn_policy and authz_policy and (permission is not None):
- def _secured_view(context, request):
- principals = authn_policy.effective_principals(request)
- if authz_policy.permits(context, principals, permission):
- return view(context, request)
- msg = getattr(request, 'authdebug_message',
- 'Unauthorized: %s failed permission check' % view)
- raise Forbidden(msg)
- _secured_view.__call_permissive__ = view
- def _permitted(context, request):
- principals = authn_policy.effective_principals(request)
- return authz_policy.permits(context, principals, permission)
- _secured_view.__permitted__ = _permitted
- wrapped_view = _secured_view
- decorate_view(wrapped_view, view)
-
- return wrapped_view
-
- def _authdebug_view(self, view, permission):
- wrapped_view = view
- authn_policy = self.reg.queryUtility(IAuthenticationPolicy)
- authz_policy = self.reg.queryUtility(IAuthorizationPolicy)
- settings = self.reg.queryUtility(ISettings)
- debug_authorization = False
- if settings is not None:
- debug_authorization = settings.get('debug_authorization', False)
- if debug_authorization:
- def _authdebug_view(context, request):
- view_name = getattr(request, 'view_name', None)
-
- if authn_policy and authz_policy:
- if permission is None:
- msg = 'Allowed (no permission registered)'
- else:
- principals = authn_policy.effective_principals(request)
- msg = str(authz_policy.permits(context, principals,
- permission))
- else:
- msg = 'Allowed (no authorization policy in use)'
-
- view_name = getattr(request, 'view_name', None)
- url = getattr(request, 'url', None)
- msg = ('debug_authorization of url %s (view name %r against '
- 'context %r): %s' % (url, view_name, context, msg))
- logger = self.reg.queryUtility(ILogger, 'repoze.bfg.debug')
- logger and logger.debug(msg)
- if request is not None:
- request.authdebug_message = msg
- return view(context, request)
-
- wrapped_view = _authdebug_view
- decorate_view(wrapped_view, view)
-
- return wrapped_view
-
def route(self, name, path, view=None, view_for=None,
permission=None, factory=None, for_=None,
header=None, xhr=False, accept=None, path_info=None,
@@ -869,3 +688,177 @@ def requestonly(class_or_callable, attr=None):
return False
+def _map_view(registry, view, attr=None, renderer=None, renderer_name=None):
+ wrapped_view = view
+
+ if inspect.isclass(view):
+ # If the object we've located is a class, turn it into a
+ # function that operates like a Zope view (when it's invoked,
+ # construct an instance using 'context' and 'request' as
+ # position arguments, then immediately invoke the __call__
+ # method of the instance with no arguments; __call__ should
+ # return an IResponse).
+ if requestonly(view, attr):
+ # its __init__ accepts only a single request argument,
+ # instead of both context and request
+ def _bfg_class_requestonly_view(context, request):
+ inst = view(request)
+ if attr is None:
+ response = inst()
+ else:
+ response = getattr(inst, attr)()
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, inst,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_class_requestonly_view
+ else:
+ # its __init__ accepts both context and request
+ def _bfg_class_view(context, request):
+ inst = view(context, request)
+ if attr is None:
+ response = inst()
+ else:
+ response = getattr(inst, attr)()
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, inst,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_class_view
+
+ elif requestonly(view, attr):
+ # its __call__ accepts only a single request argument,
+ # instead of both context and request
+ def _bfg_requestonly_view(context, request):
+ if attr is None:
+ response = view(request)
+ else:
+ response = getattr(view, attr)(request)
+
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, view,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_requestonly_view
+
+ elif attr:
+ def _bfg_attr_view(context, request):
+ response = getattr(view, attr)(context, request)
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, view,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_attr_view
+
+ elif renderer is not None:
+ def _rendered_view(context, request):
+ response = view(context, request)
+ response = rendered_response(renderer,
+ response, view,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _rendered_view
+
+ decorate_view(wrapped_view, view)
+ return wrapped_view
+
+def _owrap_view(registry, view, viewname, wrapper_viewname):
+ if not wrapper_viewname:
+ return view
+ def _owrapped_view(context, request):
+ response = view(context, request)
+ request.wrapped_response = response
+ request.wrapped_body = response.body
+ request.wrapped_view = view
+ wrapped_response = render_view_to_response(context, request,
+ wrapper_viewname)
+ if wrapped_response is None:
+ raise ValueError(
+ 'No wrapper view named %r found when executing view '
+ 'named %r' % (wrapper_viewname, viewname))
+ return wrapped_response
+ decorate_view(_owrapped_view, view)
+ return _owrapped_view
+
+def _predicate_wrap(registry, view, predicates):
+ if not predicates:
+ return view
+ def _wrapped(context, request):
+ if all((predicate(context, request) for predicate in predicates)):
+ return view(context, request)
+ raise NotFound('predicate mismatch for view %s' % view)
+ def checker(context, request):
+ return all((predicate(context, request) for predicate in
+ predicates))
+ _wrapped.__predicated__ = checker
+ decorate_view(_wrapped, view)
+ return _wrapped
+
+def _secure_view(registry, view, permission):
+ wrapped_view = view
+ authn_policy = registry.queryUtility(IAuthenticationPolicy)
+ authz_policy = registry.queryUtility(IAuthorizationPolicy)
+ if authn_policy and authz_policy and (permission is not None):
+ def _secured_view(context, request):
+ principals = authn_policy.effective_principals(request)
+ if authz_policy.permits(context, principals, permission):
+ return view(context, request)
+ msg = getattr(request, 'authdebug_message',
+ 'Unauthorized: %s failed permission check' % view)
+ raise Forbidden(msg)
+ _secured_view.__call_permissive__ = view
+ def _permitted(context, request):
+ principals = authn_policy.effective_principals(request)
+ return authz_policy.permits(context, principals, permission)
+ _secured_view.__permitted__ = _permitted
+ wrapped_view = _secured_view
+ decorate_view(wrapped_view, view)
+
+ return wrapped_view
+
+def _authdebug_view(registry, view, permission):
+ wrapped_view = view
+ authn_policy = registry.queryUtility(IAuthenticationPolicy)
+ authz_policy = registry.queryUtility(IAuthorizationPolicy)
+ settings = registry.queryUtility(ISettings)
+ debug_authorization = False
+ if settings is not None:
+ debug_authorization = settings.get('debug_authorization', False)
+ if debug_authorization:
+ def _authdebug_view(context, request):
+ view_name = getattr(request, 'view_name', None)
+
+ if authn_policy and authz_policy:
+ if permission is None:
+ msg = 'Allowed (no permission registered)'
+ else:
+ principals = authn_policy.effective_principals(request)
+ msg = str(authz_policy.permits(context, principals,
+ permission))
+ else:
+ msg = 'Allowed (no authorization policy in use)'
+
+ view_name = getattr(request, 'view_name', None)
+ url = getattr(request, 'url', None)
+ msg = ('debug_authorization of url %s (view name %r against '
+ 'context %r): %s' % (url, view_name, context, msg))
+ logger =registry.queryUtility(ILogger, 'repoze.bfg.debug')
+ logger and logger.debug(msg)
+ if request is not None:
+ request.authdebug_message = msg
+ return view(context, request)
+
+ wrapped_view = _authdebug_view
+ decorate_view(wrapped_view, view)
+
+ return wrapped_view
+