summaryrefslogtreecommitdiff
path: root/repoze
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2010-04-14 02:49:19 +0000
committerChris McDonough <chrism@agendaless.com>2010-04-14 02:49:19 +0000
commitff1213e8f2aed987108ba57aed517c033491b1aa (patch)
treef531544c3373ae7d5b51746987cb373326277a9c /repoze
parent2b6bc8adfa294f7133680f64df411251afb67dfc (diff)
downloadpyramid-ff1213e8f2aed987108ba57aed517c033491b1aa.tar.gz
pyramid-ff1213e8f2aed987108ba57aed517c033491b1aa.tar.bz2
pyramid-ff1213e8f2aed987108ba57aed517c033491b1aa.zip
Add "exception views" work contributed primarily by Andrey Popp by merging the "phash" branch.
Diffstat (limited to 'repoze')
-rw-r--r--repoze/bfg/compat/__init__.py6
-rw-r--r--repoze/bfg/configuration.py324
-rw-r--r--repoze/bfg/interfaces.py13
-rw-r--r--repoze/bfg/request.py6
-rw-r--r--repoze/bfg/router.py41
-rw-r--r--repoze/bfg/security.py3
-rw-r--r--repoze/bfg/testing.py2
-rw-r--r--repoze/bfg/tests/exceptionviewapp/__init__.py1
-rw-r--r--repoze/bfg/tests/exceptionviewapp/configure.zcml44
-rw-r--r--repoze/bfg/tests/exceptionviewapp/models.py18
-rw-r--r--repoze/bfg/tests/exceptionviewapp/views.py17
-rw-r--r--repoze/bfg/tests/fixtureapp/configure.zcml15
-rw-r--r--repoze/bfg/tests/fixtureapp/views.py14
-rw-r--r--repoze/bfg/tests/hybridapp/configure.zcml56
-rw-r--r--repoze/bfg/tests/hybridapp/views.py22
-rw-r--r--repoze/bfg/tests/test_configuration.py743
-rw-r--r--repoze/bfg/tests/test_integration.py59
-rw-r--r--repoze/bfg/tests/test_request.py2
-rw-r--r--repoze/bfg/tests/test_router.py578
-rw-r--r--repoze/bfg/tests/test_security.py3
-rw-r--r--repoze/bfg/tests/test_view.py3
-rw-r--r--repoze/bfg/tests/test_zcml.py193
-rw-r--r--repoze/bfg/view.py3
-rw-r--r--repoze/bfg/zcml.py65
24 files changed, 1774 insertions, 457 deletions
diff --git a/repoze/bfg/compat/__init__.py b/repoze/bfg/compat/__init__.py
index c83bc4aec..b3585eac2 100644
--- a/repoze/bfg/compat/__init__.py
+++ b/repoze/bfg/compat/__init__.py
@@ -140,3 +140,9 @@ try:
except ImportError: #pragma: no cover
from pkgutil_26 import walk_packages
+try:
+ from hashlib import md5
+except ImportError: # pragma: no cover
+ import md5
+ md5 = md5.new
+
diff --git a/repoze/bfg/configuration.py b/repoze/bfg/configuration.py
index 025c6e048..3d1c0214d 100644
--- a/repoze/bfg/configuration.py
+++ b/repoze/bfg/configuration.py
@@ -2,6 +2,7 @@ import os
import re
import sys
import threading
+import types
import inspect
from webob import Response
@@ -17,9 +18,8 @@ from repoze.bfg.interfaces import IAuthenticationPolicy
from repoze.bfg.interfaces import IAuthorizationPolicy
from repoze.bfg.interfaces import IDebugLogger
from repoze.bfg.interfaces import IDefaultRootFactory
-from repoze.bfg.interfaces import IForbiddenView
+from repoze.bfg.interfaces import IExceptionViewClassifier
from repoze.bfg.interfaces import IMultiView
-from repoze.bfg.interfaces import INotFoundView
from repoze.bfg.interfaces import IPackageOverrides
from repoze.bfg.interfaces import IRendererFactory
from repoze.bfg.interfaces import IRequest
@@ -32,12 +32,14 @@ from repoze.bfg.interfaces import ISettings
from repoze.bfg.interfaces import ITemplateRenderer
from repoze.bfg.interfaces import ITraverser
from repoze.bfg.interfaces import IView
+from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg import chameleon_text
from repoze.bfg import chameleon_zpt
from repoze.bfg import renderers
from repoze.bfg.authorization import ACLAuthorizationPolicy
from repoze.bfg.compat import all
+from repoze.bfg.compat import md5
from repoze.bfg.compat import walk_packages
from repoze.bfg.events import WSGIApplicationCreatedEvent
from repoze.bfg.exceptions import Forbidden
@@ -59,8 +61,11 @@ from repoze.bfg.traversal import find_interface
from repoze.bfg.urldispatch import RoutesMapper
from repoze.bfg.view import render_view_to_response
from repoze.bfg.view import static
+from repoze.bfg.view import default_notfound_view
+from repoze.bfg.view import default_forbidden_view
-MAX_WEIGHT = 10000
+MAX_ORDER = 1 << 30
+DEFAULT_PHASH = md5().hexdigest()
DEFAULT_RENDERERS = (
('.pt', chameleon_zpt.renderer_factory),
@@ -206,7 +211,8 @@ class Configurator(object):
def _derive_view(self, view, permission=None, predicates=(),
attr=None, renderer_name=None, wrapper_viewname=None,
- viewname=None, accept=None, score=MAX_WEIGHT):
+ viewname=None, accept=None, order=MAX_ORDER,
+ phash=DEFAULT_PHASH):
renderer = self._renderer_from_name(renderer_name)
authn_policy = self.registry.queryUtility(IAuthenticationPolicy)
authz_policy = self.registry.queryUtility(IAuthorizationPolicy)
@@ -220,7 +226,7 @@ class Configurator(object):
authn_policy, authz_policy, settings,
logger)
predicated_view = _predicate_wrap(debug_view, predicates)
- derived_view = _attr_wrap(predicated_view, accept, score)
+ derived_view = _attr_wrap(predicated_view, accept, order, phash)
return derived_view
def _override(self, package, path, override_package, override_prefix,
@@ -235,23 +241,6 @@ class Configurator(object):
name=pkg_name, info=_info)
override.insert(path, override_pkg_name, override_prefix)
-
- def _system_view(self, iface, view=None, attr=None, renderer=None,
- wrapper=None, _info=u''):
- if not view:
- if renderer:
- def view(context, request):
- return {}
- else:
- raise ConfigurationError(
- '"view" argument was not specified and no renderer '
- 'specified')
-
- derived_view = self._derive_view(view, attr=attr,
- renderer_name=renderer,
- wrapper_viewname=wrapper)
- self.registry.registerUtility(derived_view, iface, '', info=_info)
-
def _set_security_policies(self, authentication, authorization=None):
if authorization is None:
authorization = ACLAuthorizationPolicy() # default
@@ -312,6 +301,8 @@ class Configurator(object):
authorization_policy)
for name, renderer in renderers:
self.add_renderer(name, renderer)
+ self.set_notfound_view(default_notfound_view)
+ self.set_forbidden_view(default_forbidden_view)
# getSiteManager is a unit testing dep injection
def hook_zca(self, getSiteManager=None):
@@ -723,14 +714,15 @@ class Configurator(object):
view_info.append(info)
return
- score, predicates = _make_predicates(
+ order, predicates, phash = _make_predicates(
xhr=xhr, request_method=request_method, path_info=path_info,
request_param=request_param, header=header, accept=accept,
containment=containment, request_type=request_type,
custom=custom_predicates)
derived_view = self._derive_view(view, permission, predicates, attr,
- renderer, wrapper, name, accept, score)
+ renderer, wrapper, name, accept, order,
+ phash)
if context is None:
context = for_
@@ -764,41 +756,76 @@ class Configurator(object):
old_view = None
for view_type in (IView, ISecuredView, IMultiView):
- old_view = registered((request_iface, r_context), view_type, name)
+ old_view = registered((IViewClassifier, request_iface, r_context),
+ view_type, name)
if old_view is not None:
break
-
- if old_view is None:
- # No component was registered for any of our I*View
- # interfaces exactly; this is the first view for this
- # triad. We don't need a multiview.
- if hasattr(derived_view, '__call_permissive__'):
+
+ is_exception_view = isexception(context)
+
+ def regclosure():
+ if hasattr(view, '__call_permissive__'):
view_iface = ISecuredView
else:
view_iface = IView
- self.registry.registerAdapter(derived_view,
- (request_iface, context),
- view_iface, name, info=_info)
+ self.registry.registerAdapter(
+ derived_view, (IViewClassifier, request_iface, context),
+ view_iface, name, info=_info)
+ if is_exception_view:
+ self.registry.registerAdapter(
+ derived_view,
+ (IExceptionViewClassifier, request_iface, context),
+ view_iface, name, info=_info)
+
+ is_multiview = IMultiView.providedBy(old_view)
+ old_phash = getattr(old_view, '__phash__', DEFAULT_PHASH)
+
+ if old_view is None:
+ # - No component was yet registered for any of our I*View
+ # interfaces exactly; this is the first view for this
+ # triad.
+ regclosure()
+
+ elif (not is_multiview) and (old_phash == phash):
+ # - A single view component was previously registered with
+ # the same predicate hash as this view; this registration
+ # is therefore an override.
+ regclosure()
+
else:
+ # - A view or multiview was already registered for this
+ # triad, and the new view is not an override.
+
# XXX we could try to be more efficient here and register
# a non-secured view for a multiview if none of the
# multiview's consituent views have a permission
# associated with them, but this code is getting pretty
# rough already
- if IMultiView.providedBy(old_view):
+ if is_multiview:
multiview = old_view
else:
multiview = MultiView(name)
old_accept = getattr(old_view, '__accept__', None)
- old_score = getattr(old_view, '__score__', MAX_WEIGHT)
- multiview.add(old_view, old_score, old_accept)
- multiview.add(derived_view, score, accept)
+ old_order = getattr(old_view, '__order__', MAX_ORDER)
+ multiview.add(old_view, old_order, old_accept, old_phash)
+ multiview.add(derived_view, order, accept, phash)
for view_type in (IView, ISecuredView):
# unregister any existing views
self.registry.adapters.unregister(
- (request_iface, r_context), view_type, name=name)
- self.registry.registerAdapter(multiview, (request_iface, context),
- IMultiView, name, info=_info)
+ (IViewClassifier, request_iface, r_context),
+ view_type, name=name)
+ if is_exception_view:
+ self.registry.adapters.unregister(
+ (IExceptionViewClassifier, request_iface, r_context),
+ view_type, name=name)
+ self.registry.registerAdapter(
+ multiview, (IViewClassifier, request_iface, context),
+ IMultiView, name, info=_info)
+ if is_exception_view:
+ self.registry.registerAdapter(
+ multiview,
+ (IExceptionViewClassifier, request_iface, context),
+ IMultiView, name, info=_info)
def add_route(self,
name,
@@ -1030,13 +1057,15 @@ class Configurator(object):
"""
# these are route predicates; if they do not match, the next route
# in the routelist will be tried
- _, predicates = _make_predicates(xhr=xhr,
- request_method=request_method,
- path_info=path_info,
- request_param=request_param,
- header=header,
- accept=accept,
- custom=custom_predicates)
+ ignored, predicates, ignored = _make_predicates(
+ xhr=xhr,
+ request_method=request_method,
+ path_info=path_info,
+ request_param=request_param,
+ header=header,
+ accept=accept,
+ custom=custom_predicates
+ )
request_iface = self.registry.queryUtility(IRouteRequest, name=name)
@@ -1172,10 +1201,32 @@ class Configurator(object):
override(package, path, override_package, override_prefix,
_info=_info)
- def set_forbidden_view(self, *arg, **kw):
+ def set_forbidden_view(self, view=None, attr=None, renderer=None,
+ wrapper=None, _info=u''):
""" Add a default forbidden view to the current configuration
state.
+ .. warning:: This method has been deprecated in
+ :mod:`repoze.bfg` 1.3. *Do not use it for new development;
+ it should only be used to support older code bases which
+ depend upon it.* See :ref:`changing_the_forbidden_view` to
+ see how a forbidden view should be registered in new
+ projects.
+
+ ..note:: For backwards compatibility with :mod:`repoze.bfg`
+ 1.2, unlike an 'exception view' as described in
+ :ref:`exception_views`, a ``context, request`` view
+ callable registered using this API should not expect to
+ receive the exception as its first (``context``) argument.
+ Instead it should expect to receive the 'real' context as
+ found via context-finding or ``None`` if no context could
+ be found. The exception causing the registered view to be
+ called is however still available as ``request.exception``.
+ .. warning:: This method has been deprecated in
+ :mod:`repoze.bfg` 1.3. See
+ :ref:`changing_the_forbidden_view` to see how a not found
+ view should be registered in :mod:`repoze.bfg` 1.3+.
+
The ``view`` argument should be a :term:`view callable`.
The ``attr`` argument should be the attribute of the view
@@ -1191,16 +1242,36 @@ class Configurator(object):
The ``wrapper`` argument should be the name of another view
which will wrap this view when rendered (see the ``add_view``
- method's ``wrapper`` argument for a description).
-
- See :ref:`changing_the_forbidden_view` for more
- information."""
- return self._system_view(IForbiddenView, *arg, **kw)
-
- def set_notfound_view(self, *arg, **kw):
+ method's ``wrapper`` argument for a description)."""
+ view = self._derive_view(view, attr=attr, renderer_name=renderer)
+ def bwcompat_view(context, request):
+ ctx = getattr(request, 'context', None)
+ return view(ctx, request)
+ return self.add_view(bwcompat_view, context=Forbidden,
+ wrapper=wrapper, _info=_info)
+
+ def set_notfound_view(self, view=None, attr=None, renderer=None,
+ wrapper=None, _info=u''):
""" Add a default not found view to the current configuration
state.
+ .. warning:: This method has been deprecated in
+ :mod:`repoze.bfg` 1.3. *Do not use it for new development;
+ it should only be used to support older code bases which
+ depend upon it.* See :ref:`changing_the_notfound_view` to
+ see how a not found view should be registered in new
+ projects.
+
+ ..note:: For backwards compatibility with :mod:`repoze.bfg`
+ 1.2, unlike an 'exception view' as described in
+ :ref:`exception_views`, a ``context, request`` view
+ callable registered using this API should not expect to
+ receive the exception as its first (``context``) argument.
+ Instead it should expect to receive the 'real' context as
+ found via context-finding or ``None`` if no context could
+ be found. The exception causing the registered view to be
+ called is however still available as ``request.exception``.
+
The ``view`` argument should be a :term:`view callable`.
The ``attr`` argument should be the attribute of the view
@@ -1217,11 +1288,13 @@ class Configurator(object):
The ``wrapper`` argument should be the name of another view
which will wrap this view when rendered (see the ``add_view``
method's ``wrapper`` argument for a description).
-
- See :ref:`changing_the_notfound_view` for more
- information.
"""
- return self._system_view(INotFoundView, *arg, **kw)
+ view = self._derive_view(view, attr=attr, renderer_name=renderer)
+ def bwcompat_view(context, request):
+ ctx = getattr(request, 'context', None)
+ return view(ctx, request)
+ return self.add_view(bwcompat_view, context=NotFound,
+ wrapper=wrapper, _info=_info)
def add_static_view(self, name, path, cache_max_age=3600, _info=u''):
""" Add a view used to render static resources to the current
@@ -1353,46 +1426,68 @@ class Configurator(object):
def _make_predicates(xhr=None, request_method=None, path_info=None,
request_param=None, header=None, accept=None,
containment=None, request_type=None, custom=()):
- # Predicates are added to the predicate list in (presumed)
+
+ # PREDICATES
+ # ----------
+ #
+ # Given an argument list, a predicate list is computed.
+ # Predicates are added to a predicate list in (presumed)
# computation expense order. All predicates associated with a
- # view must evaluate true for the view to "match" a request.
- # Elsewhere in the code, we evaluate them using a generator
- # expression. The fastest predicate should be evaluated first,
- # then the next fastest, and so on, as if one returns false, the
- # remainder of the predicates won't need to be evaluated.
-
- # Each predicate is associated with a weight value. The weight
- # symbolizes the relative potential "importance" of the predicate
- # to all other predicates. A larger weight indicates greater
- # importance. These weights are subtracted from an aggregate
- # 'weight' variable. The aggregate weight is then divided by the
- # length of the predicate list to compute a "score" for this view.
- # The score represents the ordering in which a "multiview" ( a
+ # view or route must evaluate true for the view or route to
+ # "match" during a request. Elsewhere in the code, we evaluate
+ # predicates using a generator expression. The fastest predicate
+ # should be evaluated first, then the next fastest, and so on, as
+ # if one returns false, the remainder of the predicates won't need
+ # to be evaluated.
+ #
+ # While we compute predicates, we also compute a predicate hash
+ # (aka phash) that can be used by a caller to identify identical
+ # predicate lists.
+ #
+ # ORDERING
+ # --------
+ #
+ # A "order" is computed for the predicate list. An order is
+ # a scoring.
+ #
+ # Each predicate is associated with a weight value, which is a
+ # multiple of 2. The weight of a predicate symbolizes the
+ # relative potential "importance" of the predicate to all other
+ # predicates. A larger weight indicates greater importance.
+ #
+ # All weights for a given predicate list are bitwise ORed together
+ # to create a "score"; this score is then subtracted from
+ # MAX_ORDER and divided by an integer representing the number of
+ # predicates+1 to determine the order.
+ #
+ # The order represents the ordering in which a "multiview" ( a
# collection of views that share the same context/request/name
# triad but differ in other ways via predicates) will attempt to
- # call its set of views. Views with lower scores will be tried
+ # call its set of views. Views with lower orders will be tried
# first. The intent is to a) ensure that views with more
# predicates are always evaluated before views with fewer
# predicates and b) to ensure a stable call ordering of views that
- # share the same number of predicates.
-
- # Views which do not have any predicates get a score of
- # MAX_WEIGHT, meaning that they will be tried very last.
+ # share the same number of predicates. Views which do not have
+ # any predicates get an order of MAX_ORDER, meaning that they will
+ # be tried very last.
predicates = []
- weight = MAX_WEIGHT
+ weights = []
+ h = md5()
if xhr:
def xhr_predicate(context, request):
return request.is_xhr
- weight = weight - 20
+ weights.append(1 << 0)
predicates.append(xhr_predicate)
+ h.update('xhr:%r' % bool(xhr))
if request_method is not None:
def request_method_predicate(context, request):
return request.method == request_method
- weight = weight - 30
+ weights.append(1 << 2)
predicates.append(request_method_predicate)
+ h.update('request_method:%r' % request_method)
if path_info is not None:
try:
@@ -1401,8 +1496,9 @@ def _make_predicates(xhr=None, request_method=None, path_info=None,
raise ConfigurationError(why[0])
def path_info_predicate(context, request):
return path_info_val.match(request.path_info) is not None
- weight = weight - 40
+ weights.append(1 << 3)
predicates.append(path_info_predicate)
+ h.update('path_info:%r' % path_info)
if request_param is not None:
request_param_val = None
@@ -1412,8 +1508,9 @@ def _make_predicates(xhr=None, request_method=None, path_info=None,
if request_param_val is None:
return request_param in request.params
return request.params.get(request_param) == request_param_val
- weight = weight - 50
+ weights.append(1 << 4)
predicates.append(request_param_predicate)
+ h.update('request_param:%r=%r' % (request_param, request_param_val))
if header is not None:
header_name = header
@@ -1429,35 +1526,43 @@ def _make_predicates(xhr=None, request_method=None, path_info=None,
return header_name in request.headers
val = request.headers.get(header_name)
return header_val.match(val) is not None
- weight = weight - 60
+ weights.append(1 << 5)
predicates.append(header_predicate)
+ h.update('header:%r=%r' % (header_name, header_val))
if accept is not None:
def accept_predicate(context, request):
return accept in request.accept
- weight = weight - 70
+ weights.append(1 << 6)
predicates.append(accept_predicate)
+ h.update('accept:%r' % accept)
if containment is not None:
def containment_predicate(context, request):
return find_interface(context, containment) is not None
- weight = weight - 80
+ weights.append(1 << 7)
predicates.append(containment_predicate)
+ h.update('containment:%r' % id(containment))
if request_type is not None:
def request_type_predicate(context, request):
return request_type.providedBy(request)
- weight = weight - 90
+ weights.append(1 << 8)
predicates.append(request_type_predicate)
+ h.update('request_type:%r' % id(request_type))
if custom:
- for predicate in custom:
- weight = weight - 100
+ for num, predicate in enumerate(custom):
predicates.append(predicate)
+ h.update('custom%s:%r' % (num, id(predicate)))
+ weights.append(1 << 9)
- # this will be == MAX_WEIGHT if no predicates
- score = weight / (len(predicates) + 1)
- return score, predicates
+ score = 0
+ for bit in weights:
+ score = score | bit
+ order = (MAX_ORDER - score) / (len(predicates) + 1)
+ phash = h.hexdigest()
+ return order, predicates, phash
class MultiView(object):
implements(IMultiView)
@@ -1468,13 +1573,19 @@ class MultiView(object):
self.views = []
self.accepts = []
- def add(self, view, score, accept=None):
+ def add(self, view, order, accept=None, phash=None):
+ if phash is not None:
+ for i, (s, v, h) in enumerate(list(self.views)):
+ if phash == h:
+ self.views[i] = (order, view, phash)
+ return
+
if accept is None or '*' in accept:
- self.views.append((score, view))
+ self.views.append((order, view, phash))
self.views.sort()
else:
subset = self.media_views.setdefault(accept, [])
- subset.append((score, view))
+ subset.append((order, view, phash))
subset.sort()
accepts = set(self.accepts)
accepts.add(accept)
@@ -1496,7 +1607,7 @@ class MultiView(object):
return self.views
def match(self, context, request):
- for score, view in self.get_views(request):
+ for order, view, phash in self.get_views(request):
if not hasattr(view, '__predicated__'):
return view
if view.__predicated__(context, request):
@@ -1515,7 +1626,7 @@ class MultiView(object):
return view(context, request)
def __call__(self, context, request):
- for score, view in self.get_views(request):
+ for order, view, phash in self.get_views(request):
try:
return view(context, request)
except NotFound:
@@ -1548,7 +1659,7 @@ def decorate_view(wrapped_view, original_view):
except AttributeError:
pass
try:
- wrapped_view.__score__ = original_view.__score__
+ wrapped_view.__order__ = original_view.__order__
except AttributeError:
pass
return True
@@ -1795,19 +1906,26 @@ def _authdebug_view(view, permission, authn_policy, authz_policy, settings,
return wrapped_view
-def _attr_wrap(view, accept, score):
+def _attr_wrap(view, accept, order, phash):
# this is a little silly but we don't want to decorate the original
- # function with attributes that indicate accept and score,
+ # function with attributes that indicate accept, order, and phash,
# so we use a wrapper
- if (accept is None) and (score == MAX_WEIGHT):
+ if (accept is None) and (order == MAX_ORDER) and (phash == DEFAULT_PHASH):
return view # defaults
def attr_view(context, request):
return view(context, request)
attr_view.__accept__ = accept
- attr_view.__score__ = score
+ attr_view.__order__ = order
+ attr_view.__phash__ = phash
decorate_view(attr_view, view)
return attr_view
+def isclass(o):
+ return isinstance(o, (type, types.ClassType))
+
+def isexception(o):
+ return isinstance(o, Exception) or isclass(o) and issubclass(o, Exception)
+
# note that ``options`` is a b/w compat alias for ``settings`` and
# ``Configurator`` is a testing dep inj
def make_app(root_factory, package=None, filename='configure.zcml',
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py
index c322de2ff..53a905972 100644
--- a/repoze/bfg/interfaces.py
+++ b/repoze/bfg/interfaces.py
@@ -29,6 +29,9 @@ class IWSGIApplicationCreatedEvent(Interface):
class IRequest(Interface):
""" Request type interface attached to all request objects """
+# for exception view lookups
+IRequest.combined = IRequest
+
class IRouteRequest(Interface):
""" *internal only* interface used as in a utility lookup to find
route-specific interfaces. Not an API."""
@@ -78,6 +81,12 @@ class IResponseFactory(Interface):
should accept all the arguments that the webob.Response class
accepts)"""
+class IViewClassifier(Interface):
+ """ *Internal only* marker interface for views."""
+
+class IExceptionViewClassifier(Interface):
+ """ *Internal only* marker interface for exception views."""
+
class IView(Interface):
def __call__(context, request):
""" Must return an object that implements IResponse. May
@@ -99,7 +108,7 @@ class IMultiView(ISecuredView):
""" *internal only*. A multiview is a secured view that is a
collection of other views. Each of the views is associated with
zero or more predicates. Not an API."""
- def add(view, predicates, score):
+ def add(view, predicates, order, accept=None, phash=None):
""" Add a view to the multiview. """
class IRootFactory(Interface):
@@ -242,4 +251,4 @@ class IPackageOverrides(Interface):
# VH_ROOT_KEY is an interface; its imported from other packages (e.g.
# traversalwrapper)
-VH_ROOT_KEY = 'HTTP_X_VHM_ROOT'
+VH_ROOT_KEY = 'HTTP_X_VHM_ROOT'
diff --git a/repoze/bfg/request.py b/repoze/bfg/request.py
index 4dbbbb0df..8939e7b88 100644
--- a/repoze/bfg/request.py
+++ b/repoze/bfg/request.py
@@ -80,7 +80,11 @@ class Request(WebobRequest):
return self.environ.values()
def route_request_iface(name, bases=()):
- return InterfaceClass('%s_IRequest' % name, bases=bases)
+ iface = InterfaceClass('%s_IRequest' % name, bases=bases)
+ # for exception view lookups
+ iface.combined = InterfaceClass('%s_combined_IRequest' % name,
+ bases=(iface, IRequest))
+ return iface
def add_global_response_headers(request, headerlist):
attrs = request.__dict__
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index 13da06266..d63eceb32 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -2,8 +2,7 @@ from zope.interface import implements
from zope.interface import providedBy
from repoze.bfg.interfaces import IDebugLogger
-from repoze.bfg.interfaces import IForbiddenView
-from repoze.bfg.interfaces import INotFoundView
+from repoze.bfg.interfaces import IExceptionViewClassifier
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IRootFactory
from repoze.bfg.interfaces import IRouteRequest
@@ -12,19 +11,17 @@ from repoze.bfg.interfaces import IRoutesMapper
from repoze.bfg.interfaces import ISettings
from repoze.bfg.interfaces import ITraverser
from repoze.bfg.interfaces import IView
+from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.configuration import make_app # b/c import
from repoze.bfg.events import AfterTraversal
from repoze.bfg.events import NewRequest
from repoze.bfg.events import NewResponse
-from repoze.bfg.exceptions import Forbidden
from repoze.bfg.exceptions import NotFound
from repoze.bfg.request import Request
from repoze.bfg.threadlocal import manager
from repoze.bfg.traversal import DefaultRootFactory
from repoze.bfg.traversal import ModelGraphTraverser
-from repoze.bfg.view import default_forbidden_view
-from repoze.bfg.view import default_notfound_view
make_app # prevent pyflakes from complaining
@@ -37,8 +34,6 @@ class Router(object):
def __init__(self, registry):
q = registry.queryUtility
self.logger = q(IDebugLogger)
- self.notfound_view = q(INotFoundView, default=default_notfound_view)
- self.forbidden_view = q(IForbiddenView, default=default_forbidden_view)
self.root_factory = q(IRootFactory, default=DefaultRootFactory)
self.routes_mapper = q(IRoutesMapper)
self.root_policy = self.root_factory # b/w compat
@@ -56,6 +51,7 @@ class Router(object):
return an iterable.
"""
registry = self.registry
+ adapters = registry.adapters
has_listeners = registry.has_listeners
logger = self.logger
manager = self.threadlocal_manager
@@ -72,7 +68,7 @@ class Router(object):
has_listeners and registry.notify(NewRequest(request))
request_iface = IRequest
-
+
try:
# find the root
root_factory = self.root_factory
@@ -94,7 +90,6 @@ class Router(object):
attrs['root'] = root
# find a view callable
- adapters = registry.adapters
traverser = adapters.queryAdapter(root, ITraverser)
if traverser is None:
traverser = ModelGraphTraverser(root)
@@ -107,7 +102,7 @@ class Router(object):
has_listeners and registry.notify(AfterTraversal(request))
context_iface = providedBy(context)
view_callable = adapters.lookup(
- (request_iface, context_iface),
+ (IViewClassifier, request_iface, context_iface),
IView, name=view_name, default=None)
# invoke the view callable
@@ -125,25 +120,27 @@ class Router(object):
else:
msg = request.path_info
environ['repoze.bfg.message'] = msg
- response = self.notfound_view(context, request)
+ raise NotFound(msg)
else:
response = view_callable(context, request)
# handle exceptions raised during root finding and view lookup
- except Forbidden, why:
- try:
- msg = why[0]
- except (IndexError, TypeError):
- msg = ''
- environ['repoze.bfg.message'] = msg
- response = self.forbidden_view(context, request)
- except NotFound, why:
+ except Exception, why:
+ for_ = (IExceptionViewClassifier,
+ request_iface.combined,
+ providedBy(why))
+ view_callable = adapters.lookup(for_, IView, default=None)
+ if view_callable is None:
+ raise
+
try:
msg = why[0]
- except (IndexError, TypeError):
+ except Exception:
msg = ''
environ['repoze.bfg.message'] = msg
- response = self.notfound_view(context, request)
+
+ attrs['exception'] = why
+ response = view_callable(why, request)
# process the response
has_listeners and registry.notify(NewResponse(response))
@@ -159,7 +156,7 @@ class Router(object):
if 'global_response_headers' in attrs:
headers = list(headers)
headers.extend(attrs['global_response_headers'])
-
+
start_response(status, headers)
return app_iter
diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py
index 822fd9ee7..cd1bae9a5 100644
--- a/repoze/bfg/security.py
+++ b/repoze/bfg/security.py
@@ -5,6 +5,7 @@ from zope.deprecation import deprecated
from repoze.bfg.interfaces import IAuthenticationPolicy
from repoze.bfg.interfaces import IAuthorizationPolicy
from repoze.bfg.interfaces import ISecuredView
+from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.exceptions import Forbidden as Unauthorized # b/c import
from repoze.bfg.threadlocal import get_current_registry
@@ -122,7 +123,7 @@ def view_execution_permitted(context, request, name=''):
reg = request.registry
except AttributeError:
reg = get_current_registry() # b/c
- provides = map(providedBy, (request, context))
+ provides = [IViewClassifier] + map(providedBy, (request, context))
view = reg.adapters.lookup(provides, ISecuredView, name=name)
if view is None:
return Allowed(
diff --git a/repoze/bfg/testing.py b/repoze/bfg/testing.py
index 4e9abdae0..b68539f43 100644
--- a/repoze/bfg/testing.py
+++ b/repoze/bfg/testing.py
@@ -14,6 +14,7 @@ from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IRoutesMapper
from repoze.bfg.interfaces import ISecuredView
from repoze.bfg.interfaces import IView
+from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IViewPermission
from repoze.bfg.configuration import Configurator
@@ -165,6 +166,7 @@ def registerView(name, result='', view=None, for_=(Interface, Interface),
:meth:`repoze.bfg.configuration.Configurator.add_view``
method in your unit and integration tests.
"""
+ for_ = (IViewClassifier, ) + for_
if view is None:
def view(context, request):
return Response(result)
diff --git a/repoze/bfg/tests/exceptionviewapp/__init__.py b/repoze/bfg/tests/exceptionviewapp/__init__.py
new file mode 100644
index 000000000..ef5fe8b12
--- /dev/null
+++ b/repoze/bfg/tests/exceptionviewapp/__init__.py
@@ -0,0 +1 @@
+# a package
diff --git a/repoze/bfg/tests/exceptionviewapp/configure.zcml b/repoze/bfg/tests/exceptionviewapp/configure.zcml
new file mode 100644
index 000000000..680e065a6
--- /dev/null
+++ b/repoze/bfg/tests/exceptionviewapp/configure.zcml
@@ -0,0 +1,44 @@
+<configure xmlns="http://namespaces.repoze.org/bfg">
+
+ <include package="repoze.bfg.includes" />
+
+ <view view=".views.maybe"/>
+
+ <view context=".models.NotAnException"
+ view=".views.no"/>
+
+ <view context=".models.AnException"
+ view=".views.yes"/>
+
+ <view name="raise_exception"
+ view=".views.raise_exception"/>
+
+ <route name="route_raise_exception"
+ path="route_raise_exception"
+ view=".views.raise_exception"/>
+
+ <route name="route_raise_exception2"
+ path="route_raise_exception2"
+ view=".views.raise_exception"
+ factory=".models.route_factory"/>
+
+ <route name="route_raise_exception3"
+ path="route_raise_exception3"
+ view=".views.raise_exception"
+ factory=".models.route_factory2"/>
+
+ <view context=".models.AnException"
+ route_name="route_raise_exception3"
+ view=".views.whoa"/>
+
+ <route name="route_raise_exception4"
+ path="route_raise_exception4"
+ view=".views.raise_exception"/>
+
+ <view context=".models.AnException"
+ route_name="route_raise_exception4"
+ view=".views.whoa"/>
+
+</configure>
+
+
diff --git a/repoze/bfg/tests/exceptionviewapp/models.py b/repoze/bfg/tests/exceptionviewapp/models.py
new file mode 100644
index 000000000..fe407badc
--- /dev/null
+++ b/repoze/bfg/tests/exceptionviewapp/models.py
@@ -0,0 +1,18 @@
+
+class NotAnException(object):
+ pass
+
+class AnException(Exception):
+ pass
+
+class RouteContext(object):
+ pass
+
+class RouteContext2(object):
+ pass
+
+def route_factory(*arg):
+ return RouteContext()
+
+def route_factory2(*arg):
+ return RouteContext2()
diff --git a/repoze/bfg/tests/exceptionviewapp/views.py b/repoze/bfg/tests/exceptionviewapp/views.py
new file mode 100644
index 000000000..1432618cf
--- /dev/null
+++ b/repoze/bfg/tests/exceptionviewapp/views.py
@@ -0,0 +1,17 @@
+from webob import Response
+from models import AnException
+
+def no(request):
+ return Response('no')
+
+def yes(request):
+ return Response('yes')
+
+def maybe(request):
+ return Response('maybe')
+
+def whoa(request):
+ return Response('whoa')
+
+def raise_exception(request):
+ raise AnException()
diff --git a/repoze/bfg/tests/fixtureapp/configure.zcml b/repoze/bfg/tests/fixtureapp/configure.zcml
index b936b158e..e3470d47a 100644
--- a/repoze/bfg/tests/fixtureapp/configure.zcml
+++ b/repoze/bfg/tests/fixtureapp/configure.zcml
@@ -7,6 +7,21 @@
/>
<view
+ view=".views.exception_view"
+ for="RuntimeError"
+ />
+
+ <view
+ view=".views.protected_view"
+ name="protected.html"
+ />
+
+ <view
+ view=".views.erroneous_view"
+ name="error.html"
+ />
+
+ <view
view=".views.fixture_view"
name="dummyskin.html"
request_type=".views.IDummy"
diff --git a/repoze/bfg/tests/fixtureapp/views.py b/repoze/bfg/tests/fixtureapp/views.py
index d9bc0bb6e..862046d43 100644
--- a/repoze/bfg/tests/fixtureapp/views.py
+++ b/repoze/bfg/tests/fixtureapp/views.py
@@ -1,10 +1,22 @@
from zope.interface import Interface
from webob import Response
+from repoze.bfg.exceptions import Forbidden
def fixture_view(context, request):
""" """
return Response('fixture')
+def erroneous_view(context, request):
+ """ """
+ raise RuntimeError()
+
+def exception_view(context, request):
+ """ """
+ return Response('supressed')
+
+def protected_view(context, request):
+ """ """
+ raise Forbidden()
+
class IDummy(Interface):
pass
-
diff --git a/repoze/bfg/tests/hybridapp/configure.zcml b/repoze/bfg/tests/hybridapp/configure.zcml
index 56c6ea8db..a94409e26 100644
--- a/repoze/bfg/tests/hybridapp/configure.zcml
+++ b/repoze/bfg/tests/hybridapp/configure.zcml
@@ -58,4 +58,60 @@
use_global_views="True"
/>
+ <route
+ path="error"
+ name="route7"
+ />
+
+ <view
+ route_name="route7"
+ view=".views.erroneous_view"
+ />
+
+ <route
+ path="error2"
+ name="route8"
+ />
+
+ <view
+ route_name="route8"
+ view=".views.erroneous_view"
+ />
+
+ <!-- we want this view to "win" for route7 as exception view -->
+ <view
+ view=".views.exception_view"
+ for="RuntimeError"
+ />
+
+ <!-- we want this view to "win" for route8 as exception view-->
+ <view
+ route_name="route8"
+ view=".views.exception2_view"
+ for="RuntimeError"
+ />
+
+ <route
+ path="error_sub"
+ name="route9"
+ />
+
+ <view
+ route_name="route9"
+ view=".views.erroneous_sub_view"
+ />
+
+ <!-- we want this view to "win" for route9 as exception view... -->
+ <view
+ route_name="route9"
+ view=".views.exception2_view"
+ for=".views.SuperException"
+ />
+
+ <!-- ...even if we have more context-specialized view for raised exception -->
+ <view
+ view=".views.exception_view"
+ for=".views.SubException"
+ />
+
</configure>
diff --git a/repoze/bfg/tests/hybridapp/views.py b/repoze/bfg/tests/hybridapp/views.py
index 7f60ddbfe..135ef8290 100644
--- a/repoze/bfg/tests/hybridapp/views.py
+++ b/repoze/bfg/tests/hybridapp/views.py
@@ -15,3 +15,25 @@ def global2_view(request):
def route2_view(request):
""" """
return Response('route2')
+
+def exception_view(request):
+ """ """
+ return Response('supressed')
+
+def exception2_view(request):
+ """ """
+ return Response('supressed2')
+
+def erroneous_view(request):
+ """ """
+ raise RuntimeError()
+
+def erroneous_sub_view(request):
+ """ """
+ raise SubException()
+
+class SuperException(Exception):
+ """ """
+
+class SubException(SuperException):
+ """ """
diff --git a/repoze/bfg/tests/test_configuration.py b/repoze/bfg/tests/test_configuration.py
index 90413f1d6..99d564b91 100644
--- a/repoze/bfg/tests/test_configuration.py
+++ b/repoze/bfg/tests/test_configuration.py
@@ -21,16 +21,22 @@ class ConfiguratorTests(unittest.TestCase):
return Renderer
def _getViewCallable(self, config, ctx_iface=None, request_iface=None,
- name=''):
+ name='', exception_view=False):
from zope.interface import Interface
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ if exception_view:
+ classifier = IExceptionViewClassifier
+ else:
+ classifier = IViewClassifier
if ctx_iface is None:
ctx_iface = Interface
if request_iface is None:
request_iface = IRequest
return config.registry.adapters.lookup(
- (request_iface, ctx_iface), IView, name=name,
+ (classifier, request_iface, ctx_iface), IView, name=name,
default=None)
def _getRouteRequestIface(self, config, name):
@@ -182,6 +188,8 @@ class ConfiguratorTests(unittest.TestCase):
pass
reg = DummyRegistry()
config = self._makeOne(reg)
+ config.set_notfound_view = lambda *arg, **kw: None
+ config.set_forbidden_view = lambda *arg, **kw: None
config.setup_registry()
self.assertEqual(reg.has_listeners, True)
self.assertEqual(reg.notify(1), None)
@@ -558,54 +566,260 @@ class ConfiguratorTests(unittest.TestCase):
from zope.interface import Interface
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import ISecuredView
+ from repoze.bfg.interfaces import IViewClassifier
view = lambda *arg: 'OK'
view.__call_permissive__ = view
config = self._makeOne()
config.add_view(view=view)
wrapper = config.registry.adapters.lookup(
- (IRequest, Interface), ISecuredView, name='', default=None)
+ (IViewClassifier, IRequest, Interface),
+ ISecuredView, name='', default=None)
self.assertEqual(wrapper, view)
+ def test_add_view_exception_register_secured_view(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ view = lambda *arg: 'OK'
+ view.__call_permissive__ = view
+ config = self._makeOne()
+ config.add_view(view=view, context=RuntimeError)
+ wrapper = config.registry.adapters.lookup(
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='', default=None)
+ self.assertEqual(wrapper, view)
+
+ def test_add_view_same_phash_overrides_existing_single_view(self):
+ from repoze.bfg.compat import md5
+ from zope.interface import Interface
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IMultiView
+ phash = md5()
+ phash.update('xhr:True')
+ view = lambda *arg: 'NOT OK'
+ view.__phash__ = phash.hexdigest()
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view, (IViewClassifier, IRequest, Interface), IView, name='')
+ def newview(context, request):
+ return 'OK'
+ config.add_view(view=newview, xhr=True)
+ wrapper = self._getViewCallable(config)
+ self.failIf(IMultiView.providedBy(wrapper))
+ request = DummyRequest()
+ request.is_xhr = True
+ self.assertEqual(wrapper(None, request), 'OK')
+
+ def test_add_view_exc_same_phash_overrides_existing_single_view(self):
+ from repoze.bfg.compat import md5
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IMultiView
+ phash = md5()
+ phash.update('xhr:True')
+ view = lambda *arg: 'NOT OK'
+ view.__phash__ = phash.hexdigest()
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ def newview(context, request):
+ return 'OK'
+ config.add_view(view=newview, xhr=True,
+ context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failIf(IMultiView.providedBy(wrapper))
+ request = DummyRequest()
+ request.is_xhr = True
+ self.assertEqual(wrapper(None, request), 'OK')
+
+ def test_add_view_default_phash_overrides_no_phash(self):
+ from zope.interface import Interface
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IMultiView
+ view = lambda *arg: 'NOT OK'
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view, (IViewClassifier, IRequest, Interface), IView, name='')
+ def newview(context, request):
+ return 'OK'
+ config.add_view(view=newview)
+ wrapper = self._getViewCallable(config)
+ self.failIf(IMultiView.providedBy(wrapper))
+ request = DummyRequest()
+ request.is_xhr = True
+ self.assertEqual(wrapper(None, request), 'OK')
+
+ def test_add_view_exc_default_phash_overrides_no_phash(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IMultiView
+ view = lambda *arg: 'NOT OK'
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ def newview(context, request):
+ return 'OK'
+ config.add_view(view=newview, context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failIf(IMultiView.providedBy(wrapper))
+ request = DummyRequest()
+ request.is_xhr = True
+ self.assertEqual(wrapper(None, request), 'OK')
+
+ def test_add_view_default_phash_overrides_default_phash(self):
+ from repoze.bfg.configuration import DEFAULT_PHASH
+ from zope.interface import Interface
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IMultiView
+ view = lambda *arg: 'NOT OK'
+ view.__phash__ = DEFAULT_PHASH
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view, (IViewClassifier, IRequest, Interface), IView, name='')
+ def newview(context, request):
+ return 'OK'
+ config.add_view(view=newview)
+ wrapper = self._getViewCallable(config)
+ self.failIf(IMultiView.providedBy(wrapper))
+ request = DummyRequest()
+ request.is_xhr = True
+ self.assertEqual(wrapper(None, request), 'OK')
+
+ def test_add_view_exc_default_phash_overrides_default_phash(self):
+ from repoze.bfg.configuration import DEFAULT_PHASH
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IMultiView
+ view = lambda *arg: 'NOT OK'
+ view.__phash__ = DEFAULT_PHASH
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ def newview(context, request):
+ return 'OK'
+ config.add_view(view=newview, context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failIf(IMultiView.providedBy(wrapper))
+ request = DummyRequest()
+ request.is_xhr = True
+ self.assertEqual(wrapper(None, request), 'OK')
+
def test_add_view_multiview_replaces_existing_view(self):
from zope.interface import Interface
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IMultiView
view = lambda *arg: 'OK'
+ view.__phash__ = 'abc'
config = self._makeOne()
config.registry.registerAdapter(
- view, (IRequest, Interface), IView, name='')
+ view, (IViewClassifier, IRequest, Interface), IView, name='')
config.add_view(view=view)
wrapper = self._getViewCallable(config)
self.failUnless(IMultiView.providedBy(wrapper))
self.assertEqual(wrapper(None, None), 'OK')
+ def test_add_view_exc_multiview_replaces_existing_view(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IMultiView
+ view = lambda *arg: 'OK'
+ view.__phash__ = 'abc'
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ config.add_view(view=view, context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failUnless(IMultiView.providedBy(wrapper))
+ self.assertEqual(wrapper(None, None), 'OK')
+
def test_add_view_multiview_replaces_existing_securedview(self):
from zope.interface import Interface
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import ISecuredView
from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
view = lambda *arg: 'OK'
+ view.__phash__ = 'abc'
config = self._makeOne()
config.registry.registerAdapter(
- view, (IRequest, Interface), ISecuredView, name='')
+ view, (IViewClassifier, IRequest, Interface),
+ ISecuredView, name='')
config.add_view(view=view)
wrapper = self._getViewCallable(config)
self.failUnless(IMultiView.providedBy(wrapper))
self.assertEqual(wrapper(None, None), 'OK')
+ def test_add_view_exc_multiview_replaces_existing_securedview(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import ISecuredView
+ from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ view = lambda *arg: 'OK'
+ view.__phash__ = 'abc'
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IViewClassifier, IRequest, implementedBy(RuntimeError)),
+ ISecuredView, name='')
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ ISecuredView, name='')
+ config.add_view(view=view, context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failUnless(IMultiView.providedBy(wrapper))
+ self.assertEqual(wrapper(None, None), 'OK')
+
def test_add_view_with_accept_multiview_replaces_existing_view(self):
from zope.interface import Interface
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IView
from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
def view(context, request):
return 'OK'
def view2(context, request):
return 'OK2'
config = self._makeOne()
config.registry.registerAdapter(
- view, (IRequest, Interface), IView, name='')
+ view, (IViewClassifier, IRequest, Interface), IView, name='')
config.add_view(view=view2, accept='text/html')
wrapper = self._getViewCallable(config)
self.failUnless(IMultiView.providedBy(wrapper))
@@ -616,19 +830,52 @@ class ConfiguratorTests(unittest.TestCase):
request.accept = DummyAccept('text/html', 'text/html')
self.assertEqual(wrapper(None, request), 'OK2')
+ def test_add_view_exc_with_accept_multiview_replaces_existing_view(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ def view(context, request):
+ return 'OK'
+ def view2(context, request):
+ return 'OK2'
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ config.add_view(view=view2, accept='text/html', context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failUnless(IMultiView.providedBy(wrapper))
+ self.assertEqual(len(wrapper.views), 1)
+ self.assertEqual(len(wrapper.media_views), 1)
+ self.assertEqual(wrapper(None, None), 'OK')
+ request = DummyRequest()
+ request.accept = DummyAccept('text/html', 'text/html')
+ self.assertEqual(wrapper(None, request), 'OK2')
+
def test_add_view_multiview_replaces_existing_view_with___accept__(self):
from zope.interface import Interface
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IView
from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
def view(context, request):
return 'OK'
def view2(context, request):
return 'OK2'
view.__accept__ = 'text/html'
+ view.__phash__ = 'abc'
config = self._makeOne()
config.registry.registerAdapter(
- view, (IRequest, Interface), IView, name='')
+ view, (IViewClassifier, IRequest, Interface), IView, name='')
config.add_view(view=view2)
wrapper = self._getViewCallable(config)
self.failUnless(IMultiView.providedBy(wrapper))
@@ -639,19 +886,78 @@ class ConfiguratorTests(unittest.TestCase):
request.accept = DummyAccept('text/html')
self.assertEqual(wrapper(None, request), 'OK')
+ def test_add_view_exc_mulview_replaces_existing_view_with___accept__(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ def view(context, request):
+ return 'OK'
+ def view2(context, request):
+ return 'OK2'
+ view.__accept__ = 'text/html'
+ view.__phash__ = 'abc'
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IView, name='')
+ config.add_view(view=view2, context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failUnless(IMultiView.providedBy(wrapper))
+ self.assertEqual(len(wrapper.views), 1)
+ self.assertEqual(len(wrapper.media_views), 1)
+ self.assertEqual(wrapper(None, None), 'OK2')
+ request = DummyRequest()
+ request.accept = DummyAccept('text/html')
+ self.assertEqual(wrapper(None, request), 'OK')
+
def test_add_view_multiview_replaces_multiview(self):
from zope.interface import Interface
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
view = DummyMultiView()
config = self._makeOne()
- config.registry.registerAdapter(view, (IRequest, Interface),
- IMultiView, name='')
+ config.registry.registerAdapter(
+ view, (IViewClassifier, IRequest, Interface),
+ IMultiView, name='')
view2 = lambda *arg: 'OK2'
config.add_view(view=view2)
wrapper = self._getViewCallable(config)
self.failUnless(IMultiView.providedBy(wrapper))
- self.assertEqual(wrapper.views, [(view2, None)])
+ self.assertEqual([x[:2] for x in wrapper.views], [(view2, None)])
+ self.assertEqual(wrapper(None, None), 'OK1')
+
+ def test_add_view_exc_multiview_replaces_multiview(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ view = DummyMultiView()
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view,
+ (IViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IMultiView, name='')
+ config.registry.registerAdapter(
+ view,
+ (IExceptionViewClassifier, IRequest, implementedBy(RuntimeError)),
+ IMultiView, name='')
+ view2 = lambda *arg: 'OK2'
+ config.add_view(view=view2, context=RuntimeError)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError), exception_view=True)
+ self.failUnless(IMultiView.providedBy(wrapper))
+ self.assertEqual([x[:2] for x in wrapper.views], [(view2, None)])
self.assertEqual(wrapper(None, None), 'OK1')
def test_add_view_multiview_context_superclass_then_subclass(self):
@@ -659,6 +965,7 @@ class ConfiguratorTests(unittest.TestCase):
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IView
from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
class ISuper(Interface):
pass
class ISub(ISuper):
@@ -667,7 +974,7 @@ class ConfiguratorTests(unittest.TestCase):
view2 = lambda *arg: 'OK2'
config = self._makeOne()
config.registry.registerAdapter(
- view, (IRequest, ISuper), IView, name='')
+ view, (IViewClassifier, IRequest, ISuper), IView, name='')
config.add_view(view=view2, for_=ISub)
wrapper = self._getViewCallable(config, ISuper, IRequest)
self.failIf(IMultiView.providedBy(wrapper))
@@ -676,6 +983,40 @@ class ConfiguratorTests(unittest.TestCase):
self.failIf(IMultiView.providedBy(wrapper))
self.assertEqual(wrapper(None, None), 'OK2')
+ def test_add_view_multiview_exception_superclass_then_subclass(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IMultiView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ class Super(Exception):
+ pass
+ class Sub(Super):
+ pass
+ view = lambda *arg: 'OK'
+ view2 = lambda *arg: 'OK2'
+ config = self._makeOne()
+ config.registry.registerAdapter(
+ view, (IViewClassifier, IRequest, Super), IView, name='')
+ config.registry.registerAdapter(
+ view, (IExceptionViewClassifier, IRequest, Super), IView, name='')
+ config.add_view(view=view2, for_=Sub)
+ wrapper = self._getViewCallable(
+ config, implementedBy(Super), IRequest)
+ wrapper_exc_view = self._getViewCallable(
+ config, implementedBy(Super), IRequest, exception_view=True)
+ self.assertEqual(wrapper_exc_view, wrapper)
+ self.failIf(IMultiView.providedBy(wrapper_exc_view))
+ self.assertEqual(wrapper_exc_view(None, None), 'OK')
+ wrapper = self._getViewCallable(
+ config, implementedBy(Sub), IRequest)
+ wrapper_exc_view = self._getViewCallable(
+ config, implementedBy(Sub), IRequest, exception_view=True)
+ self.assertEqual(wrapper_exc_view, wrapper)
+ self.failIf(IMultiView.providedBy(wrapper_exc_view))
+ self.assertEqual(wrapper_exc_view(None, None), 'OK2')
+
def test_add_view_multiview_call_ordering(self):
from zope.interface import directlyProvides
def view1(context, request): return 'view1'
@@ -821,6 +1162,37 @@ class ConfiguratorTests(unittest.TestCase):
self.failIfEqual(wrapper, None)
self.assertEqual(wrapper(None, None), 'OK')
+ def test_add_view_with_route_name_exception(self):
+ from zope.interface import implementedBy
+ from zope.component import ComponentLookupError
+ view = lambda *arg: 'OK'
+ config = self._makeOne()
+ config.add_view(view=view, route_name='foo', context=RuntimeError)
+ self.assertEqual(len(config.registry.deferred_route_views), 1)
+ infos = config.registry.deferred_route_views['foo']
+ self.assertEqual(len(infos), 1)
+ info = infos[0]
+ self.assertEqual(info['route_name'], 'foo')
+ self.assertEqual(info['view'], view)
+ self.assertRaises(ComponentLookupError,
+ self._getRouteRequestIface, config, 'foo')
+ wrapper_exc_view = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError),
+ exception_view=True)
+ self.assertEqual(wrapper_exc_view, None)
+ config.add_route('foo', '/a/b')
+ request_iface = self._getRouteRequestIface(config, 'foo')
+ self.failIfEqual(request_iface, None)
+ wrapper_exc_view = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError),
+ request_iface=request_iface, exception_view=True)
+ self.failIfEqual(wrapper_exc_view, None)
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError),
+ request_iface=request_iface)
+ self.assertEqual(wrapper_exc_view, wrapper)
+ self.assertEqual(wrapper_exc_view(None, None), 'OK')
+
def test_add_view_with_request_method_true(self):
view = lambda *arg: 'OK'
config = self._makeOne()
@@ -1048,6 +1420,16 @@ class ConfiguratorTests(unittest.TestCase):
request.is_xhr = True
self.assertEqual(wrapper(None, request), 'OK')
+ def test_add_view_same_predicates(self):
+ view2 = lambda *arg: 'second'
+ view1 = lambda *arg: 'first'
+ config = self._makeOne()
+ config.add_view(view=view1)
+ config.add_view(view=view2)
+ view = self._getViewCallable(config)
+ request = self._makeRequest(config)
+ self.assertEqual(view(None, request), 'second')
+
def _assertRoute(self, config, name, path, num_predicates=0):
from repoze.bfg.interfaces import IRoutesMapper
mapper = config.registry.getUtility(IRoutesMapper)
@@ -1164,6 +1546,22 @@ class ConfiguratorTests(unittest.TestCase):
wrapper = self._getViewCallable(config, IOther, request_type)
self.assertEqual(wrapper, None)
+ def test_add_route_with_view_exception(self):
+ from zope.interface import implementedBy
+ config = self._makeOne()
+ view = lambda *arg: 'OK'
+ config.add_route('name', 'path', view=view, view_context=RuntimeError)
+ request_type = self._getRouteRequestIface(config, 'name')
+ wrapper = self._getViewCallable(
+ config, ctx_iface=implementedBy(RuntimeError),
+ request_iface=request_type, exception_view=True)
+ self.assertEqual(wrapper(None, None), 'OK')
+ self._assertRoute(config, 'name', 'path')
+ wrapper = self._getViewCallable(
+ config, ctx_iface=IOther,
+ request_iface=request_type, exception_view=True)
+ self.assertEqual(wrapper, None)
+
def test_add_route_with_view_for(self):
config = self._makeOne()
view = lambda *arg: 'OK'
@@ -1280,6 +1678,7 @@ class ConfiguratorTests(unittest.TestCase):
from zope.interface import implementedBy
from repoze.bfg.static import StaticRootFactory
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
config = self._makeOne()
config.add_static_view('static', 'fixtures/static')
request_type = self._getRouteRequestIface(config, 'static')
@@ -1287,7 +1686,7 @@ class ConfiguratorTests(unittest.TestCase):
self.assertEqual(route.factory.__class__, StaticRootFactory)
iface = implementedBy(StaticRootFactory)
wrapped = config.registry.adapters.lookup(
- (request_type, iface), IView, name='')
+ (IViewClassifier, request_type, iface), IView, name='')
request = self._makeRequest(config)
self.assertEqual(wrapped(None, request).__class__, PackageURLParser)
@@ -1296,6 +1695,7 @@ class ConfiguratorTests(unittest.TestCase):
from zope.interface import implementedBy
from repoze.bfg.static import StaticRootFactory
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
config = self._makeOne()
config.add_static_view('static', 'repoze.bfg.tests:fixtures/static')
request_type = self._getRouteRequestIface(config, 'static')
@@ -1303,7 +1703,7 @@ class ConfiguratorTests(unittest.TestCase):
self.assertEqual(route.factory.__class__, StaticRootFactory)
iface = implementedBy(StaticRootFactory)
wrapped = config.registry.adapters.lookup(
- (request_type, iface), IView, name='')
+ (IViewClassifier, request_type, iface), IView, name='')
request = self._makeRequest(config)
self.assertEqual(wrapped(None, request).__class__, PackageURLParser)
@@ -1313,6 +1713,7 @@ class ConfiguratorTests(unittest.TestCase):
from zope.interface import implementedBy
from repoze.bfg.static import StaticRootFactory
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
config = self._makeOne()
here = os.path.dirname(__file__)
static_path = os.path.join(here, 'fixtures', 'static')
@@ -1322,76 +1723,63 @@ class ConfiguratorTests(unittest.TestCase):
self.assertEqual(route.factory.__class__, StaticRootFactory)
iface = implementedBy(StaticRootFactory)
wrapped = config.registry.adapters.lookup(
- (request_type, iface), IView, name='')
+ (IViewClassifier, request_type, iface), IView, name='')
request = self._makeRequest(config)
self.assertEqual(wrapped(None, request).__class__, StaticURLParser)
- def test__system_view_no_view_no_renderer(self):
- from repoze.bfg.exceptions import ConfigurationError
- config = self._makeOne()
- self.assertRaises(ConfigurationError, config._system_view, IDummy)
-
- def test__system_view_no_view_with_renderer(self):
+ def test_set_notfound_view(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.exceptions import NotFound
config = self._makeOne()
- self._registerRenderer(config, name='.pt')
- config._system_view(IDummy,
- renderer='repoze.bfg.tests:fixtures/minimal.pt')
+ view = lambda *arg: arg
+ config.set_notfound_view(view)
request = self._makeRequest(config)
- view = config.registry.getUtility(IDummy)
+ view = self._getViewCallable(config, ctx_iface=implementedBy(NotFound),
+ request_iface=IRequest)
result = view(None, request)
- self.assertEqual(result.body, 'Hello!')
+ self.assertEqual(result, (None, request))
- def test__system_view_with_attr(self):
+ def test_set_notfound_view_request_has_context(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.exceptions import NotFound
config = self._makeOne()
- class view(object):
- def __init__(self, context, request):
- pass
- def index(self):
- return 'OK'
- config._system_view(IDummy, view=view, attr='index')
- view = config.registry.getUtility(IDummy)
+ view = lambda *arg: arg
+ config.set_notfound_view(view)
request = self._makeRequest(config)
+ request.context = 'abc'
+ view = self._getViewCallable(config, ctx_iface=implementedBy(NotFound),
+ request_iface=IRequest)
result = view(None, request)
- self.assertEqual(result, 'OK')
+ self.assertEqual(result, ('abc', request))
- def test__system_view_with_wrapper(self):
- from zope.interface import Interface
- from zope.interface import directlyProvides
+ def test_set_forbidden_view(self):
+ from zope.interface import implementedBy
from repoze.bfg.interfaces import IRequest
- from repoze.bfg.interfaces import IView
- config = self._makeOne()
- view = lambda *arg: DummyResponse()
- wrapper = lambda *arg: 'OK2'
- config.registry.registerAdapter(wrapper, (Interface, Interface),
- IView, name='wrapper')
- config._system_view(IDummy, view=view, wrapper='wrapper')
- view = config.registry.getUtility(IDummy)
- request = self._makeRequest(config)
- directlyProvides(request, IRequest)
- request.registry = config.registry
- context = DummyContext()
- result = view(context, request)
- self.assertEqual(result, 'OK2')
-
- def test_set_notfound_view(self):
- from repoze.bfg.interfaces import INotFoundView
+ from repoze.bfg.exceptions import Forbidden
config = self._makeOne()
view = lambda *arg: 'OK'
- config.set_notfound_view(view)
+ config.set_forbidden_view(view)
request = self._makeRequest(config)
- view = config.registry.getUtility(INotFoundView)
+ view = self._getViewCallable(config, ctx_iface=implementedBy(Forbidden),
+ request_iface=IRequest)
result = view(None, request)
self.assertEqual(result, 'OK')
- def test_set_forbidden_view(self):
- from repoze.bfg.interfaces import IForbiddenView
+ def test_set_forbidden_view_request_has_context(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.exceptions import Forbidden
config = self._makeOne()
- view = lambda *arg: 'OK'
+ view = lambda *arg: arg
config.set_forbidden_view(view)
request = self._makeRequest(config)
- view = config.registry.getUtility(IForbiddenView)
+ request.context = 'abc'
+ view = self._getViewCallable(config, ctx_iface=implementedBy(Forbidden),
+ request_iface=IRequest)
result = view(None, request)
- self.assertEqual(result, 'OK')
+ self.assertEqual(result, ('abc', request))
def test__set_authentication_policy(self):
from repoze.bfg.interfaces import IAuthenticationPolicy
@@ -1680,6 +2068,7 @@ class ConfiguratorTests(unittest.TestCase):
def test__derive_view_with_wrapper_viewname(self):
from webob import Response
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
inner_response = Response('OK')
def inner_view(context, request):
return inner_response
@@ -1690,7 +2079,7 @@ class ConfiguratorTests(unittest.TestCase):
return Response('outer ' + request.wrapped_body)
config = self._makeOne()
config.registry.registerAdapter(
- outer_view, (None, None), IView, 'owrap')
+ outer_view, (IViewClassifier, None, None), IView, 'owrap')
result = config._derive_view(inner_view, viewname='inner',
wrapper_viewname='owrap')
self.failIf(result is inner_view)
@@ -2396,6 +2785,183 @@ class Test_decorate_view(unittest.TestCase):
self.failUnless(view1.__predicated__.im_func is
view2.__predicated__.im_func)
+class Test__make_predicates(unittest.TestCase):
+ def _callFUT(self, **kw):
+ from repoze.bfg.configuration import _make_predicates
+ return _make_predicates(**kw)
+
+ def test_ordering_xhr_and_request_method_trump_only_containment(self):
+ order1, _, _ = self._callFUT(xhr=True, request_method='GET')
+ order2, _, _ = self._callFUT(containment=True)
+ self.failUnless(order1 < order2)
+
+ def test_ordering_number_of_predicates(self):
+ order1, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ request_param='param',
+ header='header',
+ accept='accept',
+ containment='containment',
+ request_type='request_type',
+ custom=('a',)
+ )
+ order2, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ request_param='param',
+ header='header',
+ accept='accept',
+ containment='containment',
+ request_type='request_type',
+ custom=('a',)
+ )
+ order3, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ request_param='param',
+ header='header',
+ accept='accept',
+ containment='containment',
+ request_type='request_type',
+ )
+ order4, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ request_param='param',
+ header='header',
+ accept='accept',
+ containment='containment',
+ )
+ order5, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ request_param='param',
+ header='header',
+ accept='accept',
+ )
+ order6, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ request_param='param',
+ header='header',
+ )
+ order7, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ request_param='param',
+ )
+ order8, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ )
+ order9, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ )
+ order10, _, _ = self._callFUT(
+ xhr='xhr',
+ )
+ order11, _, _ = self._callFUT(
+ )
+ self.assertEqual(order1, order2)
+ self.failUnless(order3 > order2)
+ self.failUnless(order4 > order3)
+ self.failUnless(order5 > order4)
+ self.failUnless(order6 > order5)
+ self.failUnless(order7 > order6)
+ self.failUnless(order8 > order7)
+ self.failUnless(order9 > order8)
+ self.failUnless(order10 > order9)
+ self.failUnless(order11 > order10)
+
+ def test_ordering_importance_of_predicates(self):
+ order1, _, _ = self._callFUT(
+ xhr='xhr',
+ )
+ order2, _, _ = self._callFUT(
+ request_method='request_method',
+ )
+ order3, _, _ = self._callFUT(
+ path_info='path_info',
+ )
+ order4, _, _ = self._callFUT(
+ request_param='param',
+ )
+ order5, _, _ = self._callFUT(
+ header='header',
+ )
+ order6, _, _ = self._callFUT(
+ accept='accept',
+ )
+ order7, _, _ = self._callFUT(
+ containment='containment',
+ )
+ order8, _, _ = self._callFUT(
+ request_type='request_type',
+ )
+ order9, _, _ = self._callFUT(
+ custom=('a',),
+ )
+ self.failUnless(order1 > order2)
+ self.failUnless(order2 > order3)
+ self.failUnless(order3 > order4)
+ self.failUnless(order4 > order5)
+ self.failUnless(order5 > order6)
+ self.failUnless(order6 > order7)
+ self.failUnless(order7 > order8)
+ self.failUnless(order8 > order9)
+
+ def test_ordering_importance_and_number(self):
+ order1, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ )
+ order2, _, _ = self._callFUT(
+ custom=('a',),
+ )
+ self.failUnless(order1 < order2)
+
+ order1, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ )
+ order2, _, _ = self._callFUT(
+ request_method='request_method',
+ custom=('a',),
+ )
+ self.failUnless(order1 > order2)
+
+ order1, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ )
+ order2, _, _ = self._callFUT(
+ request_method='request_method',
+ custom=('a',),
+ )
+ self.failUnless(order1 < order2)
+
+ order1, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ path_info='path_info',
+ )
+ order2, _, _ = self._callFUT(
+ xhr='xhr',
+ request_method='request_method',
+ custom=('a',),
+ )
+ self.failUnless(order1 > order2)
class TestMultiView(unittest.TestCase):
def _getTargetClass(self):
@@ -2418,20 +2984,33 @@ class TestMultiView(unittest.TestCase):
def test_add(self):
mv = self._makeOne()
mv.add('view', 100)
- self.assertEqual(mv.views, [(100, 'view')])
+ self.assertEqual(mv.views, [(100, 'view', None)])
mv.add('view2', 99)
- self.assertEqual(mv.views, [(99, 'view2'), (100, 'view')])
+ self.assertEqual(mv.views, [(99, 'view2', None), (100, 'view', None)])
mv.add('view3', 100, 'text/html')
- self.assertEqual(mv.media_views['text/html'], [(100, 'view3')])
+ self.assertEqual(mv.media_views['text/html'], [(100, 'view3', None)])
mv.add('view4', 99, 'text/html')
self.assertEqual(mv.media_views['text/html'],
- [(99, 'view4'), (100, 'view3')])
+ [(99, 'view4', None), (100, 'view3', None)])
mv.add('view5', 100, 'text/xml')
- self.assertEqual(mv.media_views['text/xml'], [(100, 'view5')])
+ self.assertEqual(mv.media_views['text/xml'], [(100, 'view5', None)])
self.assertEqual(set(mv.accepts), set(['text/xml', 'text/html']))
- self.assertEqual(mv.views, [(99, 'view2'), (100, 'view')])
+ self.assertEqual(mv.views, [(99, 'view2', None), (100, 'view', None)])
mv.add('view6', 98, 'text/*')
- self.assertEqual(mv.views, [(98, 'view6'),(99, 'view2'), (100, 'view')])
+ self.assertEqual(mv.views, [(98, 'view6', None),
+ (99, 'view2', None),
+ (100, 'view', None)])
+
+ def test_add_with_phash(self):
+ mv = self._makeOne()
+ mv.add('view', 100, phash='abc')
+ self.assertEqual(mv.views, [(100, 'view', 'abc')])
+ mv.add('view', 100, phash='abc')
+ self.assertEqual(mv.views, [(100, 'view', 'abc')])
+ mv.add('view', 100, phash='def')
+ self.assertEqual(mv.views, [(100, 'view', 'abc'), (100, 'view', 'def')])
+ mv.add('view', 100, phash='abc')
+ self.assertEqual(mv.views, [(100, 'view', 'abc'), (100, 'view', 'def')])
def test_get_views_request_has_no_accept(self):
request = DummyRequest()
@@ -2478,7 +3057,7 @@ class TestMultiView(unittest.TestCase):
def view(context, request):
""" """
view.__predicated__ = lambda *arg: False
- mv.views = [(100, view)]
+ mv.views = [(100, view, None)]
context = DummyContext()
request = DummyRequest()
self.assertRaises(NotFound, mv.match, context, request)
@@ -2488,7 +3067,7 @@ class TestMultiView(unittest.TestCase):
def view(context, request):
""" """
view.__predicated__ = lambda *arg: True
- mv.views = [(100, view)]
+ mv.views = [(100, view, None)]
context = DummyContext()
request = DummyRequest()
result = mv.match(context, request)
@@ -2505,7 +3084,7 @@ class TestMultiView(unittest.TestCase):
mv = self._makeOne()
def view(context, request):
""" """
- mv.views = [(100, view)]
+ mv.views = [(100, view, None)]
self.assertEqual(mv.__permitted__(None, None), True)
def test_permitted(self):
@@ -2515,7 +3094,7 @@ class TestMultiView(unittest.TestCase):
def permitted(context, request):
return False
view.__permitted__ = permitted
- mv.views = [(100, view)]
+ mv.views = [(100, view, None)]
context = DummyContext()
request = DummyRequest()
result = mv.__permitted__(context, request)
@@ -2539,7 +3118,7 @@ class TestMultiView(unittest.TestCase):
raise NotFound
def view2(context, request):
return expected_response
- mv.views = [(100, view1), (99, view2)]
+ mv.views = [(100, view1, None), (99, view2, None)]
response = mv(context, request)
self.assertEqual(response, expected_response)
@@ -2551,7 +3130,7 @@ class TestMultiView(unittest.TestCase):
expected_response = DummyResponse()
def view(context, request):
return expected_response
- mv.views = [(100, view)]
+ mv.views = [(100, view, None)]
response = mv(context, request)
self.assertEqual(response, expected_response)
@@ -2573,7 +3152,7 @@ class TestMultiView(unittest.TestCase):
def permissive(context, request):
return expected_response
view.__call_permissive__ = permissive
- mv.views = [(100, view)]
+ mv.views = [(100, view, None)]
response = mv.__call_permissive__(context, request)
self.assertEqual(response, expected_response)
@@ -2585,7 +3164,7 @@ class TestMultiView(unittest.TestCase):
expected_response = DummyResponse()
def view(context, request):
return expected_response
- mv.views = [(100, view)]
+ mv.views = [(100, view, None)]
response = mv.__call_permissive__(context, request)
self.assertEqual(response, expected_response)
@@ -2598,7 +3177,7 @@ class TestMultiView(unittest.TestCase):
def view(context, request):
return expected_response
mv.views = [(100, None)]
- mv.media_views['text/xml'] = [(100, view)]
+ mv.media_views['text/xml'] = [(100, view, None)]
mv.accepts = ['text/xml']
response = mv(context, request)
self.assertEqual(response, expected_response)
@@ -2611,8 +3190,8 @@ class TestMultiView(unittest.TestCase):
expected_response = DummyResponse()
def view(context, request):
return expected_response
- mv.views = [(100, view)]
- mv.media_views['text/xml'] = [(100, None)]
+ mv.views = [(100, view, None)]
+ mv.media_views['text/xml'] = [(100, None, None)]
mv.accepts = ['text/xml']
response = mv(context, request)
self.assertEqual(response, expected_response)
@@ -2932,8 +3511,8 @@ class DummyMultiView:
def __init__(self):
self.views = []
self.name = 'name'
- def add(self, view, score, accept=None):
- self.views.append((view, accept))
+ def add(self, view, order, accept=None, phash=None):
+ self.views.append((view, accept, phash))
def __call__(self, context, request):
return 'OK1'
def __permitted__(self, context, request):
diff --git a/repoze/bfg/tests/test_integration.py b/repoze/bfg/tests/test_integration.py
index 41144f7c3..c54509378 100644
--- a/repoze/bfg/tests/test_integration.py
+++ b/repoze/bfg/tests/test_integration.py
@@ -31,12 +31,14 @@ class WGSIAppPlusBFGViewTests(unittest.TestCase):
def test_scanned(self):
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.configuration import Configurator
from repoze.bfg.tests import test_integration
config = Configurator()
config.scan(test_integration)
reg = config.registry
- view = reg.adapters.lookup((IRequest, INothing), IView, name='')
+ view = reg.adapters.lookup(
+ (IViewClassifier, IRequest, INothing), IView, name='')
self.assertEqual(view, wsgiapptest)
here = os.path.dirname(__file__)
@@ -63,11 +65,12 @@ class TestStaticApp(unittest.TestCase):
open(os.path.join(here, 'fixtures/minimal.pt'), 'r').read())
class TwillBase(unittest.TestCase):
+ root_factory = None
def setUp(self):
import sys
import twill
from repoze.bfg.configuration import Configurator
- config = Configurator()
+ config = Configurator(root_factory=self.root_factory)
config.load_zcml(self.config)
twill.add_wsgi_intercept('localhost', 6543, config.make_wsgi_app)
if sys.platform is 'win32': # pragma: no cover
@@ -98,6 +101,11 @@ class TestFixtureApp(TwillBase):
self.assertEqual(browser.get_html(), 'fixture')
browser.go('http://localhost:6543/dummyskin.html')
self.assertEqual(browser.get_code(), 404)
+ browser.go('http://localhost:6543/error.html')
+ self.assertEqual(browser.get_code(), 200)
+ self.assertEqual(browser.get_html(), 'supressed')
+ browser.go('http://localhost:6543/protected.html')
+ self.assertEqual(browser.get_code(), 401)
class TestCCBug(TwillBase):
# "unordered" as reported in IRC by author of
@@ -140,6 +148,15 @@ class TestHybridApp(TwillBase):
browser.go('http://localhost:6543/pqr/global2')
self.assertEqual(browser.get_code(), 200)
self.assertEqual(browser.get_html(), 'global2')
+ browser.go('http://localhost:6543/error')
+ self.assertEqual(browser.get_code(), 200)
+ self.assertEqual(browser.get_html(), 'supressed')
+ browser.go('http://localhost:6543/error2')
+ self.assertEqual(browser.get_code(), 200)
+ self.assertEqual(browser.get_html(), 'supressed2')
+ browser.go('http://localhost:6543/error_sub')
+ self.assertEqual(browser.get_code(), 200)
+ self.assertEqual(browser.get_html(), 'supressed2')
class TestRestBugApp(TwillBase):
# test bug reported by delijati 2010/2/3 (http://pastebin.com/d4cc15515)
@@ -168,6 +185,44 @@ class TestViewDecoratorApp(TwillBase):
self.assertEqual(browser.get_code(), 200)
self.failUnless('OK3' in browser.get_html())
+from repoze.bfg.tests.exceptionviewapp.models import AnException, NotAnException
+excroot = {'anexception':AnException(),
+ 'notanexception':NotAnException()}
+
+class TestExceptionViewsApp(TwillBase):
+ config = 'repoze.bfg.tests.exceptionviewapp:configure.zcml'
+ root_factory = lambda *arg: excroot
+ def test_it(self):
+ import twill.commands
+ browser = twill.commands.get_browser()
+ browser.go('http://localhost:6543/')
+ self.assertEqual(browser.get_code(), 200)
+ self.failUnless('maybe' in browser.get_html())
+
+ browser.go('http://localhost:6543/notanexception')
+ self.assertEqual(browser.get_code(), 200)
+ self.failUnless('no' in browser.get_html())
+
+ browser.go('http://localhost:6543/anexception')
+ self.assertEqual(browser.get_code(), 200)
+ self.failUnless('yes' in browser.get_html())
+
+ browser.go('http://localhost:6543/route_raise_exception')
+ self.assertEqual(browser.get_code(), 200)
+ self.failUnless('yes' in browser.get_html())
+
+ browser.go('http://localhost:6543/route_raise_exception2')
+ self.assertEqual(browser.get_code(), 200)
+ self.failUnless('yes' in browser.get_html())
+
+ browser.go('http://localhost:6543/route_raise_exception3')
+ self.assertEqual(browser.get_code(), 200)
+ self.failUnless('whoa' in browser.get_html())
+
+ browser.go('http://localhost:6543/route_raise_exception4')
+ self.assertEqual(browser.get_code(), 200)
+ self.failUnless('whoa' in browser.get_html())
+
class DummyContext(object):
pass
diff --git a/repoze/bfg/tests/test_request.py b/repoze/bfg/tests/test_request.py
index 06255721e..7b3d0ce7b 100644
--- a/repoze/bfg/tests/test_request.py
+++ b/repoze/bfg/tests/test_request.py
@@ -165,6 +165,8 @@ class Test_route_request_iface(unittest.TestCase):
def test_it(self):
iface = self._callFUT('routename')
self.assertEqual(iface.__name__, 'routename_IRequest')
+ self.assertTrue(hasattr(iface, 'combined'))
+ self.assertEqual(iface.combined.__name__, 'routename_combined_IRequest')
class Test_add_global_response_headers(unittest.TestCase):
def _callFUT(self, request, headerlist):
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index 0d7bee720..8702b9317 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -13,11 +13,10 @@ class TestRouter(unittest.TestCase):
def _registerRouteRequest(self, name):
from repoze.bfg.interfaces import IRouteRequest
- from zope.interface import Interface
- class IRequest(Interface):
- """ """
- self.registry.registerUtility(IRequest, IRouteRequest, name=name)
- return IRequest
+ from repoze.bfg.request import route_request_iface
+ iface = route_request_iface(name)
+ self.registry.registerUtility(iface, IRouteRequest, name=name)
+ return iface
def _connectRoute(self, path, name, factory=None):
from repoze.bfg.interfaces import IRoutesMapper
@@ -75,9 +74,10 @@ class TestRouter(unittest.TestCase):
self.registry.registerAdapter(DummyTraverserFactory, (None,),
ITraverser, name='')
- def _registerView(self, app, name, *for_):
+ def _registerView(self, app, name, classifier, req_iface, ctx_iface):
from repoze.bfg.interfaces import IView
- self.registry.registerAdapter(app, for_, IView, name)
+ self.registry.registerAdapter(
+ app, (classifier, req_iface, ctx_iface), IView, name)
def _registerEventListener(self, iface):
L = []
@@ -118,44 +118,15 @@ class TestRouter(unittest.TestCase):
router = self._makeOne()
self.assertEqual(router.root_policy, rootfactory)
- def test_iforbiddenview_override(self):
- from repoze.bfg.interfaces import IForbiddenView
- def app():
- """ """
- self.registry.registerUtility(app, IForbiddenView)
- router = self._makeOne()
- self.assertEqual(router.forbidden_view, app)
-
- def test_iforbiddenview_nooverride(self):
- router = self._makeOne()
- from repoze.bfg.view import default_forbidden_view
- self.assertEqual(router.forbidden_view, default_forbidden_view)
-
- def test_inotfoundview_override(self):
- from repoze.bfg.interfaces import INotFoundView
- def app():
- """ """
- self.registry.registerUtility(app, INotFoundView)
- router = self._makeOne()
- self.assertEqual(router.notfound_view, app)
-
- def test_inotfoundview_nooverride(self):
- router = self._makeOne()
- from repoze.bfg.view import default_notfound_view
- self.assertEqual(router.notfound_view, default_notfound_view)
-
def test_call_traverser_default(self):
+ from repoze.bfg.exceptions import NotFound
environ = self._makeEnviron()
logger = self._registerLogger()
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '404 Not Found')
- self.failUnless('<code>/</code>' in result[0], result)
- self.failIf('debug_notfound' in result[0])
+ why = exc_raised(NotFound, router, environ, start_response)
+ self.failUnless('/' in why[0], why)
+ self.failIf('debug_notfound' in why[0])
self.assertEqual(len(logger.messages), 0)
def test_traverser_raises_notfound_class(self):
@@ -165,12 +136,7 @@ class TestRouter(unittest.TestCase):
self._registerTraverserFactory(context, raise_error=NotFound)
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '404 Not Found')
- self.failUnless('<code></code>' in result[0], result)
+ self.assertRaises(NotFound, router, environ, start_response)
def test_traverser_raises_notfound_instance(self):
from repoze.bfg.exceptions import NotFound
@@ -179,12 +145,8 @@ class TestRouter(unittest.TestCase):
self._registerTraverserFactory(context, raise_error=NotFound('foo'))
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '404 Not Found')
- self.failUnless('<code>foo</code>' in result[0], result)
+ why = exc_raised(NotFound, router, environ, start_response)
+ self.failUnless('foo' in why[0], why)
def test_traverser_raises_forbidden_class(self):
from repoze.bfg.exceptions import Forbidden
@@ -193,12 +155,7 @@ class TestRouter(unittest.TestCase):
self._registerTraverserFactory(context, raise_error=Forbidden)
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '401 Unauthorized')
- self.failUnless('<code></code>' in result[0], result)
+ self.assertRaises(Forbidden, router, environ, start_response)
def test_traverser_raises_forbidden_instance(self):
from repoze.bfg.exceptions import Forbidden
@@ -207,30 +164,24 @@ class TestRouter(unittest.TestCase):
self._registerTraverserFactory(context, raise_error=Forbidden('foo'))
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '401 Unauthorized')
- self.failUnless('<code>foo</code>' in result[0], result)
+ why = exc_raised(Forbidden, router, environ, start_response)
+ self.failUnless('foo' in why[0], why)
def test_call_no_view_registered_no_isettings(self):
+ from repoze.bfg.exceptions import NotFound
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
logger = self._registerLogger()
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '404 Not Found')
- self.failUnless('<code>/</code>' in result[0], result)
- self.failIf('debug_notfound' in result[0])
+ why = exc_raised(NotFound, router, environ, start_response)
+ self.failUnless('/' in why[0], why)
+ self.failIf('debug_notfound' in why[0])
self.assertEqual(len(logger.messages), 0)
def test_call_no_view_registered_debug_notfound_false(self):
+ from repoze.bfg.exceptions import NotFound
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
@@ -238,16 +189,13 @@ class TestRouter(unittest.TestCase):
self._registerSettings(debug_notfound=False)
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '404 Not Found')
- self.failUnless('<code>/</code>' in result[0], result)
- self.failIf('debug_notfound' in result[0])
+ why = exc_raised(NotFound, router, environ, start_response)
+ self.failUnless('/' in why[0], why)
+ self.failIf('debug_notfound' in why[0])
self.assertEqual(len(logger.messages), 0)
def test_call_no_view_registered_debug_notfound_true(self):
+ from repoze.bfg.exceptions import NotFound
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
@@ -255,17 +203,13 @@ class TestRouter(unittest.TestCase):
logger = self._registerLogger()
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- headers = start_response.headers
- self.assertEqual(len(headers), 2)
- status = start_response.status
- self.assertEqual(status, '404 Not Found')
+ why = exc_raised(NotFound, router, environ, start_response)
self.failUnless(
"debug_notfound of url http://localhost:8080/; path_info: '/', "
- "context:" in result[0])
- self.failUnless(
- "view_name: '', subpath: []" in result[0])
- self.failUnless('http://localhost:8080' in result[0], result)
+ "context:" in why[0])
+ self.failUnless("view_name: '', subpath: []" in why[0])
+ self.failUnless('http://localhost:8080' in why[0], why)
+
self.assertEqual(len(logger.messages), 1)
message = logger.messages[0]
self.failUnless('of url http://localhost:8080' in message)
@@ -275,35 +219,25 @@ class TestRouter(unittest.TestCase):
self.failUnless("subpath: []" in message)
def test_call_view_returns_nonresponse(self):
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
self._registerTraverserFactory(context)
environ = self._makeEnviron()
view = DummyView('abc')
- self._registerView(view, '', None, None)
- router = self._makeOne()
- start_response = DummyStartResponse()
- self.assertRaises(ValueError, router, environ, start_response)
-
- def test_inotfoundview_returns_nonresponse(self):
- from repoze.bfg.interfaces import INotFoundView
- context = DummyContext()
- environ = self._makeEnviron()
- self._registerTraverserFactory(context)
- def app(context, request):
- """ """
- self.registry.registerUtility(app, INotFoundView)
+ self._registerView(view, '', IViewClassifier, None, None)
router = self._makeOne()
start_response = DummyStartResponse()
self.assertRaises(ValueError, router, environ, start_response)
def test_call_view_registered_nonspecific_default_path(self):
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
- self._registerView(view, '', None, None)
+ self._registerView(view, '', IViewClassifier, None, None)
self._registerRootFactory(context)
router = self._makeOne()
start_response = DummyStartResponse()
@@ -318,6 +252,7 @@ class TestRouter(unittest.TestCase):
self.assertEqual(request.root, context)
def test_call_view_registered_nonspecific_nondefault_path_and_subpath(self):
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
self._registerTraverserFactory(context, view_name='foo',
subpath=['bar'],
@@ -327,7 +262,7 @@ class TestRouter(unittest.TestCase):
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
- self._registerView(view, 'foo', None, None)
+ self._registerView(view, 'foo', IViewClassifier, None, None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -346,6 +281,7 @@ class TestRouter(unittest.TestCase):
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context)
@@ -354,7 +290,7 @@ class TestRouter(unittest.TestCase):
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
- self._registerView(view, '', IRequest, IContext)
+ self._registerView(view, '', IViewClassifier, IRequest, IContext)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -370,6 +306,8 @@ class TestRouter(unittest.TestCase):
def test_call_view_registered_specific_fail(self):
from zope.interface import Interface
from zope.interface import directlyProvides
+ from repoze.bfg.exceptions import NotFound
+ from repoze.bfg.interfaces import IViewClassifier
class IContext(Interface):
pass
class INotContext(Interface):
@@ -381,31 +319,30 @@ class TestRouter(unittest.TestCase):
response = DummyResponse()
view = DummyView(response)
environ = self._makeEnviron()
- self._registerView(view, '', IRequest, IContext)
+ self._registerView(view, '', IViewClassifier, IRequest, IContext)
router = self._makeOne()
start_response = DummyStartResponse()
- result = router(environ, start_response)
- self.assertEqual(start_response.status, '404 Not Found')
- self.failUnless('404' in result[0])
+ self.assertRaises(NotFound, router, environ, start_response)
def test_call_view_raises_forbidden(self):
from zope.interface import Interface
from zope.interface import directlyProvides
+ from repoze.bfg.exceptions import Forbidden
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse()
- view = DummyView(response, raise_unauthorized=True)
+ view = DummyView(response, raise_exception=Forbidden("unauthorized"))
environ = self._makeEnviron()
- self._registerView(view, '', IRequest, IContext)
+ self._registerView(view, '', IViewClassifier, IRequest, IContext)
router = self._makeOne()
start_response = DummyStartResponse()
- response = router(environ, start_response)
- self.assertEqual(start_response.status, '401 Unauthorized')
- self.assertEqual(environ['repoze.bfg.message'], 'unauthorized')
+ why = exc_raised(Forbidden, router, environ, start_response)
+ self.assertEqual(why[0], 'unauthorized')
def test_call_view_raises_notfound(self):
from zope.interface import Interface
@@ -413,18 +350,19 @@ class TestRouter(unittest.TestCase):
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.exceptions import NotFound
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse()
- view = DummyView(response, raise_notfound=True)
+ view = DummyView(response, raise_exception=NotFound("notfound"))
environ = self._makeEnviron()
- self._registerView(view, '', IRequest, IContext)
+ self._registerView(view, '', IViewClassifier, IRequest, IContext)
router = self._makeOne()
start_response = DummyStartResponse()
- response = router(environ, start_response)
- self.assertEqual(start_response.status, '404 Not Found')
- self.assertEqual(environ['repoze.bfg.message'], 'notfound')
+ why = exc_raised(NotFound, router, environ, start_response)
+ self.assertEqual(why[0], 'notfound')
def test_call_request_has_global_response_headers(self):
from zope.interface import Interface
@@ -432,6 +370,7 @@ class TestRouter(unittest.TestCase):
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
@@ -441,7 +380,7 @@ class TestRouter(unittest.TestCase):
request.global_response_headers = [('b', 2)]
return response
environ = self._makeEnviron()
- self._registerView(view, '', IRequest, IContext)
+ self._registerView(view, '', IViewClassifier, IRequest, IContext)
router = self._makeOne()
start_response = DummyStartResponse()
router(environ, start_response)
@@ -452,13 +391,14 @@ class TestRouter(unittest.TestCase):
from repoze.bfg.interfaces import INewRequest
from repoze.bfg.interfaces import INewResponse
from repoze.bfg.interfaces import IAfterTraversal
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
- self._registerView(view, '', None, None)
+ self._registerView(view, '', IViewClassifier, None, None)
request_events = self._registerEventListener(INewRequest)
aftertraversal_events = self._registerEventListener(IAfterTraversal)
response_events = self._registerEventListener(INewResponse)
@@ -474,13 +414,14 @@ class TestRouter(unittest.TestCase):
self.assertEqual(result, response.app_iter)
def test_call_pushes_and_pops_threadlocal_manager(self):
+ from repoze.bfg.interfaces import IViewClassifier
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
- self._registerView(view, '', None, None)
+ self._registerView(view, '', IViewClassifier, None, None)
router = self._makeOne()
start_response = DummyStartResponse()
router.threadlocal_manager = DummyThreadLocalManager()
@@ -489,7 +430,8 @@ class TestRouter(unittest.TestCase):
self.assertEqual(len(router.threadlocal_manager.popped), 1)
def test_call_route_matches_and_has_factory(self):
- req_iface = self._registerRouteRequest('foo')
+ from repoze.bfg.interfaces import IViewClassifier
+ self._registerRouteRequest('foo')
root = object()
def factory(request):
return root
@@ -500,7 +442,7 @@ class TestRouter(unittest.TestCase):
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
- self._registerView(view, '', None, None)
+ self._registerView(view, '', IViewClassifier, None, None)
self._registerRootFactory(context)
router = self._makeOne()
start_response = DummyStartResponse()
@@ -522,9 +464,10 @@ class TestRouter(unittest.TestCase):
def test_call_route_matches_doesnt_overwrite_subscriber_iface(self):
from repoze.bfg.interfaces import INewRequest
+ from repoze.bfg.interfaces import IViewClassifier
from zope.interface import alsoProvides
from zope.interface import Interface
- req_iface = self._registerRouteRequest('foo')
+ self._registerRouteRequest('foo')
class IFoo(Interface):
pass
def listener(event):
@@ -540,7 +483,7 @@ class TestRouter(unittest.TestCase):
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
- self._registerView(view, '', None, None)
+ self._registerView(view, '', IViewClassifier, None, None)
self._registerRootFactory(context)
router = self._makeOne()
start_response = DummyStartResponse()
@@ -576,9 +519,8 @@ class TestRouter(unittest.TestCase):
environ = self._makeEnviron()
router = self._makeOne()
start_response = DummyStartResponse()
- app_iter = router(environ, start_response)
- self.assertEqual(start_response.status, '404 Not Found')
- self.failUnless('from root factory' in app_iter[0])
+ why = exc_raised(NotFound, router, environ, start_response)
+ self.failUnless('from root factory' in why[0])
def test_root_factory_raises_forbidden(self):
from repoze.bfg.interfaces import IRootFactory
@@ -595,29 +537,378 @@ class TestRouter(unittest.TestCase):
environ = self._makeEnviron()
router = self._makeOne()
start_response = DummyStartResponse()
+ why = exc_raised(Forbidden, router, environ, start_response)
+ self.failUnless('from root factory' in why[0])
+
+ def test_root_factory_exception_propagating(self):
+ from repoze.bfg.interfaces import IRootFactory
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ def rootfactory(request):
+ raise RuntimeError()
+ self.registry.registerUtility(rootfactory, IRootFactory)
+ class IContext(Interface):
+ pass
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ environ = self._makeEnviron()
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ self.assertRaises(RuntimeError, router, environ, start_response)
+
+ def test_traverser_exception_propagating(self):
+ environ = self._makeEnviron()
+ context = DummyContext()
+ self._registerTraverserFactory(context, raise_error=RuntimeError())
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ self.assertRaises(RuntimeError, router, environ, start_response)
+
+ def test_call_view_exception_propagating(self):
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IViewClassifier
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ self._registerTraverserFactory(context, subpath=[''])
+ response = DummyResponse()
+ view = DummyView(response, raise_exception=RuntimeError)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IViewClassifier, IRequest, IContext)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ self.assertRaises(RuntimeError, router, environ, start_response)
+
+ def test_call_view_raises_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ response = DummyResponse()
+ exception_response = DummyResponse()
+ exception_response.app_iter = ["Hello, world"]
+ view = DummyView(response, raise_exception=RuntimeError)
+ exception_view = DummyView(exception_response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IViewClassifier, IRequest, None)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, RuntimeError)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(result, ["Hello, world"])
+ self.assertEqual(view.request.exception.__class__, RuntimeError)
+
+ def test_call_view_raises_super_exception_sub_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ class SuperException(Exception):
+ pass
+ class SubException(SuperException):
+ pass
+ response = DummyResponse()
+ exception_response = DummyResponse()
+ exception_response.app_iter = ["Hello, world"]
+ view = DummyView(response, raise_exception=SuperException)
+ exception_view = DummyView(exception_response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IViewClassifier, IRequest, None)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, SubException)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ self.assertRaises(SuperException, router, environ, start_response)
+
+ def test_call_view_raises_sub_exception_super_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ class SuperException(Exception):
+ pass
+ class SubException(SuperException):
+ pass
+ response = DummyResponse()
+ exception_response = DummyResponse()
+ exception_response.app_iter = ["Hello, world"]
+ view = DummyView(response, raise_exception=SubException)
+ exception_view = DummyView(exception_response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IViewClassifier, IRequest, None)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, SuperException)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(result, ["Hello, world"])
+
+ def test_call_view_raises_exception_another_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ class MyException(Exception):
+ pass
+ class AnotherException(Exception):
+ pass
+ response = DummyResponse()
+ exception_response = DummyResponse()
+ exception_response.app_iter = ["Hello, world"]
+ view = DummyView(response, raise_exception=MyException)
+ exception_view = DummyView(exception_response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IViewClassifier, IRequest, None)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, AnotherException)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ self.assertRaises(MyException, router, environ, start_response)
+
+ def test_root_factory_raises_exception_view(self):
+ from repoze.bfg.interfaces import IRootFactory
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ def rootfactory(request):
+ raise RuntimeError()
+ self.registry.registerUtility(rootfactory, IRootFactory)
+ exception_response = DummyResponse()
+ exception_response.app_iter = ["Hello, world"]
+ exception_view = DummyView(exception_response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, RuntimeError)
+ environ = self._makeEnviron()
+ router = self._makeOne()
+ start_response = DummyStartResponse()
app_iter = router(environ, start_response)
- self.assertEqual(start_response.status, '401 Unauthorized')
- self.failUnless('from root factory' in app_iter[0])
+ self.assertEqual(app_iter, ["Hello, world"])
+
+ def test_traverser_raises_exception_view(self):
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ environ = self._makeEnviron()
+ context = DummyContext()
+ self._registerTraverserFactory(context, raise_error=RuntimeError())
+ exception_response = DummyResponse()
+ exception_response.app_iter = ["Hello, world"]
+ exception_view = DummyView(exception_response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, RuntimeError)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(result, ["Hello, world"])
+
+ def test_exception_view_returns_non_response(self):
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ environ = self._makeEnviron()
+ response = DummyResponse()
+ view = DummyView(response, raise_exception=RuntimeError)
+ self._registerView(view, '', IViewClassifier, IRequest, None)
+ exception_view = DummyView(None)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, RuntimeError)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ self.assertRaises(ValueError, router, environ, start_response)
+
+ def test_call_route_raises_route_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ req_iface = self._registerRouteRequest('foo')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=RuntimeError)
+ self._registerView(view, '', IViewClassifier, req_iface, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ req_iface, RuntimeError)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ result = router(environ, start_response)
+ self.assertEqual(result, ["Hello, world"])
+
+ def test_call_view_raises_exception_route_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ req_iface = self._registerRouteRequest('foo')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=RuntimeError)
+ self._registerView(view, '', IViewClassifier, IRequest, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ req_iface, RuntimeError)
+ environ = self._makeEnviron()
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ self.assertRaises(RuntimeError, router, environ, start_response)
+
+ def test_call_route_raises_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ req_iface = self._registerRouteRequest('foo')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=RuntimeError)
+ self._registerView(view, '', IViewClassifier, req_iface, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, RuntimeError)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ result = router(environ, start_response)
+ self.assertEqual(result, ["Hello, world"])
+
+ def test_call_route_raises_super_exception_sub_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ class SuperException(Exception):
+ pass
+ class SubException(SuperException):
+ pass
+ req_iface = self._registerRouteRequest('foo')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=SuperException)
+ self._registerView(view, '', IViewClassifier, req_iface, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, SubException)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ self.assertRaises(SuperException, router, environ, start_response)
+
+ def test_call_route_raises_sub_exception_super_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ class SuperException(Exception):
+ pass
+ class SubException(SuperException):
+ pass
+ req_iface = self._registerRouteRequest('foo')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=SubException)
+ self._registerView(view, '', IViewClassifier, req_iface, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, SuperException)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ result = router(environ, start_response)
+ self.assertEqual(result, ["Hello, world"])
+
+ def test_call_route_raises_exception_another_exception_view(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ class MyException(Exception):
+ pass
+ class AnotherException(Exception):
+ pass
+ req_iface = self._registerRouteRequest('foo')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=MyException)
+ self._registerView(view, '', IViewClassifier, req_iface, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, AnotherException)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ self.assertRaises(MyException, router, environ, start_response)
+
+ def test_call_route_raises_exception_view_specializing(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ from repoze.bfg.interfaces import IRequest
+ req_iface = self._registerRouteRequest('foo')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=RuntimeError)
+ self._registerView(view, '', IViewClassifier, req_iface, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ IRequest, RuntimeError)
+ response_spec = DummyResponse()
+ response_spec.app_iter = ["Hello, special world"]
+ exception_view_spec = DummyView(response_spec)
+ self._registerView(exception_view_spec, '', IExceptionViewClassifier,
+ req_iface, RuntimeError)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ result = router(environ, start_response)
+ self.assertEqual(result, ["Hello, special world"])
+
+ def test_call_route_raises_exception_view_another_route(self):
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ req_iface = self._registerRouteRequest('foo')
+ another_req_iface = self._registerRouteRequest('bar')
+ self._connectRoute('archives/:action/:article', 'foo', None)
+ view = DummyView(DummyResponse(), raise_exception=RuntimeError)
+ self._registerView(view, '', IViewClassifier, req_iface, None)
+ response = DummyResponse()
+ response.app_iter = ["Hello, world"]
+ exception_view = DummyView(response)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ another_req_iface, RuntimeError)
+ environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
+ start_response = DummyStartResponse()
+ router = self._makeOne()
+ self.assertRaises(RuntimeError, router, environ, start_response)
+
+ def test_call_view_raises_exception_view_route(self):
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.interfaces import IExceptionViewClassifier
+ req_iface = self._registerRouteRequest('foo')
+ response = DummyResponse()
+ exception_response = DummyResponse()
+ exception_response.app_iter = ["Hello, world"]
+ view = DummyView(response, raise_exception=RuntimeError)
+ exception_view = DummyView(exception_response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IViewClassifier, IRequest, None)
+ self._registerView(exception_view, '', IExceptionViewClassifier,
+ req_iface, RuntimeError)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ self.assertRaises(RuntimeError, router, environ, start_response)
class DummyContext:
pass
class DummyView:
- def __init__(self, response, raise_unauthorized=False,
- raise_notfound=False):
+ def __init__(self, response, raise_exception=None):
self.response = response
- self.raise_unauthorized = raise_unauthorized
- self.raise_notfound = raise_notfound
+ self.raise_exception = raise_exception
def __call__(self, context, request):
self.context = context
self.request = request
- if self.raise_unauthorized:
- from repoze.bfg.exceptions import Forbidden
- raise Forbidden('unauthorized')
- if self.raise_notfound:
- from repoze.bfg.exceptions import NotFound
- raise NotFound('notfound')
+ if not self.raise_exception is None:
+ raise self.raise_exception
return self.response
class DummyRootFactory:
@@ -662,3 +953,12 @@ class DummyLogger:
warn = info
debug = info
+def exc_raised(exc, func, *arg, **kw):
+ try:
+ func(*arg, **kw)
+ except exc, e:
+ return e
+ else:
+ raise AssertionError('%s not raised' % exc) # pragma: no cover
+
+
diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py
index 0a15831b7..13a0e2d9b 100644
--- a/repoze/bfg/tests/test_security.py
+++ b/repoze/bfg/tests/test_security.py
@@ -118,6 +118,7 @@ class TestViewExecutionPermitted(unittest.TestCase):
from repoze.bfg.threadlocal import get_current_registry
from zope.interface import Interface
from repoze.bfg.interfaces import ISecuredView
+ from repoze.bfg.interfaces import IViewClassifier
class Checker(object):
def __permitted__(self, context, request):
self.context = context
@@ -125,7 +126,7 @@ class TestViewExecutionPermitted(unittest.TestCase):
return allow
checker = Checker()
reg = get_current_registry()
- reg.registerAdapter(checker, (Interface, Interface),
+ reg.registerAdapter(checker, (IViewClassifier, Interface, Interface),
ISecuredView, view_name)
return checker
diff --git a/repoze/bfg/tests/test_view.py b/repoze/bfg/tests/test_view.py
index 5f053d94d..bcfa45e91 100644
--- a/repoze/bfg/tests/test_view.py
+++ b/repoze/bfg/tests/test_view.py
@@ -11,7 +11,8 @@ class BaseTest(object):
def _registerView(self, reg, app, name):
from repoze.bfg.interfaces import IRequest
- for_ = (IRequest, IContext)
+ from repoze.bfg.interfaces import IViewClassifier
+ for_ = (IViewClassifier, IRequest, IContext)
from repoze.bfg.interfaces import IView
reg.registerAdapter(app, for_, IView, name)
diff --git a/repoze/bfg/tests/test_zcml.py b/repoze/bfg/tests/test_zcml.py
index 426e6e24d..452769de8 100644
--- a/repoze/bfg/tests/test_zcml.py
+++ b/repoze/bfg/tests/test_zcml.py
@@ -23,6 +23,7 @@ class TestViewDirective(unittest.TestCase):
def test_request_type_ashttpmethod(self):
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRequest
context = DummyContext()
view = lambda *arg: None
@@ -37,7 +38,8 @@ class TestViewDirective(unittest.TestCase):
register = action['callable']
register()
reg = get_current_registry()
- wrapper = reg.adapters.lookup((IRequest, IDummy), IView, name='')
+ wrapper = reg.adapters.lookup(
+ (IViewClassifier, IRequest, IDummy), IView, name='')
request = DummyRequest()
request.method = 'GET'
self.assertEqual(wrapper.__predicated__(None, request), True)
@@ -48,6 +50,7 @@ class TestViewDirective(unittest.TestCase):
from zope.interface import directlyProvides
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRequest
context = DummyContext(IDummy)
view = lambda *arg: 'OK'
@@ -61,7 +64,8 @@ class TestViewDirective(unittest.TestCase):
register = actions[0]['callable']
register()
reg = get_current_registry()
- regview = reg.adapters.lookup((IRequest, IDummy), IView, name='')
+ regview = reg.adapters.lookup(
+ (IViewClassifier, IRequest, IDummy), IView, name='')
self.assertNotEqual(view, regview)
request = DummyRequest()
directlyProvides(request, IDummy)
@@ -81,6 +85,7 @@ class TestViewDirective(unittest.TestCase):
def test_with_dotted_renderer(self):
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRendererFactory
from repoze.bfg.interfaces import IRequest
context = DummyContext()
@@ -100,12 +105,14 @@ class TestViewDirective(unittest.TestCase):
self.assertEqual(actions[0]['discriminator'], discrim)
register = actions[0]['callable']
register()
- regview = reg.adapters.lookup((IRequest, IDummy), IView, name='')
+ regview = reg.adapters.lookup(
+ (IViewClassifier, IRequest, IDummy), IView, name='')
self.assertEqual(regview(None, None).body, 'OK')
def test_with_custom_predicates(self):
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRequest
context = DummyContext()
reg = get_current_registry()
@@ -125,12 +132,14 @@ class TestViewDirective(unittest.TestCase):
self.assertEqual(actions[0]['discriminator'], discrim)
register = actions[0]['callable']
register()
- regview = reg.adapters.lookup((IRequest, IDummy), IView, name='')
+ regview = reg.adapters.lookup(
+ (IViewClassifier, IRequest, IDummy), IView, name='')
self.assertEqual(regview(None, None), 'OK')
def test_context_trumps_for(self):
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRequest
context = DummyContext()
reg = get_current_registry()
@@ -146,12 +155,14 @@ class TestViewDirective(unittest.TestCase):
self.assertEqual(actions[0]['discriminator'], discrim)
register = actions[0]['callable']
register()
- regview = reg.adapters.lookup((IRequest, IDummy), IView, name='')
+ regview = reg.adapters.lookup(
+ (IViewClassifier, IRequest, IDummy), IView, name='')
self.assertEqual(regview(None, None), 'OK')
def test_with_for(self):
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRequest
context = DummyContext()
reg = get_current_registry()
@@ -166,7 +177,8 @@ class TestViewDirective(unittest.TestCase):
self.assertEqual(actions[0]['discriminator'], discrim)
register = actions[0]['callable']
register()
- regview = reg.adapters.lookup((IRequest, IDummy), IView, name='')
+ regview = reg.adapters.lookup(
+ (IViewClassifier, IRequest, IDummy), IView, name='')
self.assertEqual(regview(None, None), 'OK')
class TestNotFoundDirective(unittest.TestCase):
@@ -176,13 +188,17 @@ class TestNotFoundDirective(unittest.TestCase):
def tearDown(self):
testing.tearDown()
- def _callFUT(self, context, view):
+ def _callFUT(self, context, view, **kw):
from repoze.bfg.zcml import notfound
- return notfound(context, view)
+ return notfound(context, view, **kw)
def test_it(self):
+ from zope.interface import implementedBy
from repoze.bfg.threadlocal import get_current_registry
- from repoze.bfg.interfaces import INotFoundView
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.exceptions import NotFound
context = DummyContext()
def view(request):
@@ -191,14 +207,48 @@ class TestNotFoundDirective(unittest.TestCase):
actions = context.actions
self.assertEqual(len(actions), 1)
+ discrim = ('view', NotFound, '', None, IView, None, None, None, None,
+ None, False, None, None, None)
regadapt = actions[0]
- self.assertEqual(regadapt['discriminator'], INotFoundView)
+ self.assertEqual(regadapt['discriminator'], discrim)
register = regadapt['callable']
register()
reg = get_current_registry()
- derived_view = reg.getUtility(INotFoundView)
+ derived_view = reg.adapters.lookup(
+ (IViewClassifier, IRequest, implementedBy(NotFound)),
+ IView, default=None)
+
+ self.assertNotEqual(derived_view, None)
self.assertEqual(derived_view(None, None), 'OK')
- self.assertEqual(derived_view.__name__, view.__name__)
+ self.assertEqual(derived_view.__name__, 'bwcompat_view')
+
+ def test_it_with_dotted_renderer(self):
+ from zope.interface import implementedBy
+ from repoze.bfg.threadlocal import get_current_registry
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.exceptions import NotFound
+ from repoze.bfg.configuration import Configurator
+ context = DummyContext()
+ reg = get_current_registry()
+ config = Configurator(reg)
+ def dummy_renderer_factory(*arg, **kw):
+ return lambda *arg, **kw: 'OK'
+ config.add_renderer('.pt', dummy_renderer_factory)
+ def view(request):
+ return {}
+ self._callFUT(context, view, renderer='fake.pt')
+ actions = context.actions
+ regadapt = actions[0]
+ register = regadapt['callable']
+ register()
+ derived_view = reg.adapters.lookup(
+ (IViewClassifier, IRequest, implementedBy(NotFound)),
+ IView, default=None)
+ self.assertNotEqual(derived_view, None)
+ self.assertEqual(derived_view(None, None).body, 'OK')
+ self.assertEqual(derived_view.__name__, 'bwcompat_view')
class TestForbiddenDirective(unittest.TestCase):
def setUp(self):
@@ -207,92 +257,67 @@ class TestForbiddenDirective(unittest.TestCase):
def tearDown(self):
testing.tearDown()
- def _callFUT(self, context, view):
+ def _callFUT(self, context, view, **kw):
from repoze.bfg.zcml import forbidden
- return forbidden(context, view)
+ return forbidden(context, view, **kw)
def test_it(self):
+ from zope.interface import implementedBy
from repoze.bfg.threadlocal import get_current_registry
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.exceptions import Forbidden
context = DummyContext()
def view(request):
return 'OK'
self._callFUT(context, view)
actions = context.actions
- from repoze.bfg.interfaces import IForbiddenView
self.assertEqual(len(actions), 1)
+ discrim = ('view', Forbidden, '', None, IView, None, None, None, None,
+ None, False, None, None, None)
regadapt = actions[0]
- self.assertEqual(regadapt['discriminator'], IForbiddenView)
+ self.assertEqual(regadapt['discriminator'], discrim)
register = regadapt['callable']
register()
reg = get_current_registry()
- derived_view = reg.getUtility(IForbiddenView)
- self.assertEqual(derived_view(None, None), 'OK')
- self.assertEqual(derived_view.__name__, view.__name__)
-
-class TestSystemViewHandler(unittest.TestCase):
- def setUp(self):
- testing.setUp()
-
- def tearDown(self):
- testing.tearDown()
+ derived_view = reg.adapters.lookup(
+ (IViewClassifier, IRequest, implementedBy(Forbidden)),
+ IView, default=None)
- def _makeOne(self, iface):
- from repoze.bfg.zcml import SystemViewHandler
- return SystemViewHandler(iface)
+ self.assertNotEqual(derived_view, None)
+ self.assertEqual(derived_view(None, None), 'OK')
+ self.assertEqual(derived_view.__name__, 'bwcompat_view')
- def test_no_view_no_renderer(self):
- handler = self._makeOne(IDummy)
- from repoze.bfg.exceptions import ConfigurationError
- context = DummyContext()
- handler(context)
- actions = context.actions
- self.assertEqual(len(actions), 1)
- regadapt = actions[0]
- self.assertEqual(regadapt['discriminator'], IDummy)
- register = regadapt['callable']
- self.assertRaises(ConfigurationError, register)
-
- def test_no_view_with_renderer(self):
+ def test_it_with_dotted_renderer(self):
+ from zope.interface import implementedBy
from repoze.bfg.threadlocal import get_current_registry
- from repoze.bfg.interfaces import IRendererFactory
- reg = get_current_registry()
- def renderer(path):
- return lambda *arg: 'OK'
- reg.registerUtility(renderer, IRendererFactory, name='dummy')
+ from repoze.bfg.interfaces import IRequest
+ from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
+ from repoze.bfg.exceptions import Forbidden
+ from repoze.bfg.configuration import Configurator
context = DummyContext()
- handler = self._makeOne(IDummy)
- handler(context, renderer='dummy')
- actions = context.actions
- self.assertEqual(len(actions), 1)
- regadapt = actions[0]
- self.assertEqual(regadapt['discriminator'], IDummy)
- register = regadapt['callable']
- register()
- derived_view = reg.getUtility(IDummy)
- request = DummyRequest()
- self.assertEqual(derived_view(None, request).body, 'OK')
-
- def test_template_renderer(self):
- from repoze.bfg.threadlocal import get_current_registry
- from repoze.bfg.interfaces import IRendererFactory
reg = get_current_registry()
- def renderer(path):
- return lambda *arg: 'OK'
- reg.registerUtility(renderer, IRendererFactory, name='.pt')
- context = DummyContext()
- handler = self._makeOne(IDummy)
- handler(context, renderer='fixtures/minimal.pt')
+ config = Configurator(reg)
+ def dummy_renderer_factory(*arg, **kw):
+ return lambda *arg, **kw: 'OK'
+ config.add_renderer('.pt', dummy_renderer_factory)
+ def view(request):
+ return {}
+ self._callFUT(context, view, renderer='fake.pt')
actions = context.actions
- self.assertEqual(len(actions), 1)
regadapt = actions[0]
- self.assertEqual(regadapt['discriminator'], IDummy)
register = regadapt['callable']
register()
- derived_view = reg.getUtility(IDummy)
- request = DummyRequest()
- self.assertEqual(derived_view(None, request).body, 'OK')
+ derived_view = reg.adapters.lookup(
+ (IViewClassifier, IRequest, implementedBy(Forbidden)),
+ IView, default=None)
+ self.assertNotEqual(derived_view, None)
+ self.assertEqual(derived_view(None, None).body, 'OK')
+ self.assertEqual(derived_view.__name__, 'bwcompat_view')
class TestRepozeWho1AuthenticationPolicyDirective(unittest.TestCase):
def setUp(self):
@@ -506,6 +531,7 @@ class TestRouteDirective(unittest.TestCase):
from repoze.bfg.threadlocal import get_current_registry
from zope.interface import Interface
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRouteRequest
context = DummyContext()
view = lambda *arg: 'OK'
@@ -526,12 +552,14 @@ class TestRouteDirective(unittest.TestCase):
view_discriminator = view_action['discriminator']
discrim = ('view', None, '', None, IView, 'name', None)
self.assertEqual(view_discriminator, discrim)
- wrapped = reg.adapters.lookup((request_type, Interface), IView, name='')
+ wrapped = reg.adapters.lookup(
+ (IViewClassifier, request_type, Interface), IView, name='')
self.failUnless(wrapped)
def test_with_view_and_view_context(self):
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRouteRequest
context = DummyContext()
view = lambda *arg: 'OK'
@@ -552,12 +580,14 @@ class TestRouteDirective(unittest.TestCase):
view_discriminator = view_action['discriminator']
discrim = ('view', IDummy, '', None, IView, 'name', None)
self.assertEqual(view_discriminator, discrim)
- wrapped = reg.adapters.lookup((request_type, IDummy), IView, name='')
+ wrapped = reg.adapters.lookup(
+ (IViewClassifier, request_type, IDummy), IView, name='')
self.failUnless(wrapped)
def test_with_view_context_trumps_view_for(self):
from repoze.bfg.threadlocal import get_current_registry
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRouteRequest
context = DummyContext()
view = lambda *arg: 'OK'
@@ -581,7 +611,8 @@ class TestRouteDirective(unittest.TestCase):
view_discriminator = view_action['discriminator']
discrim = ('view', IDummy, '', None, IView, 'name', None)
self.assertEqual(view_discriminator, discrim)
- wrapped = reg.adapters.lookup((request_type, IDummy), IView, name='')
+ wrapped = reg.adapters.lookup(
+ (IViewClassifier, request_type, IDummy), IView, name='')
self.failUnless(wrapped)
def test_with_dotted_renderer(self):
@@ -589,9 +620,8 @@ class TestRouteDirective(unittest.TestCase):
from repoze.bfg.threadlocal import get_current_registry
from zope.interface import Interface
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRouteRequest
-
-
from repoze.bfg.interfaces import IRendererFactory
reg = get_current_registry()
def renderer(path):
@@ -617,7 +647,8 @@ class TestRouteDirective(unittest.TestCase):
view_discriminator = view_action['discriminator']
discrim = ('view', None, '', None, IView, 'name', None)
self.assertEqual(view_discriminator, discrim)
- wrapped = reg.adapters.lookup((request_type, Interface), IView, name='')
+ wrapped = reg.adapters.lookup(
+ (IViewClassifier, request_type, Interface), IView, name='')
self.failUnless(wrapped)
request = DummyRequest()
result = wrapped(None, request)
@@ -658,6 +689,7 @@ class TestStaticDirective(unittest.TestCase):
from zope.interface import implementedBy
from repoze.bfg.static import StaticRootFactory
from repoze.bfg.interfaces import IView
+ from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.interfaces import IRouteRequest
from repoze.bfg.interfaces import IRoutesMapper
context = DummyContext()
@@ -684,7 +716,8 @@ class TestStaticDirective(unittest.TestCase):
self.assertEqual(discriminator[4], IView)
iface = implementedBy(StaticRootFactory)
request_type = reg.getUtility(IRouteRequest, 'name')
- view = reg.adapters.lookup((request_type, iface), IView, name='')
+ view = reg.adapters.lookup(
+ (IViewClassifier, request_type, iface), IView, name='')
request = DummyRequest()
self.assertEqual(view(None, request).__class__, PackageURLParser)
diff --git a/repoze/bfg/view.py b/repoze/bfg/view.py
index 8c1430654..1f83faa5b 100644
--- a/repoze/bfg/view.py
+++ b/repoze/bfg/view.py
@@ -25,6 +25,7 @@ from zope.interface.advice import getFrameInfo
from repoze.bfg.interfaces import IResponseFactory
from repoze.bfg.interfaces import IRoutesMapper
from repoze.bfg.interfaces import IView
+from repoze.bfg.interfaces import IViewClassifier
from repoze.bfg.path import caller_package
from repoze.bfg.path import package_path
@@ -70,7 +71,7 @@ def render_view_to_response(context, request, name='', secure=True):
was disallowed.
If ``secure`` is ``False``, no permission checking is done."""
- provides = map(providedBy, (request, context))
+ provides = [IViewClassifier] + map(providedBy, (request, context))
try:
reg = request.registry
except AttributeError:
diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py
index 4ae04387f..3935303c5 100644
--- a/repoze/bfg/zcml.py
+++ b/repoze/bfg/zcml.py
@@ -18,8 +18,6 @@ from zope.schema import TextLine
from repoze.bfg.interfaces import IAuthenticationPolicy
from repoze.bfg.interfaces import IAuthorizationPolicy
-from repoze.bfg.interfaces import IForbiddenView
-from repoze.bfg.interfaces import INotFoundView
from repoze.bfg.interfaces import IRendererFactory
from repoze.bfg.interfaces import IRouteRequest
from repoze.bfg.interfaces import IView
@@ -30,6 +28,8 @@ from repoze.bfg.authentication import RepozeWho1AuthenticationPolicy
from repoze.bfg.authorization import ACLAuthorizationPolicy
from repoze.bfg.configuration import Configurator
from repoze.bfg.exceptions import ConfigurationError
+from repoze.bfg.exceptions import NotFound
+from repoze.bfg.exceptions import Forbidden
from repoze.bfg.request import route_request_iface
from repoze.bfg.resource import resource_spec_from_abspath
from repoze.bfg.static import StaticRootFactory
@@ -356,29 +356,52 @@ class ISystemViewDirective(Interface):
description = u'',
required=False)
-class SystemViewHandler(object):
- def __init__(self, iface):
- self.iface = iface
+def notfound(_context,
+ view=None,
+ attr=None,
+ renderer=None,
+ wrapper=None):
- def __call__(self, _context, view=None, attr=None, renderer=None,
- wrapper=None):
- if renderer and '.' in renderer:
- renderer = path_spec(_context, renderer)
+ if renderer and '.' in renderer:
+ renderer = path_spec(_context, renderer)
- def register(iface=self.iface):
- reg = get_current_registry()
- config = Configurator(reg, package=_context.package)
- config._system_view(iface, view=view, attr=attr, renderer=renderer,
- wrapper=wrapper, _info=_context.info)
+ def register():
+ reg = get_current_registry()
+ config = Configurator(reg, package=_context.package)
+ config.set_notfound_view(view=view, attr=attr, renderer=renderer,
+ wrapper=wrapper, _info=_context.info)
- _context.action(
- discriminator = self.iface,
- callable = register,
- )
-
-notfound = SystemViewHandler(INotFoundView)
-forbidden = SystemViewHandler(IForbiddenView)
+ discriminator = ('view', NotFound, '', None, IView, None, None, None,
+ None, attr, False, None, None, None)
+
+ _context.action(
+ discriminator = discriminator,
+ callable = register,
+ )
+
+def forbidden(_context,
+ view=None,
+ attr=None,
+ renderer=None,
+ wrapper=None):
+ if renderer and '.' in renderer:
+ renderer = path_spec(_context, renderer)
+
+ def register():
+ reg = get_current_registry()
+ config = Configurator(reg, package=_context.package)
+ config.set_forbidden_view(view=view, attr=attr, renderer=renderer,
+ wrapper=wrapper, _info=_context.info)
+
+ discriminator = ('view', Forbidden, '', None, IView, None, None, None,
+ None, attr, False, None, None, None)
+
+ _context.action(
+ discriminator = discriminator,
+ callable = register,
+ )
+
class IResourceDirective(Interface):
"""
Directive for specifying that one package may override resources from