summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2009-11-17 21:59:54 +0000
committerChris McDonough <chrism@agendaless.com>2009-11-17 21:59:54 +0000
commitcbfafba1514ce2ce2b87aadb0093c06210219372 (patch)
tree41e83cf95cf4cf8920396e644a219518689e8b89
parenta937e7d039f2c9e62e1a2771b2e6b23412ab709a (diff)
downloadpyramid-cbfafba1514ce2ce2b87aadb0093c06210219372.tar.gz
pyramid-cbfafba1514ce2ce2b87aadb0093c06210219372.tar.bz2
pyramid-cbfafba1514ce2ce2b87aadb0093c06210219372.zip
Move configuration methods into Configurator.
-rw-r--r--CHANGES.txt15
-rw-r--r--repoze/bfg/configuration.py787
-rw-r--r--repoze/bfg/registry.py660
-rw-r--r--repoze/bfg/renderers.py6
-rw-r--r--repoze/bfg/router.py42
-rw-r--r--repoze/bfg/testing.py16
-rw-r--r--repoze/bfg/tests/test_configuration.py634
-rw-r--r--repoze/bfg/tests/test_integration.py1
-rw-r--r--repoze/bfg/tests/test_registry.py456
-rw-r--r--repoze/bfg/tests/test_router.py139
-rw-r--r--repoze/bfg/tests/test_threadlocal.py21
-rw-r--r--repoze/bfg/tests/test_urldispatch.py191
-rw-r--r--repoze/bfg/tests/test_zcml.py26
-rw-r--r--repoze/bfg/threadlocal.py4
-rw-r--r--repoze/bfg/urldispatch.py52
-rw-r--r--repoze/bfg/view.py83
-rw-r--r--repoze/bfg/zcml.py80
17 files changed, 1565 insertions, 1648 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index 39cb06af5..dfb9636fb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,18 @@
+Next release
+============
+
+Internals
+---------
+
+The ``repoze.bfg.registry.make_registry`` callable has been removed.
+
+Backwards Incompatibilites
+--------------------------
+
+The ``repoze.bfg.router.make_app`` callable no longer accepts the
+``authentication_policy`` nor the ``authorization_policy`` arguments.
+This feature was deprecated in version 1.0 and has been removed.
+
1.1 (2009-11-15)
================
diff --git a/repoze/bfg/configuration.py b/repoze/bfg/configuration.py
index 2b74017dc..495445ff6 100644
--- a/repoze/bfg/configuration.py
+++ b/repoze/bfg/configuration.py
@@ -1,130 +1,67 @@
import os
+import re
import sys
import threading
+import inspect
import zope.component
from zope.configuration import xmlconfig
+from zope.configuration.exceptions import ConfigurationError
from zope.configuration.config import ConfigurationMachine
from zope.component import getGlobalSiteManager
from zope.component import getSiteManager
+from zope.interface import Interface
+from zope.interface import implementedBy
+from zope.interface.interfaces import IInterface
+
from repoze.bfg.interfaces import IAuthenticationPolicy
from repoze.bfg.interfaces import IAuthorizationPolicy
from repoze.bfg.interfaces import IDefaultRootFactory
+from repoze.bfg.interfaces import IForbiddenView
from repoze.bfg.interfaces import ILogger
+from repoze.bfg.interfaces import IMultiView
+from repoze.bfg.interfaces import INotFoundView
+from repoze.bfg.interfaces import IPackageOverrides
+from repoze.bfg.interfaces import IRendererFactory
+from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IRootFactory
+from repoze.bfg.interfaces import IRouteRequest
from repoze.bfg.interfaces import IRoutesMapper
from repoze.bfg.interfaces import ISettings
+from repoze.bfg.interfaces import ISecuredView
+from repoze.bfg.interfaces import ITemplateRendererFactory
+from repoze.bfg.interfaces import IView
+from repoze.bfg.interfaces import IViewPermission
-from repoze.bfg.authorization import ACLAuthorizationPolicy
+from repoze.bfg import chameleon_zpt
+from repoze.bfg import chameleon_text
+from repoze.bfg import renderers
+from repoze.bfg.compat import all
+from repoze.bfg.exceptions import Forbidden
+from repoze.bfg.exceptions import NotFound
from repoze.bfg.log import make_stream_logger
-from repoze.bfg.registry import Registry
-from repoze.bfg.registry import DefaultRootFactory
+from repoze.bfg.request import route_request_iface
+from repoze.bfg.resource import PackageOverrides
from repoze.bfg.settings import Settings
from repoze.bfg.settings import get_options
+from repoze.bfg.settings import get_settings
+from repoze.bfg.static import StaticRootFactory
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.threadlocal import manager
-from repoze.bfg.urldispatch import RoutesRootFactory
-
-def make_registry(root_factory, package=None, filename='configure.zcml',
- authentication_policy=None, authorization_policy=None,
- options=None, registry=None, debug_logger=None,
- manager=manager, os=os, lock=threading.Lock()):
- # registry, debug_logger, manager, os and lock *only* for unittests
- if options is None:
- options = {}
-
- if not 'configure_zcml' in options:
- options['configure_zcml'] = filename
+from repoze.bfg.traversal import find_interface
+from repoze.bfg.urldispatch import RoutesMapper
+from repoze.bfg.view import MultiView
+from repoze.bfg.view import decorate_view
+from repoze.bfg.view import rendered_response
+from repoze.bfg.view import render_view_to_response
+from repoze.bfg.view import requestonly
+from repoze.bfg.view import static as static_view
- settings = Settings(get_options(options))
- filename = settings['configure_zcml']
-
- # not os.path.isabs below for windows systems
- if (':' in filename) and (not os.path.isabs(filename)):
- package, filename = filename.split(':', 1)
- __import__(package)
- package = sys.modules[package]
+import martian
- if registry is None:
- regname = filename
- if package:
- regname = package.__name__
- registry = Registry(regname)
-
- registry.registerUtility(settings, ISettings)
-
- if debug_logger is None:
- debug_logger = make_stream_logger('repoze.bfg.debug', sys.stderr)
- registry.registerUtility(debug_logger, ILogger, 'repoze.bfg.debug')
-
- if root_factory is None:
- root_factory = DefaultRootFactory
-
- mapper = RoutesRootFactory(root_factory)
- registry.registerUtility(mapper, IRoutesMapper)
- # register the *default* root factory so apps can find it later
- registry.registerUtility(root_factory, IDefaultRootFactory)
-
- if authentication_policy:
- debug_logger.warn(
- 'The "authentication_policy" and "authorization_policy" '
- 'arguments to repoze.bfg.router.make_app have been deprecated '
- 'in repoze.bfg version 1.0. Instead of using these arguments to '
- 'configure an authorization/authentication policy pair, use '
- 'a pair of ZCML directives (such as "authtktauthenticationpolicy" '
- 'and "aclauthorizationpolicy" documented within the Security '
- 'chapter in the BFG documentation. If you need to use a custom '
- 'authentication or authorization policy, you should make a ZCML '
- 'directive for it and use that directive within your '
- 'application\'s ZCML')
- registry.registerUtility(authentication_policy, IAuthenticationPolicy)
- if authorization_policy is None:
- authorization_policy = ACLAuthorizationPolicy()
- registry.registerUtility(authorization_policy, IAuthorizationPolicy)
-
- # We push our ZCML-defined configuration into an app-local
- # component registry in order to allow more than one bfg app to live
- # in the same process space without one unnecessarily stomping on
- # the other's component registrations (although I suspect directives
- # that have side effects are going to fail). The only way to do
- # that currently is to override zope.component.getGlobalSiteManager
- # for the duration of the ZCML includes. We acquire a lock in case
- # another make_app runs in a different thread simultaneously, in a
- # vain attempt to prevent mixing of registrations. There's not much
- # we can do about non-makeRegistry code that tries to use the global
- # site manager API directly in a different thread while we hold the
- # lock. Those registrations will end up in our application's
- # registry.
-
- lock.acquire()
- manager.push({'registry':registry, 'request':None})
- try:
- getSiteManager.sethook(get_current_registry)
- zope.component.getGlobalSiteManager = get_current_registry
- zcml_configure(filename, package)
- finally:
- # intentional: do not call getSiteManager.reset(); executing
- # this function means we're taking over getSiteManager for the
- # lifetime of this process
- zope.component.getGlobalSiteManager = getGlobalSiteManager
- lock.release()
- manager.pop()
-
- mapper = registry.getUtility(IRoutesMapper)
-
- if mapper.has_routes():
- # if the user had any <route/> statements in his configuration,
- # use the RoutesRootFactory as the IRootFactory; otherwise use the
- # default root factory (optimization; we don't want to go through
- # the Routes logic if we know there are no routes to match)
- root_factory = mapper
-
- registry.registerUtility(root_factory, IRootFactory)
-
- return registry
def zcml_configure(name, package):
""" Given a ZCML filename as ``name`` and a Python package as
@@ -140,3 +77,653 @@ def zcml_configure(name, package):
context.execute_actions(clear=False)
return context.actions
+class Configurator(object):
+ """ A wrapper around the registry that performs configuration tasks """
+ def __init__(self, registry):
+ self.reg = registry
+
+ def default_configuration(self, root_factory, package=None,
+ filename='configure.zcml', settings=None,
+ debug_logger=None, manager=manager, os=os,
+ lock=threading.Lock()):
+
+ # registry, debug_logger, manager, os and lock *only* for unittests
+ if settings is None:
+ settings = {}
+
+ if not 'configure_zcml' in settings:
+ settings['configure_zcml'] = filename
+
+ settings = Settings(get_options(settings))
+ filename = settings['configure_zcml']
+
+ # not os.path.isabs below for windows systems
+ if (':' in filename) and (not os.path.isabs(filename)):
+ package, filename = filename.split(':', 1)
+ __import__(package)
+ package = sys.modules[package]
+
+ self.settings(settings)
+ self.debug_logger(debug_logger)
+ self.root_factory(root_factory or DefaultRootFactory)
+ self.renderer(chameleon_zpt.renderer_factory, '.pt')
+ self.renderer(chameleon_text.renderer_factory, '.txt')
+ self.renderer(renderers.json_renderer_factory, 'json')
+ self.renderer(renderers.string_renderer_factory, 'string')
+
+ # We push our ZCML-defined configuration into an app-local
+ # component registry in order to allow more than one bfg app to live
+ # in the same process space without one unnecessarily stomping on
+ # the other's component registrations (although I suspect directives
+ # that have side effects are going to fail). The only way to do
+ # that currently is to override zope.component.getGlobalSiteManager
+ # for the duration of the ZCML includes. We acquire a lock in case
+ # another make_app runs in a different thread simultaneously, in a
+ # vain attempt to prevent mixing of registrations. There's not much
+ # we can do about non-makeRegistry code that tries to use the global
+ # site manager API directly in a different thread while we hold the
+ # lock. Those registrations will end up in our application's
+ # registry.
+
+ lock.acquire()
+ manager.push({'registry':self.reg, 'request':None})
+ try:
+ getSiteManager.sethook(get_current_registry)
+ zope.component.getGlobalSiteManager = get_current_registry
+ zcml_configure(filename, package)
+ finally:
+ # intentional: do not call getSiteManager.reset(); executing
+ # this function means we're taking over getSiteManager for the
+ # lifetime of this process
+ zope.component.getGlobalSiteManager = getGlobalSiteManager
+ lock.release()
+ manager.pop()
+
+ def view(self, permission=None, for_=None, view=None, name="",
+ request_type=None, route_name=None, request_method=None,
+ request_param=None, containment=None, attr=None,
+ renderer=None, wrapper=None, xhr=False, accept=None,
+ header=None, path_info=None, _info=u''):
+
+ if not view:
+ if renderer:
+ def view(context, request):
+ return {}
+ else:
+ raise ConfigurationError('"view" was not specified and '
+ 'no "renderer" specified')
+
+ if request_type in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE'):
+ # b/w compat for 1.0
+ request_method = request_type
+ request_type = None
+
+ if request_type is None:
+ if route_name is None:
+ request_type = IRequest
+ else:
+ request_type = self.reg.queryUtility(IRouteRequest,
+ name=route_name)
+ if request_type is None:
+ request_type = route_request_iface(route_name)
+ self.reg.registerUtility(request_type, IRouteRequest,
+ name=route_name)
+
+ score, predicates = _make_predicates(
+ xhr=xhr, request_method=request_method, path_info=path_info,
+ request_param=request_param, header=header, accept=accept,
+ containment=containment)
+
+ derived_view = self.derive_view(view, permission, predicates, attr,
+ renderer, wrapper, name)
+ r_for_ = for_
+ r_request_type = request_type
+ if r_for_ is None:
+ r_for_ = Interface
+ if not IInterface.providedBy(r_for_):
+ r_for_ = implementedBy(r_for_)
+ if not IInterface.providedBy(r_request_type):
+ r_request_type = implementedBy(r_request_type)
+ old_view = self.reg.adapters.lookup((r_for_, r_request_type),
+ IView,name=name)
+ if old_view is None:
+ if hasattr(derived_view, '__call_permissive__'):
+ self.reg.registerAdapter(derived_view, (for_, request_type),
+ ISecuredView, name, info=_info)
+ if hasattr(derived_view, '__permitted__'):
+ # bw compat
+ self.reg.registerAdapter(
+ derived_view.__permitted__,
+ (for_, request_type), IViewPermission,
+ name, info=_info)
+ else:
+ self.reg.registerAdapter(derived_view, (for_, request_type),
+ IView, name, info=_info)
+ else:
+ # XXX we could try to be more efficient here and register
+ # a non-secured view for a multiview if none of the
+ # multiview's consituent views have a permission
+ # associated with them, but this code is getting pretty
+ # rough already
+ if IMultiView.providedBy(old_view):
+ multiview = old_view
+ else:
+ multiview = MultiView(name)
+ multiview.add(old_view, sys.maxint)
+ multiview.add(derived_view, score)
+ for i in (IView, ISecuredView):
+ # unregister any existing views
+ self.reg.adapters.unregister((r_for_, r_request_type), i,
+ name=name)
+ self.reg.registerAdapter(multiview, (for_, request_type),
+ IMultiView, name, info=_info)
+ # b/w compat
+ self.reg.registerAdapter(multiview.__permitted__,
+ (for_, request_type), IViewPermission,
+ name, info=_info)
+
+ def map_view(self, view, attr=None, renderer_name=None):
+ wrapped_view = view
+
+ renderer = None
+
+ if renderer_name is None:
+ # global default renderer
+ factory = self.reg.queryUtility(IRendererFactory)
+ if factory is not None:
+ renderer_name = ''
+ renderer = factory(renderer_name)
+ else:
+ renderer = self.renderer_from_name(renderer_name)
+
+ if inspect.isclass(view):
+ # If the object we've located is a class, turn it into a
+ # function that operates like a Zope view (when it's invoked,
+ # construct an instance using 'context' and 'request' as
+ # position arguments, then immediately invoke the __call__
+ # method of the instance with no arguments; __call__ should
+ # return an IResponse).
+ if requestonly(view, attr):
+ # its __init__ accepts only a single request argument,
+ # instead of both context and request
+ def _bfg_class_requestonly_view(context, request):
+ inst = view(request)
+ if attr is None:
+ response = inst()
+ else:
+ response = getattr(inst, attr)()
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, inst,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_class_requestonly_view
+ else:
+ # its __init__ accepts both context and request
+ def _bfg_class_view(context, request):
+ inst = view(context, request)
+ if attr is None:
+ response = inst()
+ else:
+ response = getattr(inst, attr)()
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, inst,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_class_view
+
+ elif requestonly(view, attr):
+ # its __call__ accepts only a single request argument,
+ # instead of both context and request
+ def _bfg_requestonly_view(context, request):
+ if attr is None:
+ response = view(request)
+ else:
+ response = getattr(view, attr)(request)
+
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, view,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_requestonly_view
+
+ elif attr:
+ def _bfg_attr_view(context, request):
+ response = getattr(view, attr)(context, request)
+ if renderer is not None:
+ response = rendered_response(renderer,
+ response, view,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _bfg_attr_view
+
+ elif renderer is not None:
+ def _rendered_view(context, request):
+ response = view(context, request)
+ response = rendered_response(renderer,
+ response, view,
+ context, request,
+ renderer_name)
+ return response
+ wrapped_view = _rendered_view
+
+ decorate_view(wrapped_view, view)
+ return wrapped_view
+
+ def renderer_from_name(self, path):
+ name = os.path.splitext(path)[1]
+ if not name:
+ name = path
+ factory = self.reg.queryUtility(IRendererFactory, name=name)
+ if factory is None:
+ raise ValueError('No renderer for renderer name %r' % name)
+ return factory(path)
+
+ def derive_view(self, original_view, permission=None, predicates=(),
+ attr=None, renderer_name=None, wrapper_viewname=None,
+ viewname=None):
+ mapped_view = self.map_view(original_view, attr, renderer_name)
+ owrapped_view = self.owrap_view(mapped_view, viewname, wrapper_viewname)
+ secured_view = self.secure_view(owrapped_view, permission)
+ debug_view = self.authdebug_view(secured_view, permission)
+ derived_view = self.predicate_wrap(debug_view, predicates)
+ return derived_view
+
+ def owrap_view(self, view, viewname, wrapper_viewname):
+ if not wrapper_viewname:
+ return view
+ def _owrapped_view(context, request):
+ response = view(context, request)
+ request.wrapped_response = response
+ request.wrapped_body = response.body
+ request.wrapped_view = view
+ wrapped_response = render_view_to_response(context, request,
+ wrapper_viewname)
+ if wrapped_response is None:
+ raise ValueError(
+ 'No wrapper view named %r found when executing view '
+ 'named %r' % (wrapper_viewname, viewname))
+ return wrapped_response
+ decorate_view(_owrapped_view, view)
+ return _owrapped_view
+
+ def predicate_wrap(self, view, predicates):
+ if not predicates:
+ return view
+ def _wrapped(context, request):
+ if all((predicate(context, request) for predicate in predicates)):
+ return view(context, request)
+ raise NotFound('predicate mismatch for view %s' % view)
+ def checker(context, request):
+ return all((predicate(context, request) for predicate in
+ predicates))
+ _wrapped.__predicated__ = checker
+ decorate_view(_wrapped, view)
+ return _wrapped
+
+ def secure_view(self, view, permission):
+ wrapped_view = view
+ authn_policy = self.reg.queryUtility(IAuthenticationPolicy)
+ authz_policy = self.reg.queryUtility(IAuthorizationPolicy)
+ if authn_policy and authz_policy and (permission is not None):
+ def _secured_view(context, request):
+ principals = authn_policy.effective_principals(request)
+ if authz_policy.permits(context, principals, permission):
+ return view(context, request)
+ msg = getattr(request, 'authdebug_message',
+ 'Unauthorized: %s failed permission check' % view)
+ raise Forbidden(msg)
+ _secured_view.__call_permissive__ = view
+ def _permitted(context, request):
+ principals = authn_policy.effective_principals(request)
+ return authz_policy.permits(context, principals, permission)
+ _secured_view.__permitted__ = _permitted
+ wrapped_view = _secured_view
+ decorate_view(wrapped_view, view)
+
+ return wrapped_view
+
+ def authdebug_view(self, view, permission):
+ wrapped_view = view
+ authn_policy = self.reg.queryUtility(IAuthenticationPolicy)
+ authz_policy = self.reg.queryUtility(IAuthorizationPolicy)
+ settings = get_settings()
+ debug_authorization = False
+ if settings is not None:
+ debug_authorization = settings.get('debug_authorization', False)
+ if debug_authorization:
+ def _authdebug_view(context, request):
+ view_name = getattr(request, 'view_name', None)
+
+ if authn_policy and authz_policy:
+ if permission is None:
+ msg = 'Allowed (no permission registered)'
+ else:
+ principals = authn_policy.effective_principals(request)
+ msg = str(authz_policy.permits(context, principals,
+ permission))
+ else:
+ msg = 'Allowed (no authorization policy in use)'
+
+ view_name = getattr(request, 'view_name', None)
+ url = getattr(request, 'url', None)
+ msg = ('debug_authorization of url %s (view name %r against '
+ 'context %r): %s' % (url, view_name, context, msg))
+ logger = self.reg.queryUtility(ILogger, 'repoze.bfg.debug')
+ logger and logger.debug(msg)
+ if request is not None:
+ request.authdebug_message = msg
+ return view(context, request)
+
+ wrapped_view = _authdebug_view
+ decorate_view(wrapped_view, view)
+
+ return wrapped_view
+
+ def route(self, name, path, view=None, view_for=None,
+ permission=None, factory=None, request_type=None, for_=None,
+ header=None, xhr=False, accept=None, path_info=None,
+ request_method=None, request_param=None,
+ view_permission=None, view_request_type=None,
+ view_request_method=None, view_request_param=None,
+ view_containment=None, view_attr=None,
+ renderer=None, view_renderer=None, view_header=None,
+ view_accept=None, view_xhr=False,
+ view_path_info=None, _info=u''):
+ # the strange ordering of the request kw args above is for b/w
+ # compatibility purposes.
+ # these are route predicates; if they do not match, the next route
+ # in the routelist will be tried
+ _, predicates = _make_predicates(xhr=xhr,
+ request_method=request_method,
+ path_info=path_info,
+ request_param=request_param,
+ header=header,
+ accept=accept)
+
+ if request_type in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE'):
+ # b/w compat for 1.0
+ view_request_method = request_type
+ request_type = None
+
+ request_iface = self.reg.queryUtility(IRouteRequest, name=name)
+ if request_iface is None:
+ request_iface = route_request_iface(name)
+ self.reg.registerUtility(request_iface, IRouteRequest, name=name)
+
+ if view:
+ view_for = view_for or for_
+ view_request_type = view_request_type or request_type
+ view_permission = view_permission or permission
+ view_renderer = view_renderer or renderer
+ self.view(
+ permission=view_permission,
+ for_=view_for,
+ view=view,
+ name='',
+ request_type=view_request_type,
+ route_name=name,
+ request_method=view_request_method,
+ request_param=view_request_param,
+ containment=view_containment,
+ attr=view_attr,
+ renderer=view_renderer,
+ header=view_header,
+ accept=view_accept,
+ xhr=view_xhr,
+ path_info=view_path_info,
+ info=_info,
+ )
+
+ mapper = self.reg.queryUtility(IRoutesMapper)
+ if mapper is None:
+ mapper = RoutesMapper()
+ self.reg.registerUtility(mapper, IRoutesMapper)
+ mapper.connect(path, name, factory, predicates=predicates)
+
+ def scan(self, package, _info=u'', martian=martian):
+ # martian overrideable only for unit tests
+ multi_grokker = BFGMultiGrokker()
+ multi_grokker.register(BFGViewGrokker())
+ module_grokker = martian.ModuleGrokker(grokker=multi_grokker)
+ martian.grok_dotted_name(
+ package.__name__, grokker=module_grokker,
+ _info=_info, _configurator=self,
+ exclude_filter=lambda name: name.startswith('.'))
+
+ def authentication_policy(self, policy, _info=u''):
+ self.reg.registerUtility(policy, IAuthenticationPolicy, info=_info)
+
+ def authorization_policy(self, policy, _info=u''):
+ self.reg.registerUtility(policy, IAuthorizationPolicy, info=_info)
+
+ def renderer(self, factory, name, _info=u''):
+ iface = IRendererFactory
+ if name.startswith('.'):
+ iface = ITemplateRendererFactory
+ self.reg.registerUtility(factory, iface, name=name, info=_info)
+
+ def resource(self, to_override, override_with, _override=None,
+ _info=u''):
+ if to_override == override_with:
+ raise ConfigurationError('You cannot override a resource with '
+ 'itself')
+
+ package = to_override
+ path = ''
+ if ':' in to_override:
+ package, path = to_override.split(':', 1)
+
+ override_package = override_with
+ override_prefix = ''
+ if ':' in override_with:
+ override_package, override_prefix = override_with.split(':', 1)
+
+ if path and path.endswith('/'):
+ if override_prefix and (not override_prefix.endswith('/')):
+ raise ConfigurationError(
+ 'A directory cannot be overridden with a file (put a slash '
+ 'at the end of override_with if necessary)')
+
+ if override_prefix and override_prefix.endswith('/'):
+ if path and (not path.endswith('/')):
+ raise ConfigurationError(
+ 'A file cannot be overridden with a directory (put a slash '
+ 'at the end of to_override if necessary)')
+
+ __import__(package)
+ __import__(override_package)
+ package = sys.modules[package]
+ override_package = sys.modules[override_package]
+
+ if _override is not None:
+ _override(package, path, override_package, override_prefix)
+ else:
+ self._override(package, path, override_package, override_prefix)
+
+ def _override(self, package, path, override_package, override_prefix,
+ _info=u'', PackageOverrides=PackageOverrides):
+ pkg_name = package.__name__
+ override_pkg_name = override_package.__name__
+ override = self.reg.queryUtility(IPackageOverrides, name=pkg_name)
+ if override is None:
+ override = PackageOverrides(package)
+ self.reg.registerUtility(override, IPackageOverrides,
+ name=pkg_name, info=_info)
+ override.insert(path, override_pkg_name, override_prefix)
+
+
+ def notfound(self, view=None, attr=None, renderer=None, wrapper=None,
+ _info=u''):
+ self.view_utility(view, attr, renderer, wrapper, INotFoundView,
+ _info=_info)
+
+ def forbidden(self, view=None, attr=None, renderer=None, wrapper=None,
+ _info=u''):
+ self.view_utility(view, attr, renderer, wrapper,
+ IForbiddenView, _info=_info)
+
+ def view_utility(self, view, attr, renderer, wrapper, iface, _info=u''):
+ if not view:
+ if renderer:
+ def view(context, request):
+ return {}
+ else:
+ raise ConfigurationError('"view" attribute was not specified and '
+ 'no renderer specified')
+
+ derived_view = self.derive_view(view, attr=attr, renderer_name=renderer,
+ wrapper_viewname=wrapper)
+ self.reg.registerUtility(derived_view, iface, '', info=_info)
+
+ def static(self, name, path, cache_max_age=3600, _info=u''):
+ view = static_view(path, cache_max_age=cache_max_age)
+ self.route(name, "%s*subpath" % name, view=view,
+ view_for=StaticRootFactory, factory=StaticRootFactory(path),
+ _info=_info)
+
+ def settings(self, settings):
+ self.reg.registerUtility(settings, ISettings)
+
+ def debug_logger(self, logger):
+ if logger is None:
+ logger = make_stream_logger('repoze.bfg.debug', sys.stderr)
+ self.reg.registerUtility(logger, ILogger, 'repoze.bfg.debug')
+
+ def root_factory(self, factory):
+ self.reg.registerUtility(factory, IRootFactory)
+ self.reg.registerUtility(factory, IDefaultRootFactory) # b/c
+
+def _make_predicates(xhr=None, request_method=None, path_info=None,
+ request_param=None, header=None, accept=None,
+ containment=None):
+ # Predicates are added to the predicate list in (presumed)
+ # computation expense order. All predicates associated with a
+ # view must evaluate true for the view to "match" a request.
+ # Elsewhere in the code, we evaluate them using a generator
+ # expression. The fastest predicate should be evaluated first,
+ # then the next fastest, and so on, as if one returns false, the
+ # remainder of the predicates won't need to be evaluated.
+
+ # Each predicate is associated with a weight value. The weight
+ # symbolizes the relative potential "importance" of the predicate
+ # to all other predicates. A larger weight indicates greater
+ # importance. These weights are subtracted from an aggregate
+ # 'weight' variable. The aggregate weight is then divided by the
+ # length of the predicate list to compute a "score" for this view.
+ # The score represents the ordering in which a "multiview" ( a
+ # collection of views that share the same context/request/name
+ # triad but differ in other ways via predicates) will attempt to
+ # call its set of views. Views with lower scores will be tried
+ # first. The intent is to a) ensure that views with more
+ # predicates are always evaluated before views with fewer
+ # predicates and b) to ensure a stable call ordering of views that
+ # share the same number of predicates.
+
+ # Views which do not have any predicates get a score of
+ # sys.maxint, meaning that they will be tried very last.
+
+ predicates = []
+ weight = sys.maxint
+
+ if xhr:
+ def xhr_predicate(context, request):
+ return request.is_xhr
+ weight = weight - 10
+ predicates.append(xhr_predicate)
+
+ if request_method is not None:
+ def request_method_predicate(context, request):
+ return request.method == request_method
+ weight = weight - 20
+ predicates.append(request_method_predicate)
+
+ if path_info is not None:
+ try:
+ path_info_val = re.compile(path_info)
+ except re.error, why:
+ raise ConfigurationError(why[0])
+ def path_info_predicate(context, request):
+ return path_info_val.match(request.path_info) is not None
+ weight = weight - 30
+ predicates.append(path_info_predicate)
+
+ if request_param is not None:
+ request_param_val = None
+ if '=' in request_param:
+ request_param, request_param_val = request_param.split('=', 1)
+ def request_param_predicate(context, request):
+ if request_param_val is None:
+ return request_param in request.params
+ return request.params.get(request_param) == request_param_val
+ weight = weight - 40
+ predicates.append(request_param_predicate)
+
+ if header is not None:
+ header_name = header
+ header_val = None
+ if ':' in header:
+ header_name, header_val = header.split(':', 1)
+ try:
+ header_val = re.compile(header_val)
+ except re.error, why:
+ raise ConfigurationError(why[0])
+ def header_predicate(context, request):
+ if header_val is None:
+ return header_name in request.headers
+ val = request.headers.get(header_name)
+ return header_val.match(val) is not None
+ weight = weight - 50
+ predicates.append(header_predicate)
+
+ if accept is not None:
+ def accept_predicate(context, request):
+ return accept in request.accept
+ weight = weight - 60
+ predicates.append(accept_predicate)
+
+ if containment is not None:
+ def containment_predicate(context, request):
+ return find_interface(context, containment) is not None
+ weight = weight - 70
+ predicates.append(containment_predicate)
+
+ # this will be == sys.maxint if no predicates
+ score = weight / (len(predicates) + 1)
+ return score, predicates
+
+class BFGViewMarker(object):
+ pass
+
+class BFGMultiGrokker(martian.core.MultiInstanceOrClassGrokkerBase):
+ def get_bases(self, obj):
+ if hasattr(obj, '__bfg_view_settings__'):
+ return [BFGViewMarker]
+ return []
+
+class BFGViewGrokker(martian.InstanceGrokker):
+ martian.component(BFGViewMarker)
+ def grok(self, name, obj, **kw):
+ config = getattr(obj, '__bfg_view_settings__', [])
+ for settings in config:
+ config = kw['_configurator']
+ info = kw.get('_info', u'')
+ config.view(view=obj, _info=info, **settings)
+ return bool(config)
+
+class DefaultRootFactory:
+ __parent__ = None
+ __name__ = None
+ def __init__(self, request):
+ matchdict = getattr(request, 'matchdict', {})
+ # provide backwards compatibility for applications which
+ # used routes (at least apps without any custom "context
+ # factory") in BFG 0.9.X and before
+ self.__dict__.update(matchdict)
+
diff --git a/repoze/bfg/registry.py b/repoze/bfg/registry.py
index 8fd1047e1..68de05c34 100644
--- a/repoze/bfg/registry.py
+++ b/repoze/bfg/registry.py
@@ -1,70 +1,11 @@
-import os
-import re
-import sys
-import inspect
-
-from webob import Response
-
from zope.component.registry import Components
-from zope.configuration.exceptions import ConfigurationError
-
-from zope.interface import Interface
-from zope.interface import implementedBy
-from zope.interface.interfaces import IInterface
-
-import martian
-
-from repoze.bfg.interfaces import IAuthenticationPolicy
-from repoze.bfg.interfaces import IAuthorizationPolicy
-from repoze.bfg.interfaces import IForbiddenView
-from repoze.bfg.interfaces import IMultiView
-from repoze.bfg.interfaces import INotFoundView
-from repoze.bfg.interfaces import IPackageOverrides
-from repoze.bfg.interfaces import IRendererFactory
-from repoze.bfg.interfaces import IRequest
-from repoze.bfg.interfaces import IResponseFactory
-from repoze.bfg.interfaces import IRouteRequest
-from repoze.bfg.interfaces import IRoutesMapper
-from repoze.bfg.interfaces import ISecuredView
-from repoze.bfg.interfaces import ITemplateRendererFactory
-from repoze.bfg.interfaces import IView
-from repoze.bfg.interfaces import IViewPermission
-from repoze.bfg.interfaces import ILogger
-
-from repoze.bfg import chameleon_zpt
-from repoze.bfg import chameleon_text
-from repoze.bfg import renderers
-from repoze.bfg.compat import all
-from repoze.bfg.exceptions import NotFound
-from repoze.bfg.exceptions import Forbidden
-from repoze.bfg.request import route_request_iface
-from repoze.bfg.resource import PackageOverrides
-from repoze.bfg.settings import get_settings
-from repoze.bfg.static import StaticRootFactory
-from repoze.bfg.traversal import find_interface
-from repoze.bfg.view import static as static_view
-from repoze.bfg.view import render_view_to_response
-from repoze.bfg.view import requestonly
-from repoze.bfg.view import decorate_view
-from repoze.bfg.view import MultiView
-from repoze.bfg.urldispatch import RoutesRootFactory
-
class Registry(Components, dict):
# for optimization purposes, if no listeners are listening, don't try
# to notify them
has_listeners = False
- def __init__(self, name='', bases=()):
- Components.__init__(self, name=name, bases=bases)
- mapper = RoutesRootFactory(DefaultRootFactory)
- self.registerUtility(mapper, IRoutesMapper)
- self.renderer(chameleon_zpt.renderer_factory, '.pt')
- self.renderer(chameleon_text.renderer_factory, '.txt')
- self.renderer(renderers.json_renderer_factory, 'json')
- self.renderer(renderers.string_renderer_factory, 'string')
-
def registerSubscriptionAdapter(self, *arg, **kw):
result = Components.registerSubscriptionAdapter(self, *arg, **kw)
self.has_listeners = True
@@ -80,604 +21,3 @@ class Registry(Components, dict):
# iterating over subscribers assures they get executed
[ _ for _ in self.subscribers(events, None) ]
- def view(self, permission=None, for_=None, view=None, name="",
- request_type=None, route_name=None, request_method=None,
- request_param=None, containment=None, attr=None,
- renderer=None, wrapper=None, xhr=False, accept=None,
- header=None, path_info=None, _info=u''):
-
- if not view:
- if renderer:
- def view(context, request):
- return {}
- else:
- raise ConfigurationError('"view" was not specified and '
- 'no "renderer" specified')
-
- if request_type in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE'):
- # b/w compat for 1.0
- request_method = request_type
- request_type = None
-
- if request_type is None:
- if route_name is None:
- request_type = IRequest
- else:
- request_type = self.queryUtility(IRouteRequest, name=route_name)
- if request_type is None:
- request_type = route_request_iface(route_name)
- self.registerUtility(request_type, IRouteRequest,
- name=route_name)
-
- score, predicates = _make_predicates(
- xhr=xhr, request_method=request_method, path_info=path_info,
- request_param=request_param, header=header, accept=accept,
- containment=containment)
-
- derived_view = self.derive_view(view, permission, predicates, attr,
- renderer, wrapper, name)
- r_for_ = for_
- r_request_type = request_type
- if r_for_ is None:
- r_for_ = Interface
- if not IInterface.providedBy(r_for_):
- r_for_ = implementedBy(r_for_)
- if not IInterface.providedBy(r_request_type):
- r_request_type = implementedBy(r_request_type)
- old_view = self.adapters.lookup((r_for_, r_request_type),
- IView,name=name)
- if old_view is None:
- if hasattr(derived_view, '__call_permissive__'):
- self.registerAdapter(derived_view, (for_, request_type),
- ISecuredView, name, info=_info)
- if hasattr(derived_view, '__permitted__'):
- # bw compat
- self.registerAdapter(
- derived_view.__permitted__,
- (for_, request_type), IViewPermission,
- name, info=_info)
- else:
- self.registerAdapter(derived_view, (for_, request_type),
- IView, name, info=_info)
- else:
- # XXX we could try to be more efficient here and register
- # a non-secured view for a multiview if none of the
- # multiview's consituent views have a permission
- # associated with them, but this code is getting pretty
- # rough already
- if IMultiView.providedBy(old_view):
- multiview = old_view
- else:
- multiview = MultiView(name)
- multiview.add(old_view, sys.maxint)
- multiview.add(derived_view, score)
- for i in (IView, ISecuredView):
- # unregister any existing views
- self.adapters.unregister((r_for_, r_request_type), i,
- name=name)
- self.registerAdapter(multiview, (for_, request_type),
- IMultiView, name, info=_info)
- # b/w compat
- self.registerAdapter(multiview.__permitted__,
- (for_, request_type), IViewPermission,
- name, info=_info)
-
- def map_view(self, view, attr=None, renderer_name=None):
- wrapped_view = view
-
- renderer = None
-
- if renderer_name is None:
- # global default renderer
- factory = self.queryUtility(IRendererFactory)
- if factory is not None:
- renderer_name = ''
- renderer = factory(renderer_name)
- else:
- renderer = self.renderer_from_name(renderer_name)
-
- if inspect.isclass(view):
- # If the object we've located is a class, turn it into a
- # function that operates like a Zope view (when it's invoked,
- # construct an instance using 'context' and 'request' as
- # position arguments, then immediately invoke the __call__
- # method of the instance with no arguments; __call__ should
- # return an IResponse).
- if requestonly(view, attr):
- # its __init__ accepts only a single request argument,
- # instead of both context and request
- def _bfg_class_requestonly_view(context, request):
- inst = view(request)
- if attr is None:
- response = inst()
- else:
- response = getattr(inst, attr)()
- if renderer is not None:
- response = self.rendered_response(renderer,
- response, inst,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_class_requestonly_view
- else:
- # its __init__ accepts both context and request
- def _bfg_class_view(context, request):
- inst = view(context, request)
- if attr is None:
- response = inst()
- else:
- response = getattr(inst, attr)()
- if renderer is not None:
- response = self.rendered_response(renderer,
- response, inst,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_class_view
-
- elif requestonly(view, attr):
- # its __call__ accepts only a single request argument,
- # instead of both context and request
- def _bfg_requestonly_view(context, request):
- if attr is None:
- response = view(request)
- else:
- response = getattr(view, attr)(request)
-
- if renderer is not None:
- response = self.rendered_response(renderer,
- response, view,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_requestonly_view
-
- elif attr:
- def _bfg_attr_view(context, request):
- response = getattr(view, attr)(context, request)
- if renderer is not None:
- response = self.rendered_response(renderer,
- response, view,
- context, request,
- renderer_name)
- return response
- wrapped_view = _bfg_attr_view
-
- elif renderer is not None:
- def _rendered_view(context, request):
- response = view(context, request)
- response = self.rendered_response(renderer,
- response, view,
- context, request,
- renderer_name)
- return response
- wrapped_view = _rendered_view
-
- decorate_view(wrapped_view, view)
- return wrapped_view
-
- def renderer_from_name(self, path):
- name = os.path.splitext(path)[1]
- if not name:
- name = path
- factory = self.queryUtility(IRendererFactory, name=name)
- if factory is None:
- raise ValueError('No renderer for renderer name %r' % name)
- return factory(path)
-
- def rendered_response(self, renderer, response, view, context,request,
- renderer_name):
- if ( hasattr(response, 'app_iter') and hasattr(response, 'headerlist')
- and hasattr(response, 'status') ):
- return response
- result = renderer(response, {'view':view, 'renderer_name':renderer_name,
- 'context':context, 'request':request})
- response_factory = self.queryUtility(IResponseFactory, default=Response)
- response = response_factory(result)
- attrs = request.__dict__
- content_type = attrs.get('response_content_type', None)
- if content_type is not None:
- response.content_type = content_type
- headerlist = attrs.get('response_headerlist', None)
- if headerlist is not None:
- for k, v in headerlist:
- response.headers.add(k, v)
- status = attrs.get('response_status', None)
- if status is not None:
- response.status = status
- charset = attrs.get('response_charset', None)
- if charset is not None:
- response.charset = charset
- cache_for = attrs.get('response_cache_for', None)
- if cache_for is not None:
- response.cache_expires = cache_for
- return response
-
- def derive_view(self, original_view, permission=None, predicates=(),
- attr=None, renderer_name=None, wrapper_viewname=None,
- viewname=None):
- mapped_view = self.map_view(original_view, attr, renderer_name)
- owrapped_view = self.owrap_view(mapped_view, viewname, wrapper_viewname)
- secured_view = self.secure_view(owrapped_view, permission)
- debug_view = self.authdebug_view(secured_view, permission)
- derived_view = self.predicate_wrap(debug_view, predicates)
- return derived_view
-
- def owrap_view(self, view, viewname, wrapper_viewname):
- if not wrapper_viewname:
- return view
- def _owrapped_view(context, request):
- response = view(context, request)
- request.wrapped_response = response
- request.wrapped_body = response.body
- request.wrapped_view = view
- wrapped_response = render_view_to_response(context, request,
- wrapper_viewname)
- if wrapped_response is None:
- raise ValueError(
- 'No wrapper view named %r found when executing view '
- 'named %r' % (wrapper_viewname, viewname))
- return wrapped_response
- decorate_view(_owrapped_view, view)
- return _owrapped_view
-
- def predicate_wrap(self, view, predicates):
- if not predicates:
- return view
- def _wrapped(context, request):
- if all((predicate(context, request) for predicate in predicates)):
- return view(context, request)
- raise NotFound('predicate mismatch for view %s' % view)
- def checker(context, request):
- return all((predicate(context, request) for predicate in
- predicates))
- _wrapped.__predicated__ = checker
- decorate_view(_wrapped, view)
- return _wrapped
-
- def secure_view(self, view, permission):
- wrapped_view = view
- authn_policy = self.queryUtility(IAuthenticationPolicy)
- authz_policy = self.queryUtility(IAuthorizationPolicy)
- if authn_policy and authz_policy and (permission is not None):
- def _secured_view(context, request):
- principals = authn_policy.effective_principals(request)
- if authz_policy.permits(context, principals, permission):
- return view(context, request)
- msg = getattr(request, 'authdebug_message',
- 'Unauthorized: %s failed permission check' % view)
- raise Forbidden(msg)
- _secured_view.__call_permissive__ = view
- def _permitted(context, request):
- principals = authn_policy.effective_principals(request)
- return authz_policy.permits(context, principals, permission)
- _secured_view.__permitted__ = _permitted
- wrapped_view = _secured_view
- decorate_view(wrapped_view, view)
-
- return wrapped_view
-
- def authdebug_view(self, view, permission):
- wrapped_view = view
- authn_policy = self.queryUtility(IAuthenticationPolicy)
- authz_policy = self.queryUtility(IAuthorizationPolicy)
- settings = get_settings()
- debug_authorization = False
- if settings is not None:
- debug_authorization = settings.get('debug_authorization', False)
- if debug_authorization:
- def _authdebug_view(context, request):
- view_name = getattr(request, 'view_name', None)
-
- if authn_policy and authz_policy:
- if permission is None:
- msg = 'Allowed (no permission registered)'
- else:
- principals = authn_policy.effective_principals(request)
- msg = str(authz_policy.permits(context, principals,
- permission))
- else:
- msg = 'Allowed (no authorization policy in use)'
-
- view_name = getattr(request, 'view_name', None)
- url = getattr(request, 'url', None)
- msg = ('debug_authorization of url %s (view name %r against '
- 'context %r): %s' % (url, view_name, context, msg))
- logger = self.queryUtility(ILogger, 'repoze.bfg.debug')
- logger and logger.debug(msg)
- if request is not None:
- request.authdebug_message = msg
- return view(context, request)
-
- wrapped_view = _authdebug_view
- decorate_view(wrapped_view, view)
-
- return wrapped_view
-
- def route(self, name, path, view=None, view_for=None,
- permission=None, factory=None, request_type=None, for_=None,
- header=None, xhr=False, accept=None, path_info=None,
- request_method=None, request_param=None,
- view_permission=None, view_request_type=None,
- view_request_method=None, view_request_param=None,
- view_containment=None, view_attr=None,
- renderer=None, view_renderer=None, view_header=None,
- view_accept=None, view_xhr=False,
- view_path_info=None, _info=u''):
- # the strange ordering of the request kw args above is for b/w
- # compatibility purposes.
- # these are route predicates; if they do not match, the next route
- # in the routelist will be tried
- _, predicates = _make_predicates(xhr=xhr,
- request_method=request_method,
- path_info=path_info,
- request_param=request_param,
- header=header,
- accept=accept)
-
- if request_type in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE'):
- # b/w compat for 1.0
- view_request_method = request_type
- request_type = None
-
- request_iface = self.queryUtility(IRouteRequest, name=name)
- if request_iface is None:
- request_iface = route_request_iface(name)
- self.registerUtility(request_iface, IRouteRequest, name=name)
-
- if view:
- view_for = view_for or for_
- view_request_type = view_request_type or request_type
- view_permission = view_permission or permission
- view_renderer = view_renderer or renderer
- self.view(
- permission=view_permission,
- for_=view_for,
- view=view,
- name='',
- request_type=view_request_type,
- route_name=name,
- request_method=view_request_method,
- request_param=view_request_param,
- containment=view_containment,
- attr=view_attr,
- renderer=view_renderer,
- header=view_header,
- accept=view_accept,
- xhr=view_xhr,
- path_info=view_path_info,
- info=_info,
- )
-
- mapper = self.getUtility(IRoutesMapper)
- mapper.connect(path, name, factory, predicates=predicates)
-
- def scan(self, package, _info=u'', martian=martian):
- # martian overrideable only for unit tests
- multi_grokker = BFGMultiGrokker()
- multi_grokker.register(BFGViewGrokker())
- module_grokker = martian.ModuleGrokker(grokker=multi_grokker)
- martian.grok_dotted_name(
- package.__name__, grokker=module_grokker,
- _info=_info, _registry=self,
- exclude_filter=lambda name: name.startswith('.'))
-
- def authentication_policy(self, policy, _info=u''):
- self.registerUtility(policy, IAuthenticationPolicy, info=_info)
-
- def authorization_policy(self, policy, _info=u''):
- self.registerUtility(policy, IAuthorizationPolicy, info=_info)
-
- def renderer(self, factory, name, _info=u''):
- iface = IRendererFactory
- if name.startswith('.'):
- iface = ITemplateRendererFactory
- self.registerUtility(factory, iface, name=name, info=_info)
-
- def resource(self, to_override, override_with, _override=None,
- _info=u''):
- if to_override == override_with:
- raise ConfigurationError('You cannot override a resource with '
- 'itself')
-
- package = to_override
- path = ''
- if ':' in to_override:
- package, path = to_override.split(':', 1)
-
- override_package = override_with
- override_prefix = ''
- if ':' in override_with:
- override_package, override_prefix = override_with.split(':', 1)
-
- if path and path.endswith('/'):
- if override_prefix and (not override_prefix.endswith('/')):
- raise ConfigurationError(
- 'A directory cannot be overridden with a file (put a slash '
- 'at the end of override_with if necessary)')
-
- if override_prefix and override_prefix.endswith('/'):
- if path and (not path.endswith('/')):
- raise ConfigurationError(
- 'A file cannot be overridden with a directory (put a slash '
- 'at the end of to_override if necessary)')
-
- __import__(package)
- __import__(override_package)
- package = sys.modules[package]
- override_package = sys.modules[override_package]
-
- if _override is not None:
- _override(package, path, override_package, override_prefix)
- else:
- self._override(package, path, override_package, override_prefix)
-
- def _override(self, package, path, override_package, override_prefix,
- _info=u'', PackageOverrides=PackageOverrides):
- pkg_name = package.__name__
- override_pkg_name = override_package.__name__
- override = self.queryUtility(IPackageOverrides, name=pkg_name)
- if override is None:
- override = PackageOverrides(package)
- self.registerUtility(override, IPackageOverrides, name=pkg_name,
- info=_info)
- override.insert(path, override_pkg_name, override_prefix)
-
-
- def notfound(self, view=None, attr=None, renderer=None, wrapper=None,
- _info=u''):
- self._view_utility(view, attr, renderer, wrapper, INotFoundView,
- _info=_info)
-
- def forbidden(self, view=None, attr=None, renderer=None, wrapper=None,
- _info=u''):
- self._view_utility(view, attr, renderer, wrapper,
- IForbiddenView, _info=_info)
-
- def view_utility(self, view, attr, renderer, wrapper, iface, _info=u''):
- if not view:
- if renderer:
- def view(context, request):
- return {}
- else:
- raise ConfigurationError('"view" attribute was not specified and '
- 'no renderer specified')
-
- derived_view = self.derive_view(view, attr=attr, renderer_name=renderer,
- wrapper_viewname=wrapper)
- self.registerUtility(derived_view, iface, '', info=_info)
-
- def static(self, name, path, cache_max_age=3600, _info=u''):
- view = static_view(path, cache_max_age=cache_max_age)
- self.route(name, "%s*subpath" % name, view=view,
- view_for=StaticRootFactory, factory=StaticRootFactory(path),
- _info=_info)
-
-
-def _make_predicates(xhr=None, request_method=None, path_info=None,
- request_param=None, header=None, accept=None,
- containment=None):
- # Predicates are added to the predicate list in (presumed)
- # computation expense order. All predicates associated with a
- # view must evaluate true for the view to "match" a request.
- # Elsewhere in the code, we evaluate them using a generator
- # expression. The fastest predicate should be evaluated first,
- # then the next fastest, and so on, as if one returns false, the
- # remainder of the predicates won't need to be evaluated.
-
- # Each predicate is associated with a weight value. The weight
- # symbolizes the relative potential "importance" of the predicate
- # to all other predicates. A larger weight indicates greater
- # importance. These weights are subtracted from an aggregate
- # 'weight' variable. The aggregate weight is then divided by the
- # length of the predicate list to compute a "score" for this view.
- # The score represents the ordering in which a "multiview" ( a
- # collection of views that share the same context/request/name
- # triad but differ in other ways via predicates) will attempt to
- # call its set of views. Views with lower scores will be tried
- # first. The intent is to a) ensure that views with more
- # predicates are always evaluated before views with fewer
- # predicates and b) to ensure a stable call ordering of views that
- # share the same number of predicates.
-
- # Views which do not have any predicates get a score of
- # sys.maxint, meaning that they will be tried very last.
-
- predicates = []
- weight = sys.maxint
-
- if xhr:
- def xhr_predicate(context, request):
- return request.is_xhr
- weight = weight - 10
- predicates.append(xhr_predicate)
-
- if request_method is not None:
- def request_method_predicate(context, request):
- return request.method == request_method
- weight = weight - 20
- predicates.append(request_method_predicate)
-
- if path_info is not None:
- try:
- path_info_val = re.compile(path_info)
- except re.error, why:
- raise ConfigurationError(why[0])
- def path_info_predicate(context, request):
- return path_info_val.match(request.path_info) is not None
- weight = weight - 30
- predicates.append(path_info_predicate)
-
- if request_param is not None:
- request_param_val = None
- if '=' in request_param:
- request_param, request_param_val = request_param.split('=', 1)
- def request_param_predicate(context, request):
- if request_param_val is None:
- return request_param in request.params
- return request.params.get(request_param) == request_param_val
- weight = weight - 40
- predicates.append(request_param_predicate)
-
- if header is not None:
- header_name = header
- header_val = None
- if ':' in header:
- header_name, header_val = header.split(':', 1)
- try:
- header_val = re.compile(header_val)
- except re.error, why:
- raise ConfigurationError(why[0])
- def header_predicate(context, request):
- if header_val is None:
- return header_name in request.headers
- val = request.headers.get(header_name)
- return header_val.match(val) is not None
- weight = weight - 50
- predicates.append(header_predicate)
-
- if accept is not None:
- def accept_predicate(context, request):
- return accept in request.accept
- weight = weight - 60
- predicates.append(accept_predicate)
-
- if containment is not None:
- def containment_predicate(context, request):
- return find_interface(context, containment) is not None
- weight = weight - 70
- predicates.append(containment_predicate)
-
- # this will be == sys.maxint if no predicates
- score = weight / (len(predicates) + 1)
- return score, predicates
-
-class BFGViewMarker(object):
- pass
-
-class BFGMultiGrokker(martian.core.MultiInstanceOrClassGrokkerBase):
- def get_bases(self, obj):
- if hasattr(obj, '__bfg_view_settings__'):
- return [BFGViewMarker]
- return []
-
-class BFGViewGrokker(martian.InstanceGrokker):
- martian.component(BFGViewMarker)
- def grok(self, name, obj, **kw):
- config = getattr(obj, '__bfg_view_settings__', [])
- for settings in config:
- registry = kw['_registry']
- info = kw['_info']
- registry.view(view=obj, _info=info, **settings)
- return bool(config)
-
-class DefaultRootFactory:
- __parent__ = None
- __name__ = None
- def __init__(self, request):
- matchdict = getattr(request, 'matchdict', {})
- # provide backwards compatibility for applications which
- # used routes (at least apps without any custom "context
- # factory") in BFG 0.9.X and before
- self.__dict__.update(matchdict)
-
diff --git a/repoze/bfg/renderers.py b/repoze/bfg/renderers.py
index b5dfad523..f13d1ccb4 100644
--- a/repoze/bfg/renderers.py
+++ b/repoze/bfg/renderers.py
@@ -62,8 +62,10 @@ def template_renderer_factory(path, impl, level=3):
return renderer
def renderer_from_name(path):
- sm = getSiteManager()
- return sm.renderer_from_name(path)
+ from repoze.bfg.configuration import Configurator
+ reg = getSiteManager()
+ config = Configurator(reg)
+ return config.renderer_from_name(path)
def _reload_resources():
settings = get_settings()
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index 595c89298..6b1502d22 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -2,24 +2,28 @@ from zope.component.event import dispatch
from zope.interface import implements
from zope.interface import providedBy
+from zope.interface import alsoProvides
from repoze.bfg.interfaces import IForbiddenView
from repoze.bfg.interfaces import ILogger
from repoze.bfg.interfaces import INotFoundView
from repoze.bfg.interfaces import IRootFactory
from repoze.bfg.interfaces import IRouter
+from repoze.bfg.interfaces import IRouteRequest
+from repoze.bfg.interfaces import IRoutesMapper
from repoze.bfg.interfaces import ISettings
from repoze.bfg.interfaces import ITraverser
from repoze.bfg.interfaces import IView
-from repoze.bfg.configuration import make_registry
from repoze.bfg.configuration import DefaultRootFactory
+from repoze.bfg.configuration import Configurator
from repoze.bfg.events import AfterTraversal
from repoze.bfg.events import NewRequest
from repoze.bfg.events import NewResponse
from repoze.bfg.events import WSGIApplicationCreatedEvent
from repoze.bfg.exceptions import Forbidden
from repoze.bfg.exceptions import NotFound
+from repoze.bfg.registry import Registry
from repoze.bfg.request import Request
from repoze.bfg.threadlocal import manager
from repoze.bfg.traversal import ModelGraphTraverser
@@ -41,6 +45,7 @@ class Router(object):
self.notfound_view = q(INotFoundView, default=default_notfound_view)
self.forbidden_view = q(IForbiddenView, default=default_forbidden_view)
self.root_factory = q(IRootFactory, default=DefaultRootFactory)
+ self.routes_mapper = q(IRoutesMapper)
self.root_policy = self.root_factory # b/w compat
self.registry = registry
settings = registry.queryUtility(ISettings)
@@ -69,9 +74,26 @@ class Router(object):
attrs['registry'] = registry
has_listeners and registry.notify(NewRequest(request))
- # view lookup
- root = self.root_factory(request)
+ # root resolution
+ root_factory = self.root_factory
+ if self.routes_mapper is not None:
+ info = self.routes_mapper(request)
+ match, route = info['match'], info['route']
+ if match:
+ environ['wsgiorg.routing_args'] = ((), match)
+ environ['bfg.routes.route'] = route
+ environ['bfg.routes.matchdict'] = match
+ request.matchdict = match
+ iface = registry.queryUtility(IRouteRequest,
+ name=route.name)
+ if iface is not None:
+ alsoProvides(request, iface)
+ root_factory = route.factory or self.root_factory
+
+ root = root_factory(request)
attrs['root'] = root
+
+ # view lookup
traverser = registry.queryAdapter(root, ITraverser)
if traverser is None:
traverser = ModelGraphTraverser(root)
@@ -135,12 +157,10 @@ class Router(object):
finally:
manager.pop()
-# make_registry and manager kw args for unit testing only
# note that ``options`` is a b/w compat alias for ``settings``
def make_app(root_factory, package=None, filename='configure.zcml',
- settings=None, options=None,
- authentication_policy=None, authorization_policy=None,
- manager=manager, make_registry=make_registry):
+ settings=None, options=None, Configurator=Configurator,
+ Router=Router, Registry=Registry, manager=manager):
""" Return a Router object, representing a fully configured
``repoze.bfg`` WSGI application.
@@ -178,9 +198,10 @@ def make_app(root_factory, package=None, filename='configure.zcml',
``settings`` keyword parameter.
"""
settings = settings or options
- registry = make_registry(root_factory, package, filename,
- authentication_policy, authorization_policy,
- settings)
+ registry = Registry()
+ config = Configurator(registry)
+ config.default_configuration(root_factory, package=package,
+ filename=filename, settings=settings)
app = Router(registry)
# We push the registry on to the stack here in case any ZCA API is
# used in listeners subscribed to the WSGIApplicationCreatedEvent
@@ -193,4 +214,3 @@ def make_app(root_factory, package=None, filename='configure.zcml',
finally:
manager.pop()
return app
-
diff --git a/repoze/bfg/testing.py b/repoze/bfg/testing.py
index 7167ec7b0..f3af05f22 100644
--- a/repoze/bfg/testing.py
+++ b/repoze/bfg/testing.py
@@ -10,6 +10,7 @@ from repoze.bfg.interfaces import IRequest
from repoze.bfg.configuration import zcml_configure # API import alias
from repoze.bfg.registry import Registry
+from repoze.bfg.threadlocal import manager
_marker = object()
@@ -232,8 +233,10 @@ def registerRoute(path, name, factory=None):
.. note:: This API was added in :mod:`repoze.bfg` version 1.1.
"""
from repoze.bfg.interfaces import IRoutesMapper
- from zope.component import getUtility
- mapper = getUtility(IRoutesMapper)
+ from zope.component import queryUtility
+ mapper = queryUtility(IRoutesMapper)
+ if mapper is None:
+ mapper = registerRoutesMapper(factory)
mapper.connect(path, name, factory)
def registerRoutesMapper(root_factory=None):
@@ -259,10 +262,8 @@ def registerRoutesMapper(root_factory=None):
"""
from repoze.bfg.interfaces import IRoutesMapper
- from repoze.bfg.urldispatch import RoutesRootFactory
- if root_factory is None:
- root_factory = DummyRootFactory
- mapper = RoutesRootFactory(root_factory)
+ from repoze.bfg.urldispatch import RoutesMapper
+ mapper = RoutesMapper()
sm = getSiteManager()
sm.registerUtility(mapper, IRoutesMapper)
return mapper
@@ -535,6 +536,8 @@ def setUp():
"""
registry = Registry('testing')
getSiteManager.sethook(lambda *arg: registry)
+ manager.clear()
+ manager.push({'registry':registry, 'request':None})
_clearContext()
def tearDown():
@@ -550,6 +553,7 @@ def tearDown():
"""
getSiteManager.reset()
+ manager.pop()
def cleanUp():
""" Deprecated (as of BFG 1.1) function whichs sets up a new
diff --git a/repoze/bfg/tests/test_configuration.py b/repoze/bfg/tests/test_configuration.py
index 026002253..d7f707f87 100644
--- a/repoze/bfg/tests/test_configuration.py
+++ b/repoze/bfg/tests/test_configuration.py
@@ -8,61 +8,447 @@ class MakeRegistryTests(unittest.TestCase):
def tearDown(self):
cleanUp()
+
+ def _makeOne(self):
+ from repoze.bfg.registry import Registry
+ from repoze.bfg.configuration import Configurator
+ reg = Registry()
+ config = Configurator(reg)
+
+
+class ConfiguratorTests(unittest.TestCase):
+ def _makeOne(self, registry=None):
+ from repoze.bfg.registry import Registry
+ from repoze.bfg.configuration import Configurator
+ if registry is None:
+ registry = Registry()
+ return Configurator(registry)
+
+ def _registerRenderer(self, config, name='.txt'):
+ from repoze.bfg.interfaces import IRendererFactory
+ from repoze.bfg.interfaces import ITemplateRenderer
+ from zope.interface import implements
+ class Renderer:
+ implements(ITemplateRenderer)
+ def __init__(self, path):
+ pass
+ def __call__(self, *arg):
+ return 'Hello!'
+ config.reg.registerUtility(Renderer, IRendererFactory, name=name)
+
+ def test__override_not_yet_registered(self):
+ from repoze.bfg.interfaces import IPackageOverrides
+ package = DummyPackage('package')
+ opackage = DummyPackage('opackage')
+ config = self._makeOne()
+ config._override(package, 'path', opackage, 'oprefix',
+ PackageOverrides=DummyOverrides)
+ overrides = config.reg.queryUtility(IPackageOverrides,
+ name='package')
+ self.assertEqual(overrides.inserted, [('path', 'opackage', 'oprefix')])
+ self.assertEqual(overrides.package, package)
+
+ def test__override_already_registered(self):
+ from repoze.bfg.interfaces import IPackageOverrides
+ package = DummyPackage('package')
+ opackage = DummyPackage('opackage')
+ overrides = DummyOverrides(package)
+ config = self._makeOne()
+ config.reg.registerUtility(overrides, IPackageOverrides,
+ name='package')
+ config._override(package, 'path', opackage, 'oprefix',
+ PackageOverrides=DummyOverrides)
+ self.assertEqual(overrides.inserted, [('path', 'opackage', 'oprefix')])
+ self.assertEqual(overrides.package, package)
+
+ def test_map_view_as_function_context_and_request(self):
+ def view(context, request):
+ return 'OK'
+ config = self._makeOne()
+ result = config.map_view(view)
+ self.failUnless(result is view)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_function_with_attr(self):
+ def view(context, request):
+ """ """
+ config = self._makeOne()
+ result = config.map_view(view, attr='__name__')
+ self.failIf(result is view)
+ self.assertRaises(TypeError, result, None, None)
+
+ def test_map_view_as_function_with_attr_and_renderer(self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ def view(context, request):
+ """ """
+ result = config.map_view(view, attr='__name__',
+ renderer_name='fixtures/minimal.txt')
+ self.failIf(result is view)
+ self.assertRaises(TypeError, result, None, None)
- def _callFUT(self, *arg, **kw):
- from repoze.bfg.router import make_registry
- return make_registry(*arg, **kw)
+ def test_map_view_as_function_requestonly(self):
+ config = self._makeOne()
+ def view(request):
+ return 'OK'
+ result = config.map_view(view)
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_function_requestonly_with_attr(self):
+ config = self._makeOne()
+ def view(request):
+ """ """
+ result = config.map_view(view, attr='__name__')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertRaises(TypeError, result, None, None)
+
+ def test_map_view_as_newstyle_class_context_and_request(self):
+ config = self._makeOne()
+ class view(object):
+ def __init__(self, context, request):
+ pass
+ def __call__(self):
+ return 'OK'
+ result = config.map_view(view)
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_newstyle_class_context_and_request_with_attr(self):
+ config = self._makeOne()
+ class view(object):
+ def __init__(self, context, request):
+ pass
+ def index(self):
+ return 'OK'
+ result = config.map_view(view, attr='index')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_newstyle_class_context_and_request_attr_and_renderer(
+ self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ class view(object):
+ def __init__(self, context, request):
+ pass
+ def index(self):
+ return {'a':'1'}
+ result = config.map_view(
+ view, attr='index',
+ renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
+
+ def test_map_view_as_newstyle_class_requestonly(self):
+ config = self._makeOne()
+ class view(object):
+ def __init__(self, request):
+ pass
+ def __call__(self):
+ return 'OK'
+ result = config.map_view(view)
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_newstyle_class_requestonly_with_attr(self):
+ config = self._makeOne()
+ class view(object):
+ def __init__(self, request):
+ pass
+ def index(self):
+ return 'OK'
+ result = config.map_view(view, attr='index')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_newstyle_class_requestonly_with_attr_and_renderer(
+ self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ class view(object):
+ def __init__(self, request):
+ pass
+ def index(self):
+ return {'a':'1'}
+ result = config.map_view(
+ view, attr='index',
+ renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
+
+ def test_map_view_as_oldstyle_class_context_and_request(self):
+ config = self._makeOne()
+ class view:
+ def __init__(self, context, request):
+ pass
+ def __call__(self):
+ return 'OK'
+ result = config.map_view(view)
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_oldstyle_class_context_and_request_with_attr(self):
+ config = self._makeOne()
+ class view:
+ def __init__(self, context, request):
+ pass
+ def index(self):
+ return 'OK'
+ result = config.map_view(view, attr='index')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_oldstyle_class_context_and_request_attr_and_renderer(
+ self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ class view:
+ def __init__(self, context, request):
+ pass
+ def index(self):
+ return {'a':'1'}
+ result = config.map_view(
+ view, attr='index',
+ renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
- def test_fixtureapp_default_filename_withpackage(self):
+ def test_map_view_as_oldstyle_class_requestonly(self):
+ config = self._makeOne()
+ class view:
+ def __init__(self, request):
+ pass
+ def __call__(self):
+ return 'OK'
+ result = config.map_view(view)
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_oldstyle_class_requestonly_with_attr(self):
+ config = self._makeOne()
+ class view:
+ def __init__(self, request):
+ pass
+ def index(self):
+ return 'OK'
+ result = config.map_view(view, attr='index')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_oldstyle_class_requestonly_attr_and_renderer(self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ class view:
+ def __init__(self, request):
+ pass
+ def index(self):
+ return {'a':'1'}
+ result = config.map_view(
+ view, attr='index',
+ renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.assertEqual(view.__name__, result.__name__)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
+
+ def test_map_view_as_instance_context_and_request(self):
+ config = self._makeOne()
+ class View:
+ def __call__(self, context, request):
+ return 'OK'
+ view = View()
+ result = config.map_view(view)
+ self.failUnless(result is view)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_instance_context_and_request_and_attr(self):
+ config = self._makeOne()
+ class View:
+ def index(self, context, request):
+ return 'OK'
+ view = View()
+ result = config.map_view(view, attr='index')
+ self.failIf(result is view)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_instance_context_and_request_attr_and_renderer(self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ class View:
+ def index(self, context, request):
+ return {'a':'1'}
+ view = View()
+ result = config.map_view(
+ view, attr='index',
+ renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
+ self.failIf(result is view)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
+
+ def test_map_view_as_instance_requestonly(self):
+ config = self._makeOne()
+ class View:
+ def __call__(self, request):
+ return 'OK'
+ view = View()
+ result = config.map_view(view)
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.failUnless('instance' in result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_instance_requestonly_with_attr(self):
+ config = self._makeOne()
+ class View:
+ def index(self, request):
+ return 'OK'
+ view = View()
+ result = config.map_view(view, attr='index')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.failUnless('instance' in result.__name__)
+ self.assertEqual(result(None, None), 'OK')
+
+ def test_map_view_as_instance_requestonly_with_attr_and_renderer(self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ class View:
+ def index(self, request):
+ return {'a':'1'}
+ view = View()
+ result = config.map_view(
+ view, attr='index',
+ renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ self.failUnless('instance' in result.__name__)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
+
+ def test_map_view_rendereronly(self):
+ config = self._makeOne()
+ self._registerRenderer(config)
+ def view(context, request):
+ return {'a':'1'}
+ result = config.map_view(
+ view,
+ renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
+
+ def test_map_view_defaultrendereronly(self):
+ config = self._makeOne()
+ self._registerRenderer(config, name='')
+ def view(context, request):
+ return {'a':'1'}
+ result = config.map_view(view)
+ self.failIf(result is view)
+ self.assertEqual(view.__module__, result.__module__)
+ self.assertEqual(view.__doc__, result.__doc__)
+ request = DummyRequest()
+ self.assertEqual(result(None, request).body, 'Hello!')
+
+ def _callDefaultConfiguration(self, *arg, **kw):
+ inst = self._makeOne()
+ inst.default_configuration(*arg, **kw)
+ return inst.reg
+
+ def test_default_config_fixtureapp_default_filename_withpackage(self):
manager = DummyRegistryManager()
from repoze.bfg.tests import fixtureapp
rootfactory = DummyRootFactory(None)
- registry = self._callFUT(rootfactory, fixtureapp)
- self.assertEqual(registry.__name__, 'repoze.bfg.tests.fixtureapp')
+ registry = self._callDefaultConfiguration(rootfactory, fixtureapp)
from repoze.bfg.tests.fixtureapp.models import IFixture
self.failUnless(registry.queryUtility(IFixture)) # only in c.zcml
- def test_fixtureapp_explicit_filename(self):
+ def test_default_config_fixtureapp_explicit_filename(self):
manager = DummyRegistryManager()
from repoze.bfg.tests import fixtureapp
rootfactory = DummyRootFactory(None)
- registry = self._callFUT(
+ registry = self._callDefaultConfiguration(
rootfactory, fixtureapp, filename='another.zcml',
manager=manager)
- self.assertEqual(registry.__name__, 'repoze.bfg.tests.fixtureapp')
from repoze.bfg.tests.fixtureapp.models import IFixture
self.failIf(registry.queryUtility(IFixture)) # only in c.zcml
- def test_fixtureapp_explicit_filename_in_options(self):
+ def test_default_config_fixtureapp_explicit_filename_in_settings(self):
import os
manager = DummyRegistryManager()
rootfactory = DummyRootFactory(None)
from repoze.bfg.tests import fixtureapp
zcmlfile = os.path.join(os.path.dirname(fixtureapp.__file__),
'another.zcml')
- registry = self._callFUT(
+ registry = self._callDefaultConfiguration(
rootfactory, fixtureapp, filename='configure.zcml',
- options={'configure_zcml':zcmlfile},
+ settings={'configure_zcml':zcmlfile},
manager=manager)
- self.assertEqual(registry.__name__, 'repoze.bfg.tests.fixtureapp')
from repoze.bfg.tests.fixtureapp.models import IFixture
self.failIf(registry.queryUtility(IFixture)) # only in c.zcml
- def test_fixtureapp_explicit_specification_in_options(self):
+ def test_default_config_fixtureapp_explicit_specification_in_settings(self):
manager = DummyRegistryManager()
rootfactory = DummyRootFactory(None)
from repoze.bfg.tests import fixtureapp
zcmlfile = 'repoze.bfg.tests.fixtureapp.subpackage:yetanother.zcml'
- registry = self._callFUT(
+ registry = self._callDefaultConfiguration(
rootfactory, fixtureapp, filename='configure.zcml',
- options={'configure_zcml':zcmlfile},
+ settings={'configure_zcml':zcmlfile},
manager=manager)
- self.assertEqual(registry.__name__,
- 'repoze.bfg.tests.fixtureapp.subpackage')
from repoze.bfg.tests.fixtureapp.models import IFixture
self.failIf(registry.queryUtility(IFixture)) # only in c.zcml
- def test_fixtureapp_filename_hascolon_isabs(self):
+ def test_default_config_fixtureapp_filename_hascolon_isabs(self):
manager = DummyRegistryManager()
rootfactory = DummyRootFactory(None)
from repoze.bfg.tests import fixtureapp
@@ -72,35 +458,37 @@ class MakeRegistryTests(unittest.TestCase):
return True
os = Dummy()
os.path = Dummy()
- self.assertRaises(IOError, self._callFUT,
+ self.assertRaises(IOError, self._callDefaultConfiguration,
rootfactory,
fixtureapp,
filename='configure.zcml',
- options={'configure_zcml':zcmlfile},
+ settings={'configure_zcml':zcmlfile},
manager=manager,
os=os)
- def test_custom_settings(self):
+ def test_default_config_custom_settings(self):
manager = DummyRegistryManager()
- options= {'mysetting':True}
+ settings = {'mysetting':True}
from repoze.bfg.tests import fixtureapp
rootfactory = DummyRootFactory(None)
- registry = self._callFUT(rootfactory, fixtureapp, options=options,
- manager=manager)
+ registry = self._callDefaultConfiguration(
+ rootfactory, fixtureapp, settings=settings,
+ manager=manager)
from repoze.bfg.interfaces import ISettings
settings = registry.getUtility(ISettings)
self.assertEqual(settings.reload_templates, False)
self.assertEqual(settings.debug_authorization, False)
self.assertEqual(settings.mysetting, True)
- def test_registrations(self):
+ def test_default_config_registrations(self):
manager = DummyRegistryManager()
- options= {'reload_templates':True,
- 'debug_authorization':True}
+ settings = {'reload_templates':True,
+ 'debug_authorization':True}
from repoze.bfg.tests import fixtureapp
rootfactory = DummyRootFactory(None)
- registry = self._callFUT(rootfactory, fixtureapp, options=options,
- manager=manager)
+ registry = self._callDefaultConfiguration(
+ rootfactory, fixtureapp, settings=settings,
+ manager=manager)
from repoze.bfg.interfaces import ISettings
from repoze.bfg.interfaces import ILogger
from repoze.bfg.interfaces import IRootFactory
@@ -113,110 +501,106 @@ class MakeRegistryTests(unittest.TestCase):
self.assertEqual(rootfactory, rootfactory)
self.failUnless(manager.pushed and manager.popped)
- def test_routes_in_config_with_rootfactory(self):
- options= {'reload_templates':True,
- 'debug_authorization':True}
- from repoze.bfg.urldispatch import RoutesRootFactory
- from repoze.bfg.tests import routesapp
- rootfactory = DummyRootFactory(None)
- registry = self._callFUT(rootfactory, routesapp, options=options)
+ def test_default_config_routes_in_config(self):
from repoze.bfg.interfaces import ISettings
from repoze.bfg.interfaces import ILogger
from repoze.bfg.interfaces import IRootFactory
- settings = registry.getUtility(ISettings)
- logger = registry.getUtility(ILogger, name='repoze.bfg.debug')
- effective_rootfactory = registry.getUtility(IRootFactory)
- self.assertEqual(logger.name, 'repoze.bfg.debug')
- self.assertEqual(settings.reload_templates, True)
- self.assertEqual(settings.debug_authorization, True)
- self.failUnless(isinstance(effective_rootfactory, RoutesRootFactory))
- self.assertEqual(effective_rootfactory.default_root_factory,
- rootfactory)
-
- def test_routes_in_config_no_rootfactory(self):
- options= {'reload_templates':True,
- 'debug_authorization':True}
- from repoze.bfg.urldispatch import RoutesRootFactory
- from repoze.bfg.router import DefaultRootFactory
+ from repoze.bfg.interfaces import IRoutesMapper
+ settings = {'reload_templates':True,
+ 'debug_authorization':True}
from repoze.bfg.tests import routesapp
- registry = self._callFUT(None, routesapp, options=options)
- from repoze.bfg.interfaces import ISettings
- from repoze.bfg.interfaces import ILogger
- from repoze.bfg.interfaces import IRootFactory
+ rootfactory = DummyRootFactory(None)
+ registry = self._callDefaultConfiguration(
+ rootfactory, routesapp, settings=settings)
settings = registry.getUtility(ISettings)
logger = registry.getUtility(ILogger, name='repoze.bfg.debug')
- rootfactory = registry.getUtility(IRootFactory)
- self.assertEqual(logger.name, 'repoze.bfg.debug')
- self.assertEqual(settings.reload_templates, True)
- self.assertEqual(settings.debug_authorization, True)
- self.failUnless(isinstance(rootfactory, RoutesRootFactory))
- self.assertEqual(rootfactory.default_root_factory, DefaultRootFactory)
-
- def test_no_routes_in_config_no_rootfactory(self):
- from repoze.bfg.router import DefaultRootFactory
- from repoze.bfg.interfaces import IRootFactory
- options= {'reload_templates':True,
- 'debug_authorization':True}
- from repoze.bfg.tests import fixtureapp
- registry = self._callFUT(None, fixtureapp, options=options)
- rootfactory = registry.getUtility(IRootFactory)
- self.assertEqual(rootfactory, DefaultRootFactory)
+ self.assertEqual(registry.getUtility(IRootFactory), rootfactory)
+ self.failUnless(registry.getUtility(IRoutesMapper))
- def test_authorization_policy_no_authentication_policy(self):
- from repoze.bfg.interfaces import IAuthorizationPolicy
- authzpolicy = DummyContext()
- from repoze.bfg.tests import routesapp
- logger = DummyLogger()
- registry = self._callFUT(
- None, routesapp, authorization_policy=authzpolicy,
- debug_logger=logger)
- self.failIf(registry.queryUtility(IAuthorizationPolicy))
- self.assertEqual(logger.messages, [])
-
- def test_authentication_policy_no_authorization_policy(self):
- from repoze.bfg.interfaces import IAuthorizationPolicy
- from repoze.bfg.interfaces import IAuthenticationPolicy
- from repoze.bfg.authorization import ACLAuthorizationPolicy
- authnpolicy = DummyContext()
- from repoze.bfg.tests import routesapp
- logger = DummyLogger()
- registry = self._callFUT(
- None, routesapp, authentication_policy=authnpolicy,
- debug_logger=logger)
- self.assertEqual(registry.getUtility(IAuthenticationPolicy),
- authnpolicy)
- self.assertEqual(
- registry.getUtility(IAuthorizationPolicy).__class__,
- ACLAuthorizationPolicy)
- self.assertEqual(len(logger.messages), 1) # deprecation warning
-
- def test_authentication_policy_and_authorization_policy(self):
- from repoze.bfg.interfaces import IAuthorizationPolicy
- from repoze.bfg.interfaces import IAuthenticationPolicy
- authnpolicy = DummyContext()
- authzpolicy = DummyContext()
- from repoze.bfg.tests import routesapp
- logger = DummyLogger()
- registry = self._callFUT(
- None, routesapp, authentication_policy=authnpolicy,
- authorization_policy = authzpolicy,
- debug_logger=logger)
- self.assertEqual(registry.getUtility(IAuthenticationPolicy),
- authnpolicy)
- self.assertEqual(registry.getUtility(IAuthorizationPolicy),
- authzpolicy)
- self.assertEqual(len(logger.messages), 1) # deprecation warning
-
- def test_lock_and_unlock(self):
+ def test_default_config_lock_and_unlock(self):
from repoze.bfg.tests import fixtureapp
rootfactory = DummyRootFactory(None)
dummylock = DummyLock()
- registry = self._callFUT(
+ registry = self._callDefaultConfiguration(
rootfactory, fixtureapp, filename='configure.zcml',
lock=dummylock)
self.assertEqual(dummylock.acquired, True)
self.assertEqual(dummylock.released, True)
+class TestBFGViewGrokker(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _getTargetClass(self):
+ from repoze.bfg.configuration import BFGViewGrokker
+ return BFGViewGrokker
+
+ def _makeOne(self, *arg, **kw):
+ return self._getTargetClass()(*arg, **kw)
+
+ def test_grok_is_bfg_view(self):
+ from zope.component import getSiteManager
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from zope.interface import Interface
+ from repoze.bfg.configuration import Configurator
+ grokker = self._makeOne()
+ class obj:
+ def __init__(self, context, request):
+ pass
+ def __call__(self):
+ return 'OK'
+ settings = dict(permission='foo', for_=Interface, name='foo.html',
+ request_type=IRequest, route_name=None,
+ request_method=None, request_param=None,
+ containment=None, attr=None, renderer=None,
+ wrapper=None, xhr=False, header=None,
+ accept=None)
+ obj.__bfg_view_settings__ = [settings]
+ sm = getSiteManager()
+ config = Configurator(sm)
+ result = grokker.grok('name', obj, _info='', _configurator=config)
+ self.assertEqual(result, True)
+ wrapped = sm.adapters.lookup((Interface, IRequest), IView,
+ name='foo.html')
+ self.assertEqual(wrapped(None, None), 'OK')
+
+ def test_grok_is_not_bfg_view(self):
+ grokker = self._makeOne()
+ class obj:
+ pass
+ context = DummyContext()
+ result = grokker.grok('name', obj)
+ self.assertEqual(result, False)
+
+class TestDefaultRootFactory(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.configuration import DefaultRootFactory
+ return DefaultRootFactory
+
+ def _makeOne(self, environ):
+ return self._getTargetClass()(environ)
+
+ def test_no_matchdict(self):
+ environ = {}
+ root = self._makeOne(environ)
+ self.assertEqual(root.__parent__, None)
+ self.assertEqual(root.__name__, None)
+
+ def test_matchdict(self):
+ class DummyRequest:
+ pass
+ request = DummyRequest()
+ request.matchdict = {'a':1, 'b':2}
+ root = self._makeOne(request)
+ self.assertEqual(root.a, 1)
+ self.assertEqual(root.b, 2)
+
+
+
class DummyRequest:
pass
@@ -253,3 +637,15 @@ class DummyLock:
def release(self):
self.released = True
+class DummyPackage:
+ def __init__(self, name):
+ self.__name__ = name
+
+class DummyOverrides:
+ def __init__(self, package):
+ self.package = package
+ self.inserted = []
+
+ def insert(self, path, package, prefix):
+ self.inserted.append((path, package, prefix))
+
diff --git a/repoze/bfg/tests/test_integration.py b/repoze/bfg/tests/test_integration.py
index c1779cdb6..57d22c286 100644
--- a/repoze/bfg/tests/test_integration.py
+++ b/repoze/bfg/tests/test_integration.py
@@ -99,7 +99,6 @@ class TestGrokkedApp(unittest.TestCase):
from repoze.bfg.view import render_view_to_response
from zope.interface import directlyProvides
from repoze.bfg.zcml import zcml_configure
- from repoze.bfg.interfaces import IView
from repoze.bfg.interfaces import IRequest
import repoze.bfg.tests.grokkedapp as package
diff --git a/repoze/bfg/tests/test_registry.py b/repoze/bfg/tests/test_registry.py
index 11f5cbf3a..b652d2155 100644
--- a/repoze/bfg/tests/test_registry.py
+++ b/repoze/bfg/tests/test_registry.py
@@ -38,450 +38,6 @@ class TestRegistry(unittest.TestCase):
registry.registerSubscriptionAdapter(EventHandler, [IFoo], Interface)
self.assertEqual(registry.has_listeners, True)
- def test__override_not_yet_registered(self):
- from repoze.bfg.interfaces import IPackageOverrides
- package = DummyPackage('package')
- opackage = DummyPackage('opackage')
- registry = self._makeOne()
- registry._override(package, 'path', opackage, 'oprefix',
- PackageOverrides=DummyOverrides)
- overrides = registry.queryUtility(IPackageOverrides, name='package')
- self.assertEqual(overrides.inserted, [('path', 'opackage', 'oprefix')])
- self.assertEqual(overrides.package, package)
-
- def test__override_already_registered(self):
- from repoze.bfg.interfaces import IPackageOverrides
- package = DummyPackage('package')
- opackage = DummyPackage('opackage')
- overrides = DummyOverrides(package)
- registry = self._makeOne()
- registry.registerUtility(overrides, IPackageOverrides, name='package')
- registry._override(package, 'path', opackage, 'oprefix',
- PackageOverrides=DummyOverrides)
- self.assertEqual(overrides.inserted, [('path', 'opackage', 'oprefix')])
- self.assertEqual(overrides.package, package)
-
- def _registerRenderer(self, reg, name='.txt'):
- from repoze.bfg.interfaces import IRendererFactory
- from repoze.bfg.interfaces import ITemplateRenderer
- from zope.interface import implements
- class Renderer:
- implements(ITemplateRenderer)
- def __init__(self, path):
- pass
- def __call__(self, *arg):
- return 'Hello!'
- reg.registerUtility(Renderer, IRendererFactory, name=name)
-
- def test_map_view_as_function_context_and_request(self):
- def view(context, request):
- return 'OK'
- reg = self._makeOne()
- result = reg.map_view(view)
- self.failUnless(result is view)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_function_with_attr(self):
- def view(context, request):
- """ """
- reg = self._makeOne()
- result = reg.map_view(view, attr='__name__')
- self.failIf(result is view)
- self.assertRaises(TypeError, result, None, None)
-
- def test_map_view_as_function_with_attr_and_renderer(self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- def view(context, request):
- """ """
- result = reg.map_view(view, attr='__name__',
- renderer_name='fixtures/minimal.txt')
- self.failIf(result is view)
- self.assertRaises(TypeError, result, None, None)
-
- def test_map_view_as_function_requestonly(self):
- reg = self._makeOne()
- def view(request):
- return 'OK'
- result = reg.map_view(view)
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_function_requestonly_with_attr(self):
- reg = self._makeOne()
- def view(request):
- """ """
- result = reg.map_view(view, attr='__name__')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertRaises(TypeError, result, None, None)
-
- def test_map_view_as_newstyle_class_context_and_request(self):
- reg = self._makeOne()
- class view(object):
- def __init__(self, context, request):
- pass
- def __call__(self):
- return 'OK'
- result = reg.map_view(view)
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_newstyle_class_context_and_request_with_attr(self):
- reg = self._makeOne()
- class view(object):
- def __init__(self, context, request):
- pass
- def index(self):
- return 'OK'
- result = reg.map_view(view, attr='index')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_newstyle_class_context_and_request_attr_and_renderer(
- self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- class view(object):
- def __init__(self, context, request):
- pass
- def index(self):
- return {'a':'1'}
- result = reg.map_view(
- view, attr='index',
- renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
- def test_map_view_as_newstyle_class_requestonly(self):
- reg = self._makeOne()
- class view(object):
- def __init__(self, request):
- pass
- def __call__(self):
- return 'OK'
- result = reg.map_view(view)
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_newstyle_class_requestonly_with_attr(self):
- reg = self._makeOne()
- class view(object):
- def __init__(self, request):
- pass
- def index(self):
- return 'OK'
- result = reg.map_view(view, attr='index')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_newstyle_class_requestonly_with_attr_and_renderer(self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- class view(object):
- def __init__(self, request):
- pass
- def index(self):
- return {'a':'1'}
- result = reg.map_view(
- view, attr='index',
- renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
- def test_map_view_as_oldstyle_class_context_and_request(self):
- reg = self._makeOne()
- class view:
- def __init__(self, context, request):
- pass
- def __call__(self):
- return 'OK'
- result = reg.map_view(view)
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_oldstyle_class_context_and_request_with_attr(self):
- reg = self._makeOne()
- class view:
- def __init__(self, context, request):
- pass
- def index(self):
- return 'OK'
- result = reg.map_view(view, attr='index')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_oldstyle_class_context_and_request_attr_and_renderer(
- self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- class view:
- def __init__(self, context, request):
- pass
- def index(self):
- return {'a':'1'}
- result = reg.map_view(
- view, attr='index',
- renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
- def test_map_view_as_oldstyle_class_requestonly(self):
- reg = self._makeOne()
- class view:
- def __init__(self, request):
- pass
- def __call__(self):
- return 'OK'
- result = reg.map_view(view)
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_oldstyle_class_requestonly_with_attr(self):
- reg = self._makeOne()
- class view:
- def __init__(self, request):
- pass
- def index(self):
- return 'OK'
- result = reg.map_view(view, attr='index')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_oldstyle_class_requestonly_attr_and_renderer(self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- class view:
- def __init__(self, request):
- pass
- def index(self):
- return {'a':'1'}
- result = reg.map_view(
- view, attr='index',
- renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
- def test_map_view_as_instance_context_and_request(self):
- reg = self._makeOne()
- class View:
- def __call__(self, context, request):
- return 'OK'
- view = View()
- result = reg.map_view(view)
- self.failUnless(result is view)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_instance_context_and_request_and_attr(self):
- reg = self._makeOne()
- class View:
- def index(self, context, request):
- return 'OK'
- view = View()
- result = reg.map_view(view, attr='index')
- self.failIf(result is view)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_instance_context_and_request_attr_and_renderer(self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- class View:
- def index(self, context, request):
- return {'a':'1'}
- view = View()
- result = reg.map_view(
- view, attr='index',
- renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
- self.failIf(result is view)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
- def test_map_view_as_instance_requestonly(self):
- reg = self._makeOne()
- class View:
- def __call__(self, request):
- return 'OK'
- view = View()
- result = reg.map_view(view)
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.failUnless('instance' in result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_instance_requestonly_with_attr(self):
- reg = self._makeOne()
- class View:
- def index(self, request):
- return 'OK'
- view = View()
- result = reg.map_view(view, attr='index')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.failUnless('instance' in result.__name__)
- self.assertEqual(result(None, None), 'OK')
-
- def test_map_view_as_instance_requestonly_with_attr_and_renderer(self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- class View:
- def index(self, request):
- return {'a':'1'}
- view = View()
- result = reg.map_view(
- view, attr='index',
- renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.failUnless('instance' in result.__name__)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
- def test_map_view_rendereronly(self):
- reg = self._makeOne()
- self._registerRenderer(reg)
- def view(context, request):
- return {'a':'1'}
- result = reg.map_view(
- view,
- renderer_name='repoze.bfg.tests:fixtures/minimal.txt')
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
- def test_map_view_defaultrendereronly(self):
- reg = self._makeOne()
- self._registerRenderer(reg, name='')
- def view(context, request):
- return {'a':'1'}
- result = reg.map_view(view)
- self.failIf(result is view)
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- request = DummyRequest()
- self.assertEqual(result(None, request).body, 'Hello!')
-
-class TestBFGViewGrokker(unittest.TestCase):
- def setUp(self):
- cleanUp()
-
- def tearDown(self):
- cleanUp()
-
- def _getTargetClass(self):
- from repoze.bfg.registry import BFGViewGrokker
- return BFGViewGrokker
-
- def _makeOne(self, *arg, **kw):
- return self._getTargetClass()(*arg, **kw)
-
- def test_grok_is_bfg_view(self):
- from zope.component import getSiteManager
- from repoze.bfg.interfaces import IRequest
- from repoze.bfg.interfaces import IView
- from zope.interface import Interface
- grokker = self._makeOne()
- class obj:
- def __init__(self, context, request):
- pass
- def __call__(self):
- return 'OK'
- settings = dict(permission='foo', for_=Interface, name='foo.html',
- request_type=IRequest, route_name=None,
- request_method=None, request_param=None,
- containment=None, attr=None, renderer=None,
- wrapper=None, xhr=False, header=None,
- accept=None)
- obj.__bfg_view_settings__ = [settings]
- sm = getSiteManager()
- result = grokker.grok('name', obj, _info='', _registry=sm)
- self.assertEqual(result, True)
- wrapped = sm.adapters.lookup((Interface, IRequest), IView,
- name='foo.html')
- self.assertEqual(wrapped(None, None), 'OK')
-
- def test_grok_is_not_bfg_view(self):
- grokker = self._makeOne()
- class obj:
- pass
- context = DummyContext()
- result = grokker.grok('name', obj, context=context)
- self.assertEqual(result, False)
- actions = context.actions
- self.assertEqual(len(actions), 0)
-
-class TestDefaultRootFactory(unittest.TestCase):
- def _getTargetClass(self):
- from repoze.bfg.registry import DefaultRootFactory
- return DefaultRootFactory
-
- def _makeOne(self, environ):
- return self._getTargetClass()(environ)
-
- def test_no_matchdict(self):
- environ = {}
- root = self._makeOne(environ)
- self.assertEqual(root.__parent__, None)
- self.assertEqual(root.__name__, None)
-
- def test_matchdict(self):
- class DummyRequest:
- pass
- request = DummyRequest()
- request.matchdict = {'a':1, 'b':2}
- root = self._makeOne(request)
- self.assertEqual(root.a, 1)
- self.assertEqual(root.b, 2)
-
class DummyModule:
__path__ = "foo"
__name__ = "dummy"
@@ -494,18 +50,6 @@ class DummyContext:
self.info = None
self.resolved = resolved
-class DummyPackage:
- def __init__(self, name):
- self.__name__ = name
-
-class DummyOverrides:
- def __init__(self, package):
- self.package = package
- self.inserted = []
-
- def insert(self, path, package, prefix):
- self.inserted.append((path, package, prefix))
-
class DummyRequest:
def __init__(self, environ=None):
if environ is None:
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index 07c10adaa..3321af938 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -14,6 +14,27 @@ class TestRouter(unittest.TestCase):
getSiteManager.reset()
cleanUp()
+ def _registerRouteRequest(self, name):
+ from repoze.bfg.interfaces import IRouteRequest
+ from zope.interface import Interface
+ from zope.component import getSiteManager
+ class IRequest(Interface):
+ """ """
+ sm = getSiteManager()
+ sm.registerUtility(IRequest, IRouteRequest, name=name)
+ return IRequest
+
+ def _connectRoute(self, path, name, factory=None):
+ from repoze.bfg.interfaces import IRoutesMapper
+ from zope.component import getSiteManager
+ from repoze.bfg.urldispatch import RoutesMapper
+ sm = getSiteManager()
+ mapper = sm.queryUtility(IRoutesMapper)
+ if mapper is None:
+ mapper = RoutesMapper()
+ sm.registerUtility(mapper, IRoutesMapper)
+ mapper.connect(path, name, factory)
+
def _registerLogger(self):
from zope.component import getSiteManager
gsm = getSiteManager()
@@ -420,6 +441,80 @@ class TestRouter(unittest.TestCase):
self.assertEqual(len(router.threadlocal_manager.pushed), 1)
self.assertEqual(len(router.threadlocal_manager.popped), 1)
+ def test_call_route_matches_and_has_factory(self):
+ req_iface = self._registerRouteRequest('foo')
+ root = object()
+ def factory(request):
+ return root
+ self._connectRoute('archives/:action/:article', 'foo', factory)
+ context = DummyContext()
+ self._registerTraverserFactory(context)
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ view = DummyView(response)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ self._registerView(view, '', None, None)
+ rootfactory = self._registerRootFactory(context)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ request = view.request
+ self.assertEqual(request.view_name, '')
+ self.assertEqual(request.subpath, [])
+ self.assertEqual(request.context, context)
+ self.assertEqual(request.root, root)
+ routing_args = environ['wsgiorg.routing_args'][1]
+ self.assertEqual(routing_args['action'], 'action1')
+ self.assertEqual(routing_args['article'], 'article1')
+ self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
+ self.assertEqual(environ['bfg.routes.route'].name, 'foo')
+ self.assertEqual(request.matchdict, routing_args)
+ self.failUnless(req_iface.providedBy(request))
+
+ def test_call_route_matches_doesnt_overwrite_subscriber_iface(self):
+ from repoze.bfg.interfaces import INewRequest
+ from zope.interface import alsoProvides
+ from zope.interface import Interface
+ req_iface = self._registerRouteRequest('foo')
+ class IFoo(Interface):
+ pass
+ def listener(event):
+ alsoProvides(event.request, IFoo)
+ self.registry.registerHandler(listener, (INewRequest,))
+ root = object()
+ def factory(request):
+ return root
+ self._connectRoute('archives/:action/:article', 'foo', factory)
+ context = DummyContext()
+ self._registerTraverserFactory(context)
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ view = DummyView(response)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ self._registerView(view, '', None, None)
+ rootfactory = self._registerRootFactory(context)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ request = view.request
+ self.assertEqual(request.view_name, '')
+ self.assertEqual(request.subpath, [])
+ self.assertEqual(request.context, context)
+ self.assertEqual(request.root, root)
+ routing_args = environ['wsgiorg.routing_args'][1]
+ self.assertEqual(routing_args['action'], 'action1')
+ self.assertEqual(routing_args['article'], 'article1')
+ self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
+ self.assertEqual(environ['bfg.routes.route'].name, 'foo')
+ self.assertEqual(request.matchdict, routing_args)
+ self.failUnless(req_iface.providedBy(request))
+ self.failUnless(IFoo.providedBy(request))
class TestMakeApp(unittest.TestCase):
def setUp(self):
@@ -430,42 +525,34 @@ class TestMakeApp(unittest.TestCase):
def _callFUT(self, *arg, **kw):
from repoze.bfg.router import make_app
- return make_app(None, *arg, **kw)
-
- def _get_make_registry(self, sm):
- class DummyMakeRegistry(object):
- def __call__(self, *arg):
- self.arg = arg
- return sm
- return DummyMakeRegistry()
+ return make_app(*arg, **kw)
def test_it(self):
from repoze.bfg.interfaces import IWSGIApplicationCreatedEvent
from repoze.bfg.tests import fixtureapp
from zope.component import getSiteManager
sm = getSiteManager()
- dummy_make_registry = self._get_make_registry(sm)
def subscriber(event):
event.app.created = True
manager = DummyRegistryManager()
sm.registerHandler(subscriber, (IWSGIApplicationCreatedEvent,))
rootfactory = DummyRootFactory(None)
settings = {'a':1}
- app = self._callFUT(rootfactory, fixtureapp, manager=manager,
- settings=settings,
- make_registry=dummy_make_registry)
+ app = self._callFUT(rootfactory, fixtureapp, settings=settings,
+ Configurator=DummyConfigurator, manager=manager)
self.failUnless(app.created)
self.failUnless(manager.pushed)
self.failUnless(manager.popped)
- self.assertEqual(len(dummy_make_registry.arg), 6)
- self.assertEqual(dummy_make_registry.arg[-1], settings)
+ self.assertEqual(app.registry.root_factory, rootfactory)
+ self.assertEqual(app.registry.settings, settings)
+ self.assertEqual(app.registry.package, fixtureapp)
+ self.assertEqual(app.registry.filename, 'configure.zcml')
def test_it_options_means_settings(self):
from repoze.bfg.interfaces import IWSGIApplicationCreatedEvent
- from zope.component import getSiteManager
from repoze.bfg.tests import fixtureapp
+ from zope.component import getSiteManager
sm = getSiteManager()
- dummy_make_registry = self._get_make_registry(sm)
def subscriber(event):
event.app.created = True
manager = DummyRegistryManager()
@@ -473,13 +560,14 @@ class TestMakeApp(unittest.TestCase):
rootfactory = DummyRootFactory(None)
settings = {'a':1}
app = self._callFUT(rootfactory, fixtureapp, options=settings,
- manager=manager,
- make_registry=dummy_make_registry)
+ Configurator=DummyConfigurator, manager=manager)
self.failUnless(app.created)
self.failUnless(manager.pushed)
self.failUnless(manager.popped)
- self.assertEqual(len(dummy_make_registry.arg), 6)
- self.assertEqual(dummy_make_registry.arg[-1], settings)
+ self.assertEqual(app.registry.root_factory, rootfactory)
+ self.assertEqual(app.registry.settings, settings)
+ self.assertEqual(app.registry.package, fixtureapp)
+ self.assertEqual(app.registry.filename, 'configure.zcml')
class DummyContext:
pass
@@ -551,3 +639,14 @@ class DummyRegistryManager:
def pop(self):
self.popped = True
+class DummyConfigurator(object):
+ def __init__(self, registry):
+ self.registry = registry
+
+ def default_configuration(self, root_factory=None, package=None,
+ filename=None, settings=None):
+ self.registry.root_factory = root_factory
+ self.registry.package = package
+ self.registry.filename = filename
+ self.registry.settings = settings
+
diff --git a/repoze/bfg/tests/test_threadlocal.py b/repoze/bfg/tests/test_threadlocal.py
index 09a25debd..b277d6cb9 100644
--- a/repoze/bfg/tests/test_threadlocal.py
+++ b/repoze/bfg/tests/test_threadlocal.py
@@ -1,12 +1,12 @@
-from repoze.bfg.testing import cleanUp
+from repoze.bfg import testing
import unittest
class TestThreadLocalManager(unittest.TestCase):
def setUp(self):
- cleanUp()
+ testing.setUp()
def tearDown(self):
- cleanUp()
+ testing.tearDown()
def _getTargetClass(self):
from repoze.bfg.threadlocal import ThreadLocalManager
@@ -67,24 +67,29 @@ class TestGetCurrentRequest(unittest.TestCase):
class GetCurrentRegistryTests(unittest.TestCase):
def setUp(self):
- cleanUp()
+ testing.setUp()
def tearDown(self):
- cleanUp()
+ testing.tearDown()
def _callFUT(self):
from repoze.bfg.threadlocal import get_current_registry
return get_current_registry()
- def test_local(self):
+ def test_it(self):
from repoze.bfg.threadlocal import manager
try:
manager.push({'registry':123})
self.assertEqual(self._callFUT(), 123)
finally:
manager.pop()
-
- def test_global(self):
+
+class GetCurrentRegistryWithoutTestingRegistry(unittest.TestCase):
+ def _callFUT(self):
+ from repoze.bfg.threadlocal import get_current_registry
+ return get_current_registry()
+
+ def test_it(self):
from zope.component import getGlobalSiteManager
self.assertEqual(self._callFUT(), getGlobalSiteManager())
diff --git a/repoze/bfg/tests/test_urldispatch.py b/repoze/bfg/tests/test_urldispatch.py
index ecc76fe04..18ebe3e98 100644
--- a/repoze/bfg/tests/test_urldispatch.py
+++ b/repoze/bfg/tests/test_urldispatch.py
@@ -35,7 +35,7 @@ class TestRoute(unittest.TestCase):
route = self._makeOne(':path')
self.assertEqual(route.generate({'path':'abc'}), '/abc')
-class RoutesRootFactoryTests(unittest.TestCase):
+class RoutesMapperTests(unittest.TestCase):
def setUp(self):
testing.setUp()
@@ -52,213 +52,92 @@ class RoutesRootFactoryTests(unittest.TestCase):
request.registry = sm
return request
- def _registerRouteRequest(self, name):
- from repoze.bfg.interfaces import IRouteRequest
- from zope.interface import Interface
- from zope.component import getSiteManager
- class IRequest(Interface):
- """ """
- sm = getSiteManager()
- sm.registerUtility(IRequest, IRouteRequest, name=name)
- return IRequest
-
def _getTargetClass(self):
- from repoze.bfg.urldispatch import RoutesRootFactory
- return RoutesRootFactory
+ from repoze.bfg.urldispatch import RoutesMapper
+ return RoutesMapper
- def _makeOne(self, get_root):
+ def _makeOne(self):
klass = self._getTargetClass()
- return klass(get_root)
-
- def test_init_default_root_factory(self):
- mapper = self._makeOne(None)
- self.assertEqual(mapper.default_root_factory, None)
+ return klass()
def test_no_route_matches(self):
- root_factory = DummyRootFactory(123)
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
request = self._getRequest(PATH_INFO='/')
result = mapper(request)
- self.assertEqual(result, 123)
-
- def test_passed_environ_returns_default(self):
- root_factory = DummyRootFactory(123)
- mapper = self._makeOne(root_factory)
- request = self._getRequest(PATH_INFO='/')
- result = mapper(request.environ)
- self.assertEqual(result, 123)
- self.assertEqual(root_factory.request, request.environ)
+ self.assertEqual(result['match'], None)
+ self.assertEqual(result['route'], None)
def test_route_matches(self):
- root_factory = DummyRootFactory(123)
- req_iface = self._registerRouteRequest('foo')
- mapper = self._makeOne(root_factory)
- mapper.connect('archives/:action/:article', 'foo')
- request = self._getRequest(PATH_INFO='/archives/action1/article1')
- result = mapper(request)
- self.assertEqual(result, 123)
- environ = request.environ
- routing_args = environ['wsgiorg.routing_args'][1]
- self.assertEqual(routing_args['action'], 'action1')
- self.assertEqual(routing_args['article'], 'article1')
- self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
- self.assertEqual(environ['bfg.routes.route'].name, 'foo')
- self.assertEqual(request.matchdict, routing_args)
- self.failUnless(req_iface.providedBy(request))
-
- def test_route_matches_already_has_iface(self):
- from zope.interface import Interface
- from zope.interface import directlyProvides
- root_factory = DummyRootFactory(123)
- req_iface = self._registerRouteRequest('foo')
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
mapper.connect('archives/:action/:article', 'foo')
request = self._getRequest(PATH_INFO='/archives/action1/article1')
- class IFoo(Interface):
- pass
- directlyProvides(request, IFoo)
- result = mapper(request)
- self.assertEqual(result, 123)
- environ = request.environ
- routing_args = environ['wsgiorg.routing_args'][1]
- self.assertEqual(routing_args['action'], 'action1')
- self.assertEqual(routing_args['article'], 'article1')
- self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
- self.assertEqual(environ['bfg.routes.route'].name, 'foo')
- self.assertEqual(request.matchdict, routing_args)
- self.failUnless(req_iface.providedBy(request))
- self.failUnless(IFoo.providedBy(request))
-
- def test_route_matches_and_has_factory(self):
- root_factory = DummyRootFactory(123)
- req_iface = self._registerRouteRequest('foo')
- mapper = self._makeOne(root_factory)
- factory = DummyRootFactory(456)
- mapper.connect('archives/:action/:article', 'foo', factory)
- request = self._getRequest(PATH_INFO='/archives/action1/article1')
result = mapper(request)
- self.assertEqual(result, 456)
- self.assertEqual(factory.request, request)
- environ = request.environ
- routing_args = environ['wsgiorg.routing_args'][1]
- self.assertEqual(routing_args['action'], 'action1')
- self.assertEqual(routing_args['article'], 'article1')
- self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
- self.assertEqual(environ['bfg.routes.route'].name, 'foo')
- self.assertEqual(request.matchdict, routing_args)
- self.failUnless(req_iface.providedBy(request))
+ self.assertEqual(result['route'], mapper.routes['foo'])
+ self.assertEqual(result['match']['action'], 'action1')
+ self.assertEqual(result['match']['article'], 'article1')
def test_route_matches_with_predicates(self):
- root_factory = DummyRootFactory(123)
- req_iface = self._registerRouteRequest('foo')
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
mapper.connect('archives/:action/:article', 'foo',
predicates=[lambda *arg: True])
request = self._getRequest(PATH_INFO='/archives/action1/article1')
result = mapper(request)
- self.assertEqual(result, 123)
- environ = request.environ
- routing_args = environ['wsgiorg.routing_args'][1]
- self.assertEqual(routing_args['action'], 'action1')
- self.assertEqual(routing_args['article'], 'article1')
- self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
- self.assertEqual(environ['bfg.routes.route'].name, 'foo')
- self.assertEqual(request.matchdict, routing_args)
- self.failUnless(req_iface.providedBy(request))
+ self.assertEqual(result['route'], mapper.routes['foo'])
+ self.assertEqual(result['match']['action'], 'action1')
+ self.assertEqual(result['match']['article'], 'article1')
def test_route_fails_to_match_with_predicates(self):
- root_factory = DummyRootFactory(123)
- foo_iface = self._registerRouteRequest('foo')
- bar_iface = self._registerRouteRequest('bar')
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
mapper.connect('archives/:action/article1', 'foo',
predicates=[lambda *arg: True, lambda *arg: False])
mapper.connect('archives/:action/:article', 'bar')
request = self._getRequest(PATH_INFO='/archives/action1/article1')
result = mapper(request)
- self.assertEqual(result, 123)
- environ = request.environ
- routing_args = environ['wsgiorg.routing_args'][1]
- self.assertEqual(routing_args['action'], 'action1')
- self.assertEqual(routing_args['article'], 'article1')
- self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
- self.assertEqual(environ['bfg.routes.route'].name, 'bar')
- self.assertEqual(request.matchdict, routing_args)
- self.failUnless(bar_iface.providedBy(request))
- self.failIf(foo_iface.providedBy(request))
+ self.assertEqual(result['route'], mapper.routes['bar'])
+ self.assertEqual(result['match']['action'], 'action1')
+ self.assertEqual(result['match']['article'], 'article1')
def test_root_route_matches(self):
- root_factory = DummyRootFactory(123)
- req_iface = self._registerRouteRequest('root')
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
mapper.connect('', 'root')
request = self._getRequest(PATH_INFO='/')
result = mapper(request)
- environ = request.environ
- self.assertEqual(result, 123)
- self.assertEqual(environ['bfg.routes.route'].name, 'root')
- self.assertEqual(environ['bfg.routes.matchdict'], {})
- self.assertEqual(environ['wsgiorg.routing_args'], ((), {}))
- self.assertEqual(request.matchdict, {})
- self.failUnless(req_iface.providedBy(request))
+ self.assertEqual(result['route'], mapper.routes['root'])
+ self.assertEqual(result['match'], {})
def test_root_route_matches2(self):
- root_factory = DummyRootFactory(123)
- req_iface = self._registerRouteRequest('root')
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
mapper.connect('/', 'root')
request = self._getRequest(PATH_INFO='/')
result = mapper(request)
- environ = request.environ
- self.assertEqual(result, 123)
- self.assertEqual(environ['bfg.routes.route'].name, 'root')
- self.assertEqual(environ['bfg.routes.matchdict'], {})
- self.assertEqual(environ['wsgiorg.routing_args'], ((), {}))
- self.assertEqual(request.matchdict, {})
- self.failUnless(req_iface.providedBy(request))
+ self.assertEqual(result['route'], mapper.routes['root'])
+ self.assertEqual(result['match'], {})
def test_root_route_when_path_info_empty(self):
- root_factory = DummyRootFactory(123)
- req_iface = self._registerRouteRequest('root')
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
mapper.connect('/', 'root')
request = self._getRequest(PATH_INFO='')
result = mapper(request)
- environ = request.environ
- self.assertEqual(result, 123)
- self.assertEqual(environ['bfg.routes.route'].name, 'root')
- self.assertEqual(environ['bfg.routes.matchdict'], {})
- self.assertEqual(environ['wsgiorg.routing_args'], ((), {}))
- self.assertEqual(request.matchdict, {})
- self.failUnless(req_iface.providedBy(request))
-
- def test_fallback_to_default_root_factory(self):
- root_factory = DummyRootFactory(123)
- mapper = self._makeOne(root_factory)
- mapper.connect('wont/:be/:found', 'wont')
- request = self._getRequest(PATH_INFO='/archives/action1/article1')
- result = mapper(request)
- self.assertEqual(result, 123)
- self.assertEqual(root_factory.request, request)
+ self.assertEqual(result['route'], mapper.routes['root'])
+ self.assertEqual(result['match'], {})
def test_no_path_info(self):
- root_factory = DummyRootFactory(123)
- mapper = self._makeOne(root_factory)
+ mapper = self._makeOne()
mapper.connect('/', 'root')
request = self._getRequest()
result = mapper(request)
- self.assertEqual(result, 123)
- self.assertEqual(root_factory.request, request)
+ self.assertEqual(result['route'], mapper.routes['root'])
+ self.assertEqual(result['match'], {})
def test_has_routes(self):
- mapper = self._makeOne(None)
+ mapper = self._makeOne()
self.assertEqual(mapper.has_routes(), False)
mapper.connect('whatever', 'archives/:action/:article')
self.assertEqual(mapper.has_routes(), True)
def test_get_routes(self):
from repoze.bfg.urldispatch import Route
- mapper = self._makeOne(None)
+ mapper = self._makeOne()
self.assertEqual(mapper.get_routes(), [])
mapper.connect('whatever', 'archives/:action/:article')
routes = mapper.get_routes()
@@ -266,7 +145,7 @@ class RoutesRootFactoryTests(unittest.TestCase):
self.assertEqual(routes[0].__class__, Route)
def test_generate(self):
- mapper = self._makeOne(None)
+ mapper = self._makeOne()
def generator(kw):
return 123
route = DummyRoute(generator)
diff --git a/repoze/bfg/tests/test_zcml.py b/repoze/bfg/tests/test_zcml.py
index 1a7275932..af2b64de6 100644
--- a/repoze/bfg/tests/test_zcml.py
+++ b/repoze/bfg/tests/test_zcml.py
@@ -2581,53 +2581,62 @@ class TestResourceDirective(unittest.TestCase):
def test_no_colons(self):
from zope.component import getSiteManager
+ from repoze.bfg.configuration import Configurator
context = DummyContext()
self._callFUT(context, 'a', 'b')
actions = context.actions
self.assertEqual(len(actions), 1)
action = actions[0]
sm = getSiteManager()
- self.assertEqual(action['callable'], sm.resource)
+ self.assertEqual(action['callable'].im_func,
+ Configurator.resource.im_func)
self.assertEqual(action['discriminator'], None)
self.assertEqual(action['args'], ('a', 'b', None))
def test_with_colons(self):
from zope.component import getSiteManager
+ from repoze.bfg.configuration import Configurator
context = DummyContext()
self._callFUT(context, 'a:foo.pt', 'b:foo.pt')
actions = context.actions
self.assertEqual(len(actions), 1)
action = actions[0]
sm = getSiteManager()
- self.assertEqual(action['callable'], sm.resource)
+ self.assertEqual(action['callable'].im_func,
+ Configurator.resource.im_func)
self.assertEqual(action['discriminator'], None)
self.assertEqual(action['args'], ('a:foo.pt', 'b:foo.pt', None))
def test_override_module_with_directory(self):
from zope.component import getSiteManager
+ from repoze.bfg.configuration import Configurator
context = DummyContext()
self._callFUT(context, 'a', 'b:foo/')
actions = context.actions
self.assertEqual(len(actions), 1)
action = actions[0]
sm = getSiteManager()
- self.assertEqual(action['callable'], sm.resource)
+ self.assertEqual(action['callable'].im_func,
+ Configurator.resource.im_func)
self.assertEqual(action['discriminator'], None)
self.assertEqual(action['args'], ('a', 'b:foo/', None))
def test_override_directory_with_module(self):
from zope.component import getSiteManager
+ from repoze.bfg.configuration import Configurator
context = DummyContext()
self._callFUT(context, 'a:foo/', 'b')
actions = context.actions
self.assertEqual(len(actions), 1)
action = actions[0]
sm = getSiteManager()
- self.assertEqual(action['callable'], sm.resource)
+ self.assertEqual(action['callable'].im_func,
+ Configurator.resource.im_func)
self.assertEqual(action['discriminator'], None)
self.assertEqual(action['args'], ('a:foo/', 'b', None))
def test_override_module_with_module(self):
+ from repoze.bfg.configuration import Configurator
from zope.component import getSiteManager
context = DummyContext()
self._callFUT(context, 'a', 'b')
@@ -2635,7 +2644,8 @@ class TestResourceDirective(unittest.TestCase):
self.assertEqual(len(actions), 1)
action = actions[0]
sm = getSiteManager()
- self.assertEqual(action['callable'], sm.resource)
+ self.assertEqual(action['callable'].im_func,
+ Configurator.resource.im_func)
self.assertEqual(action['discriminator'], None)
self.assertEqual(action['args'], ('a', 'b', None))
@@ -2697,7 +2707,7 @@ class TestZCMLScanDirective(unittest.TestCase):
return scan(context, package, martian)
def test_it(self):
- from repoze.bfg.registry import BFGMultiGrokker
+ from repoze.bfg.configuration import BFGMultiGrokker
martian = DummyMartianModule()
module_grokker = DummyModuleGrokker()
dummy_module = DummyModule()
@@ -2720,11 +2730,11 @@ class DummyModuleGrokker:
self.multi_grokker = grokker
class DummyMartianModule:
- def grok_dotted_name(self, name, grokker, _info, _registry,
+ def grok_dotted_name(self, name, grokker, _info, _configurator,
exclude_filter=None):
self.name = name
self.info = _info
- self.registry = _registry
+ self.configurator = _configurator
self.exclude_filter = exclude_filter
return True
diff --git a/repoze/bfg/threadlocal.py b/repoze/bfg/threadlocal.py
index 4fa53fa1e..5284b743f 100644
--- a/repoze/bfg/threadlocal.py
+++ b/repoze/bfg/threadlocal.py
@@ -31,8 +31,8 @@ class ThreadLocalManager(threading.local):
self.stack[:] = []
def defaults():
- gsm = getGlobalSiteManager()
- return {'request':None, 'registry':gsm}
+ reg = getGlobalSiteManager()
+ return {'request':None, 'registry':reg}
manager = ThreadLocalManager(default=defaults)
diff --git a/repoze/bfg/urldispatch.py b/repoze/bfg/urldispatch.py
index 7b0d4dbb7..7fff130ef 100644
--- a/repoze/bfg/urldispatch.py
+++ b/repoze/bfg/urldispatch.py
@@ -1,10 +1,6 @@
import re
from urllib import unquote
-from zope.interface import alsoProvides
-
-from repoze.bfg.interfaces import IRouteRequest
-
from repoze.bfg.compat import all
from repoze.bfg.encode import url_quote
from repoze.bfg.traversal import traversal_path
@@ -20,9 +16,8 @@ class Route(object):
self.factory = factory
self.predicates = predicates
-class RoutesRootFactory(object):
- def __init__(self, default_root_factory):
- self.default_root_factory = default_root_factory
+class RoutesMapper(object):
+ def __init__(self):
self.routelist = []
self.routes = {}
@@ -42,34 +37,13 @@ class RoutesRootFactory(object):
return self.routes[name].generate(kw)
def __call__(self, request):
+ environ = request.environ
+ registry = request.registry
try:
- # As of BFG 1.1a9, a root factory is now typically called
- # with a request object (instead of a WSGI environ, as in
- # previous versions) by the router. Simultaneously, as of
- # 1.1a9, the RoutesRootFactory *requires* that the object
- # passed to it be a request, instead of an environ, as it
- # uses both the ``registry`` attribute of the request, and
- # if a route is found, it decorates the object with an
- # interface using alsoProvides. However, existing app
- # code "in the wild" calls the root factory explicitly
- # with a dictionary argument (e.g. a subscriber to
- # WSGIApplicationCreatedEvent does
- # ``app.root_factory({})``). It makes no sense for such
- # code to depend on the side effects of a
- # RoutesRootFactory, for bw compat purposes we catch the
- # exception that will be raised when passed a dictionary
- # and just return the result of the default root factory.
- environ = request.environ
- registry = request.registry
- except AttributeError:
- return self.default_root_factory(request)
-
- try:
- path = environ['PATH_INFO']
+ # empty if mounted under a path in mod_wsgi, for example
+ path = environ['PATH_INFO'] or '/'
except KeyError:
path = '/'
- if not path: # empty if mounted under a path in mod_wsgi, for example
- path = '/'
for route in self.routelist:
match = route.match(path)
@@ -77,17 +51,9 @@ class RoutesRootFactory(object):
preds = route.predicates
if preds and not all((p(None, request) for p in preds)):
continue
- environ['wsgiorg.routing_args'] = ((), match)
- environ['bfg.routes.route'] = route
- environ['bfg.routes.matchdict'] = match
- request.matchdict = match
- iface = registry.queryUtility(IRouteRequest, name=route.name)
- if iface is not None:
- alsoProvides(request, iface)
- factory = route.factory or self.default_root_factory
- return factory(request)
-
- return self.default_root_factory(request)
+ return {'route':route, 'match':match}
+
+ return {'route':None, 'match':None}
# stolen from bobo and modified
route_re = re.compile(r'(/:[a-zA-Z]\w*)')
diff --git a/repoze/bfg/view.py b/repoze/bfg/view.py
index 4fc22000b..47f752274 100644
--- a/repoze/bfg/view.py
+++ b/repoze/bfg/view.py
@@ -504,41 +504,50 @@ def append_slash_notfound_view(context, request):
def derive_view(original_view, permission=None, predicates=(), attr=None,
renderer_name=None, wrapper_viewname=None, viewname=None):
- sm = getSiteManager()
- return sm.derive_view(original_view, permission=permission,
- predicates=predicates, attr=attr,
- renderer_name=renderer_name,
- wrapper_viewname=wrapper_viewname, viewname=viewname)
-
-def rendered_response(renderer, response, view, context, request,
- renderer_name):
- sm = getSiteManager()
- return sm.rendered_response(renderer, response, view, context, request,
- renderer_name)
+ reg = getSiteManager()
+ from repoze.bfg.configuration import Configurator
+ config = Configurator(reg)
+ return config.derive_view(original_view, permission=permission,
+ predicates=predicates, attr=attr,
+ renderer_name=renderer_name,
+ wrapper_viewname=wrapper_viewname,
+ viewname=viewname)
def renderer_from_name(self, path):
- sm = getSiteManager()
- return sm.renderer_from_name(path)
+ reg = getSiteManager()
+ from repoze.bfg.configuration import Configurator
+ config = Configurator(reg)
+ return config.renderer_from_name(path)
def map_view(view, attr=None, renderer_name=None):
- sm = getSiteManager()
- return sm.map_view(view, attr=attr, renderer_name=renderer_name)
+ reg = getSiteManager()
+ from repoze.bfg.configuration import Configurator
+ config = Configurator(reg)
+ return reg.map_view(view, attr=attr, renderer_name=renderer_name)
def owrap_view(view, viewname, wrapper_viewname):
- sm = getSiteManager()
- return sm.owrap_view(view, viewname, wrapper_viewname)
+ reg = getSiteManager()
+ from repoze.bfg.configuration import Configurator
+ config = Configurator(reg)
+ return config.owrap_view(view, viewname, wrapper_viewname)
def predicate_wrap(view, predicates):
- sm = getSiteManager()
- return sm.predicate_wrap(view, predicates)
+ reg = getSiteManager()
+ from repoze.bfg.configuration import Configurator
+ config = Configurator(reg)
+ return reg.predicate_wrap(view, predicates)
def secure_view(view, permission):
- sm = getSiteManager()
- return sm.secure_view(view, permission)
+ reg = getSiteManager()
+ from repoze.bfg.configuration import Configurator
+ config = Configurator(reg)
+ return config.secure_view(view, permission)
def authdebug_view(self, view, permission):
- sm = getSiteManager()
- return sm.authdebug_view(view, permission)
+ reg = getSiteManager()
+ from repoze.bfg.configuration import Configurator
+ config = Configurator(reg)
+ return config.authdebug_view(view, permission)
def requestonly(class_or_callable, attr=None):
""" Return true of the class or callable accepts only a request argument,
@@ -645,6 +654,34 @@ def decorate_view(wrapped_view, original_view):
return True
return False
+def rendered_response(renderer, response, view, context,request,
+ renderer_name):
+ if ( hasattr(response, 'app_iter') and hasattr(response, 'headerlist')
+ and hasattr(response, 'status') ):
+ return response
+ result = renderer(response, {'view':view, 'renderer_name':renderer_name,
+ 'context':context, 'request':request})
+ response_factory = queryUtility(IResponseFactory, default=Response)
+ response = response_factory(result)
+ attrs = request.__dict__
+ content_type = attrs.get('response_content_type', None)
+ if content_type is not None:
+ response.content_type = content_type
+ headerlist = attrs.get('response_headerlist', None)
+ if headerlist is not None:
+ for k, v in headerlist:
+ response.headers.add(k, v)
+ status = attrs.get('response_status', None)
+ if status is not None:
+ response.status = status
+ charset = attrs.get('response_charset', None)
+ if charset is not None:
+ response.charset = charset
+ cache_for = attrs.get('response_cache_for', None)
+ if cache_for is not None:
+ response.cache_expires = cache_for
+ return response
+
diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py
index 7bfa1a916..a3d136a09 100644
--- a/repoze/bfg/zcml.py
+++ b/repoze/bfg/zcml.py
@@ -27,10 +27,12 @@ from repoze.bfg.authentication import RemoteUserAuthenticationPolicy
from repoze.bfg.authentication import AuthTktAuthenticationPolicy
from repoze.bfg.authorization import ACLAuthorizationPolicy
from repoze.bfg.configuration import zcml_configure
+from repoze.bfg.configuration import Configurator
from repoze.bfg.path import package_name
from repoze.bfg.request import route_request_iface
from repoze.bfg.resource import resource_spec
from repoze.bfg.static import StaticRootFactory
+from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.view import static as static_view
###################### directives ##########################
@@ -162,7 +164,7 @@ def view(
raise ConfigurationError('"view" attribute was not specified and '
'no renderer specified')
- sm = getSiteManager()
+ reg = get_current_registry()
if request_type in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE'):
# b/w compat for 1.0
@@ -173,10 +175,11 @@ def view(
if route_name is None:
request_type = IRequest
else:
- request_type = queryUtility(IRouteRequest, name=route_name)
+ request_type = reg.queryUtility(IRouteRequest, name=route_name)
if request_type is None:
request_type = route_request_iface(route_name)
- sm.registerUtility(request_type, IRouteRequest, name=route_name)
+ reg.registerUtility(request_type, IRouteRequest,
+ name=route_name)
if isinstance(request_type, basestring):
request_type = _context.resolve(request_type)
@@ -185,12 +188,14 @@ def view(
renderer = resource_spec(renderer, package_name(_context.resolve('.')))
def register():
- sm.view(permission=permission, for_=for_, view=view, name=name,
- request_type=request_type, route_name=route_name,
- request_method=request_method, request_param=request_param,
- containment=containment, attr=attr, renderer=renderer,
- wrapper=wrapper, xhr=xhr, accept=accept, header=header,
- path_info=path_info, _info=_context.info)
+ config = Configurator(reg)
+ config.view(
+ permission=permission, for_=for_, view=view, name=name,
+ request_type=request_type, route_name=route_name,
+ request_method=request_method, request_param=request_param,
+ containment=containment, attr=attr, renderer=renderer,
+ wrapper=wrapper, xhr=xhr, accept=accept, header=header,
+ path_info=path_info, _info=_context.info)
_context.action(
discriminator = ('view', for_, name, request_type, IView, containment,
@@ -258,7 +263,7 @@ def route(_context, name, path, view=None, view_for=None,
# compatibility purposes.
# these are route predicates; if they do not match, the next route
# in the routelist will be tried
- sm = getSiteManager()
+ reg = get_current_registry()
if request_type in ('GET', 'HEAD', 'PUT', 'POST', 'DELETE'):
# b/w compat for 1.0
@@ -268,7 +273,7 @@ def route(_context, name, path, view=None, view_for=None,
request_iface = queryUtility(IRouteRequest, name=name)
if request_iface is None:
request_iface = route_request_iface(name)
- sm.registerUtility(request_iface, IRouteRequest, name=name)
+ reg.registerUtility(request_iface, IRouteRequest, name=name)
if view:
view_for = view_for or for_
@@ -295,18 +300,19 @@ def route(_context, name, path, view=None, view_for=None,
)
def register():
- sm.route(name, path, factory=factory, header=header,
- xhr=xhr, accept=accept, path_info=path_info,
- request_method=request_method, request_param=request_param,
- _info=_context.info)
-
+ config = Configurator(reg)
+ config.route(
+ name, path, factory=factory, header=header,
+ xhr=xhr, accept=accept, path_info=path_info,
+ request_method=request_method, request_param=request_param,
+ _info=_context.info)
+
_context.action(
discriminator = ('route', name, xhr, request_method, path_info,
request_param, header, accept),
callable = register,
)
-
class ISystemViewDirective(Interface):
view = GlobalObject(
title=u"",
@@ -348,8 +354,9 @@ def view_utility(_context, view, attr, renderer, wrapper, iface):
renderer = resource_spec(renderer, package_name(_context.resolve('.')))
def register():
- sm = getSiteManager()
- sm.view_utility(view, attr, renderer, wrapper, iface, _context.info)
+ reg = get_current_registry()
+ config = Configurator(reg)
+ config.view_utility(view, attr, renderer, wrapper, iface, _context.info)
_context.action(
discriminator = iface,
@@ -396,11 +403,12 @@ def resource(_context, to_override, override_with):
'A file cannot be overridden with a directory (put a slash '
'at the end of to_override if necessary)')
- sm = getSiteManager()
+ reg = get_current_registry()
+ config = Configurator(reg)
_context.action(
discriminator = None,
- callable = sm.resource,
+ callable = config.resource,
args = (to_override, override_with, _context.info),
)
@@ -415,8 +423,9 @@ def repozewho1authenticationpolicy(_context, identifier_name='auth_tkt',
callback=callback)
# authentication policies must be registered eagerly so they can
# be found by the view registration machinery
- sm = getSiteManager()
- sm.authentication_policy(policy, _info=_context.info)
+ reg = get_current_registry()
+ config = Configurator(reg)
+ config.authentication_policy(policy, _info=_context.info)
_context.action(discriminator=IAuthenticationPolicy)
class IRemoteUserAuthenticationPolicyDirective(Interface):
@@ -430,8 +439,9 @@ def remoteuserauthenticationpolicy(_context, environ_key='REMOTE_USER',
callback=callback)
# authentication policies must be registered eagerly so they can
# be found by the view registration machinery
- sm = getSiteManager()
- sm.authentication_policy(policy, _info=_context.info)
+ reg = get_current_registry()
+ config = Configurator(reg)
+ config.authentication_policy(policy, _info=_context.info)
_context.action(discriminator=IAuthenticationPolicy)
class IAuthTktAuthenticationPolicyDirective(Interface):
@@ -467,8 +477,9 @@ def authtktauthenticationpolicy(_context,
raise ConfigurationError(str(why))
# authentication policies must be registered eagerly so they can
# be found by the view registration machinery
- sm = getSiteManager()
- sm.authentication_policy(policy, _info=_context.info)
+ reg = get_current_registry()
+ config = Configurator(reg)
+ config.authentication_policy(policy, _info=_context.info)
_context.action(discriminator=IAuthenticationPolicy)
class IACLAuthorizationPolicyDirective(Interface):
@@ -478,8 +489,9 @@ def aclauthorizationpolicy(_context):
policy = ACLAuthorizationPolicy()
# authorization policies must be registered eagerly so they can be
# found by the view registration machinery
- sm = getSiteManager()
- sm.authorization_policy(policy, _info=_context.info)
+ reg = get_current_registry()
+ config = Configurator(reg)
+ config.authorization_policy(policy, _info=_context.info)
_context.action(discriminator=IAuthorizationPolicy)
class IRendererDirective(Interface):
@@ -494,8 +506,9 @@ class IRendererDirective(Interface):
def renderer(_context, factory, name=''):
# renderer factories must be registered eagerly so they can be
# found by the view machinery
- sm = getSiteManager()
- sm.renderer(factory, name, _info=_context.info)
+ reg = get_current_registry()
+ config = Configurator(reg)
+ config.renderer(factory, name, _info=_context.info)
_context.action(discriminator=(IRendererFactory, name))
class IStaticDirective(Interface):
@@ -534,8 +547,9 @@ class IScanDirective(Interface):
def scan(_context, package, martian=martian):
# martian overrideable only for unit tests
def register():
- sm = getSiteManager()
- sm.scan(package, _info=_context.info, martian=martian)
+ reg = get_current_registry()
+ config = Configurator(reg)
+ config.scan(package, _info=_context.info, martian=martian)
_context.action(discriminator=None, callable=register)
class Uncacheable(object):