summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pyramid/config/__init__.py571
-rw-r--r--src/pyramid/config/actions.py581
-rw-r--r--src/pyramid/config/adapters.py2
-rw-r--r--src/pyramid/config/assets.py2
-rw-r--r--src/pyramid/config/factories.py2
-rw-r--r--src/pyramid/config/i18n.py2
-rw-r--r--src/pyramid/config/predicates.py257
-rw-r--r--src/pyramid/config/rendering.py2
-rw-r--r--src/pyramid/config/routes.py10
-rw-r--r--src/pyramid/config/security.py2
-rw-r--r--src/pyramid/config/testing.py2
-rw-r--r--src/pyramid/config/tweens.py2
-rw-r--r--src/pyramid/config/util.py276
-rw-r--r--src/pyramid/config/views.py4
-rw-r--r--tests/test_config/test_actions.py1090
-rw-r--r--tests/test_config/test_init.py1038
-rw-r--r--tests/test_config/test_predicates.py (renamed from tests/test_config/test_util.py)62
-rw-r--r--tests/test_config/test_testing.py2
-rw-r--r--tests/test_config/test_views.py4
-rw-r--r--tests/test_viewderivers.py2
20 files changed, 1962 insertions, 1951 deletions
diff --git a/src/pyramid/config/__init__.py b/src/pyramid/config/__init__.py
index f5790352e..00c3e6a02 100644
--- a/src/pyramid/config/__init__.py
+++ b/src/pyramid/config/__init__.py
@@ -1,9 +1,6 @@
import inspect
-import itertools
import logging
-import operator
import os
-import sys
import threading
import venusian
@@ -12,7 +9,6 @@ from webob.exc import WSGIHTTPException as WebobWSGIHTTPException
from pyramid.interfaces import (
IDebugLogger,
IExceptionResponse,
- IPredicateList,
PHASE0_CONFIG,
PHASE1_CONFIG,
PHASE2_CONFIG,
@@ -23,21 +19,17 @@ from pyramid.asset import resolve_asset_spec
from pyramid.authorization import ACLAuthorizationPolicy
-from pyramid.compat import text_, reraise, string_types
+from pyramid.compat import text_, string_types
from pyramid.events import ApplicationCreated
-from pyramid.exceptions import (
- ConfigurationConflictError,
- ConfigurationError,
- ConfigurationExecutionError,
-)
+from pyramid.exceptions import ConfigurationError
from pyramid.httpexceptions import default_exceptionresponse_view
from pyramid.path import caller_package, package_of
-from pyramid.registry import Introspectable, Introspector, Registry, undefer
+from pyramid.registry import Introspectable, Introspector, Registry
from pyramid.router import Router
@@ -47,12 +39,15 @@ from pyramid.threadlocal import manager
from pyramid.util import WeakOrderedSet, object_description
-from pyramid.config.util import ActionInfo, PredicateList, action_method, not_
+from pyramid.config.actions import action_method, ActionState
+from pyramid.config.predicates import not_
+from pyramid.config.actions import ActionConfiguratorMixin
from pyramid.config.adapters import AdaptersConfiguratorMixin
from pyramid.config.assets import AssetsConfiguratorMixin
from pyramid.config.factories import FactoriesConfiguratorMixin
from pyramid.config.i18n import I18NConfiguratorMixin
+from pyramid.config.predicates import PredicateConfiguratorMixin
from pyramid.config.rendering import RenderingConfiguratorMixin
from pyramid.config.routes import RoutesConfiguratorMixin
from pyramid.config.security import SecurityConfiguratorMixin
@@ -74,8 +69,12 @@ PHASE1_CONFIG = PHASE1_CONFIG # api
PHASE2_CONFIG = PHASE2_CONFIG # api
PHASE3_CONFIG = PHASE3_CONFIG # api
+ActionState = ActionState # bw-compat for pyramid_zcml
+
class Configurator(
+ ActionConfiguratorMixin,
+ PredicateConfiguratorMixin,
TestingConfiguratorMixin,
TweensConfiguratorMixin,
SecurityConfiguratorMixin,
@@ -536,182 +535,6 @@ class Configurator(
_get_introspector, _set_introspector, _del_introspector
)
- def get_predlist(self, name):
- predlist = self.registry.queryUtility(IPredicateList, name=name)
- if predlist is None:
- predlist = PredicateList()
- self.registry.registerUtility(predlist, IPredicateList, name=name)
- return predlist
-
- def _add_predicate(
- self, type, name, factory, weighs_more_than=None, weighs_less_than=None
- ):
- factory = self.maybe_dotted(factory)
- discriminator = ('%s option' % type, name)
- intr = self.introspectable(
- '%s predicates' % type,
- discriminator,
- '%s predicate named %s' % (type, name),
- '%s predicate' % type,
- )
- intr['name'] = name
- intr['factory'] = factory
- intr['weighs_more_than'] = weighs_more_than
- intr['weighs_less_than'] = weighs_less_than
-
- def register():
- predlist = self.get_predlist(type)
- predlist.add(
- name,
- factory,
- weighs_more_than=weighs_more_than,
- weighs_less_than=weighs_less_than,
- )
-
- self.action(
- discriminator,
- register,
- introspectables=(intr,),
- order=PHASE1_CONFIG,
- ) # must be registered early
-
- @property
- def action_info(self):
- info = self.info # usually a ZCML action (ParserInfo) if self.info
- if not info:
- # Try to provide more accurate info for conflict reports
- if self._ainfo:
- info = self._ainfo[0]
- else:
- info = ActionInfo(None, 0, '', '')
- return info
-
- def action(
- self,
- discriminator,
- callable=None,
- args=(),
- kw=None,
- order=0,
- introspectables=(),
- **extra
- ):
- """ Register an action which will be executed when
- :meth:`pyramid.config.Configurator.commit` is called (or executed
- immediately if ``autocommit`` is ``True``).
-
- .. warning:: This method is typically only used by :app:`Pyramid`
- framework extension authors, not by :app:`Pyramid` application
- developers.
-
- The ``discriminator`` uniquely identifies the action. It must be
- given, but it can be ``None``, to indicate that the action never
- conflicts. It must be a hashable value.
-
- The ``callable`` is a callable object which performs the task
- associated with the action when the action is executed. It is
- optional.
-
- ``args`` and ``kw`` are tuple and dict objects respectively, which
- are passed to ``callable`` when this action is executed. Both are
- optional.
-
- ``order`` is a grouping mechanism; an action with a lower order will
- be executed before an action with a higher order (has no effect when
- autocommit is ``True``).
-
- ``introspectables`` is a sequence of :term:`introspectable` objects
- (or the empty sequence if no introspectable objects are associated
- with this action). If this configurator's ``introspection``
- attribute is ``False``, these introspectables will be ignored.
-
- ``extra`` provides a facility for inserting extra keys and values
- into an action dictionary.
- """
- # catch nonhashable discriminators here; most unit tests use
- # autocommit=False, which won't catch unhashable discriminators
- assert hash(discriminator)
-
- if kw is None:
- kw = {}
-
- autocommit = self.autocommit
- action_info = self.action_info
-
- if not self.introspection:
- # if we're not introspecting, ignore any introspectables passed
- # to us
- introspectables = ()
-
- if autocommit:
- # callables can depend on the side effects of resolving a
- # deferred discriminator
- self.begin()
- try:
- undefer(discriminator)
- if callable is not None:
- callable(*args, **kw)
- for introspectable in introspectables:
- introspectable.register(self.introspector, action_info)
- finally:
- self.end()
-
- else:
- action = extra
- action.update(
- dict(
- discriminator=discriminator,
- callable=callable,
- args=args,
- kw=kw,
- order=order,
- info=action_info,
- includepath=self.includepath,
- introspectables=introspectables,
- )
- )
- self.action_state.action(**action)
-
- def _get_action_state(self):
- registry = self.registry
- try:
- state = registry.action_state
- except AttributeError:
- state = ActionState()
- registry.action_state = state
- return state
-
- def _set_action_state(self, state):
- self.registry.action_state = state
-
- action_state = property(_get_action_state, _set_action_state)
-
- _ctx = action_state # bw compat
-
- def commit(self):
- """
- Commit any pending configuration actions. If a configuration
- conflict is detected in the pending configuration actions, this method
- will raise a :exc:`ConfigurationConflictError`; within the traceback
- of this error will be information about the source of the conflict,
- usually including file names and line numbers of the cause of the
- configuration conflicts.
-
- .. warning::
- You should think very carefully before manually invoking
- ``commit()``. Especially not as part of any reusable configuration
- methods. Normally it should only be done by an application author at
- the end of configuration in order to override certain aspects of an
- addon.
-
- """
- self.begin()
- try:
- self.action_state.execute_actions(introspector=self.introspector)
- finally:
- self.end()
- self.action_state = ActionState() # old actions have been processed
-
def include(self, callable, route_prefix=None):
"""Include a configuration callable, to support imperative
application extensibility.
@@ -1084,376 +907,4 @@ class Configurator(
return app
-# this class is licensed under the ZPL (stolen from Zope)
-class ActionState(object):
- def __init__(self):
- # NB "actions" is an API, dep'd upon by pyramid_zcml's load_zcml func
- self.actions = []
- self._seen_files = set()
-
- def processSpec(self, spec):
- """Check whether a callable needs to be processed. The ``spec``
- refers to a unique identifier for the callable.
-
- Return True if processing is needed and False otherwise. If
- the callable needs to be processed, it will be marked as
- processed, assuming that the caller will procces the callable if
- it needs to be processed.
- """
- if spec in self._seen_files:
- return False
- self._seen_files.add(spec)
- return True
-
- def action(
- self,
- discriminator,
- callable=None,
- args=(),
- kw=None,
- order=0,
- includepath=(),
- info=None,
- introspectables=(),
- **extra
- ):
- """Add an action with the given discriminator, callable and arguments
- """
- if kw is None:
- kw = {}
- action = extra
- action.update(
- dict(
- discriminator=discriminator,
- callable=callable,
- args=args,
- kw=kw,
- includepath=includepath,
- info=info,
- order=order,
- introspectables=introspectables,
- )
- )
- self.actions.append(action)
-
- def execute_actions(self, clear=True, introspector=None):
- """Execute the configuration actions
-
- This calls the action callables after resolving conflicts
-
- For example:
-
- >>> output = []
- >>> def f(*a, **k):
- ... output.append(('f', a, k))
- >>> context = ActionState()
- >>> context.actions = [
- ... (1, f, (1,)),
- ... (1, f, (11,), {}, ('x', )),
- ... (2, f, (2,)),
- ... ]
- >>> context.execute_actions()
- >>> output
- [('f', (1,), {}), ('f', (2,), {})]
-
- If the action raises an error, we convert it to a
- ConfigurationExecutionError.
-
- >>> output = []
- >>> def bad():
- ... bad.xxx
- >>> context.actions = [
- ... (1, f, (1,)),
- ... (1, f, (11,), {}, ('x', )),
- ... (2, f, (2,)),
- ... (3, bad, (), {}, (), 'oops')
- ... ]
- >>> try:
- ... v = context.execute_actions()
- ... except ConfigurationExecutionError, v:
- ... pass
- >>> print(v)
- exceptions.AttributeError: 'function' object has no attribute 'xxx'
- in:
- oops
-
- Note that actions executed before the error still have an effect:
-
- >>> output
- [('f', (1,), {}), ('f', (2,), {})]
-
- The execution is re-entrant such that actions may be added by other
- actions with the one caveat that the order of any added actions must
- be equal to or larger than the current action.
-
- >>> output = []
- >>> def f(*a, **k):
- ... output.append(('f', a, k))
- ... context.actions.append((3, g, (8,), {}))
- >>> def g(*a, **k):
- ... output.append(('g', a, k))
- >>> context.actions = [
- ... (1, f, (1,)),
- ... ]
- >>> context.execute_actions()
- >>> output
- [('f', (1,), {}), ('g', (8,), {})]
-
- """
- try:
- all_actions = []
- executed_actions = []
- action_iter = iter([])
- conflict_state = ConflictResolverState()
-
- while True:
- # We clear the actions list prior to execution so if there
- # are some new actions then we add them to the mix and resolve
- # conflicts again. This orders the new actions as well as
- # ensures that the previously executed actions have no new
- # conflicts.
- if self.actions:
- all_actions.extend(self.actions)
- action_iter = resolveConflicts(
- self.actions, state=conflict_state
- )
- self.actions = []
-
- action = next(action_iter, None)
- if action is None:
- # we are done!
- break
-
- callable = action['callable']
- args = action['args']
- kw = action['kw']
- info = action['info']
- # we use "get" below in case an action was added via a ZCML
- # directive that did not know about introspectables
- introspectables = action.get('introspectables', ())
-
- try:
- if callable is not None:
- callable(*args, **kw)
- except Exception:
- t, v, tb = sys.exc_info()
- try:
- reraise(
- ConfigurationExecutionError,
- ConfigurationExecutionError(t, v, info),
- tb,
- )
- finally:
- del t, v, tb
-
- if introspector is not None:
- for introspectable in introspectables:
- introspectable.register(introspector, info)
-
- executed_actions.append(action)
-
- self.actions = all_actions
- return executed_actions
-
- finally:
- if clear:
- self.actions = []
-
-
-class ConflictResolverState(object):
- def __init__(self):
- # keep a set of resolved discriminators to test against to ensure
- # that a new action does not conflict with something already executed
- self.resolved_ainfos = {}
-
- # actions left over from a previous iteration
- self.remaining_actions = []
-
- # after executing an action we memoize its order to avoid any new
- # actions sending us backward
- self.min_order = None
-
- # unique tracks the index of the action so we need it to increase
- # monotonically across invocations to resolveConflicts
- self.start = 0
-
-
-# this function is licensed under the ZPL (stolen from Zope)
-def resolveConflicts(actions, state=None):
- """Resolve conflicting actions
-
- Given an actions list, identify and try to resolve conflicting actions.
- Actions conflict if they have the same non-None discriminator.
-
- Conflicting actions can be resolved if the include path of one of
- the actions is a prefix of the includepaths of the other
- conflicting actions and is unequal to the include paths in the
- other conflicting actions.
-
- Actions are resolved on a per-order basis because some discriminators
- cannot be computed until earlier actions have executed. An action in an
- earlier order may execute successfully only to find out later that it was
- overridden by another action with a smaller include path. This will result
- in a conflict as there is no way to revert the original action.
-
- ``state`` may be an instance of ``ConflictResolverState`` that
- can be used to resume execution and resolve the new actions against the
- list of executed actions from a previous call.
-
- """
- if state is None:
- state = ConflictResolverState()
-
- # pick up where we left off last time, but track the new actions as well
- state.remaining_actions.extend(normalize_actions(actions))
- actions = state.remaining_actions
-
- def orderandpos(v):
- n, v = v
- return (v['order'] or 0, n)
-
- def orderonly(v):
- n, v = v
- return v['order'] or 0
-
- sactions = sorted(enumerate(actions, start=state.start), key=orderandpos)
- for order, actiongroup in itertools.groupby(sactions, orderonly):
- # "order" is an integer grouping. Actions in a lower order will be
- # executed before actions in a higher order. All of the actions in
- # one grouping will be executed (its callable, if any will be called)
- # before any of the actions in the next.
- output = []
- unique = {}
-
- # error out if we went backward in order
- if state.min_order is not None and order < state.min_order:
- r = [
- 'Actions were added to order={0} after execution had moved '
- 'on to order={1}. Conflicting actions: '.format(
- order, state.min_order
- )
- ]
- for i, action in actiongroup:
- for line in str(action['info']).rstrip().split('\n'):
- r.append(" " + line)
- raise ConfigurationError('\n'.join(r))
-
- for i, action in actiongroup:
- # Within an order, actions are executed sequentially based on
- # original action ordering ("i").
-
- # "ainfo" is a tuple of (i, action) where "i" is an integer
- # expressing the relative position of this action in the action
- # list being resolved, and "action" is an action dictionary. The
- # purpose of an ainfo is to associate an "i" with a particular
- # action; "i" exists for sorting after conflict resolution.
- ainfo = (i, action)
-
- # wait to defer discriminators until we are on their order because
- # the discriminator may depend on state from a previous order
- discriminator = undefer(action['discriminator'])
- action['discriminator'] = discriminator
-
- if discriminator is None:
- # The discriminator is None, so this action can never conflict.
- # We can add it directly to the result.
- output.append(ainfo)
- continue
-
- L = unique.setdefault(discriminator, [])
- L.append(ainfo)
-
- # Check for conflicts
- conflicts = {}
- for discriminator, ainfos in unique.items():
- # We use (includepath, i) as a sort key because we need to
- # sort the actions by the paths so that the shortest path with a
- # given prefix comes first. The "first" action is the one with the
- # shortest include path. We break sorting ties using "i".
- def bypath(ainfo):
- path, i = ainfo[1]['includepath'], ainfo[0]
- return path, order, i
-
- ainfos.sort(key=bypath)
- ainfo, rest = ainfos[0], ainfos[1:]
- _, action = ainfo
-
- # ensure this new action does not conflict with a previously
- # resolved action from an earlier order / invocation
- prev_ainfo = state.resolved_ainfos.get(discriminator)
- if prev_ainfo is not None:
- _, paction = prev_ainfo
- basepath, baseinfo = paction['includepath'], paction['info']
- includepath = action['includepath']
- # if the new action conflicts with the resolved action then
- # note the conflict, otherwise drop the action as it's
- # effectively overriden by the previous action
- if (
- includepath[: len(basepath)] != basepath
- or includepath == basepath
- ):
- L = conflicts.setdefault(discriminator, [baseinfo])
- L.append(action['info'])
-
- else:
- output.append(ainfo)
-
- basepath, baseinfo = action['includepath'], action['info']
- for _, action in rest:
- includepath = action['includepath']
- # Test whether path is a prefix of opath
- if (
- includepath[: len(basepath)] != basepath
- or includepath == basepath # not a prefix
- ):
- L = conflicts.setdefault(discriminator, [baseinfo])
- L.append(action['info'])
-
- if conflicts:
- raise ConfigurationConflictError(conflicts)
-
- # sort resolved actions by "i" and yield them one by one
- for i, action in sorted(output, key=operator.itemgetter(0)):
- # do not memoize the order until we resolve an action inside it
- state.min_order = action['order']
- state.start = i + 1
- state.remaining_actions.remove(action)
- state.resolved_ainfos[action['discriminator']] = (i, action)
- yield action
-
-
-def normalize_actions(actions):
- """Convert old-style tuple actions to new-style dicts."""
- result = []
- for v in actions:
- if not isinstance(v, dict):
- v = expand_action_tuple(*v)
- result.append(v)
- return result
-
-
-def expand_action_tuple(
- discriminator,
- callable=None,
- args=(),
- kw=None,
- includepath=(),
- info=None,
- order=0,
- introspectables=(),
-):
- if kw is None:
- kw = {}
- return dict(
- discriminator=discriminator,
- callable=callable,
- args=args,
- kw=kw,
- includepath=includepath,
- info=info,
- order=order,
- introspectables=introspectables,
- )
-
-
global_registries = WeakOrderedSet()
diff --git a/src/pyramid/config/actions.py b/src/pyramid/config/actions.py
new file mode 100644
index 000000000..9c1227d4a
--- /dev/null
+++ b/src/pyramid/config/actions.py
@@ -0,0 +1,581 @@
+import functools
+import itertools
+import operator
+import sys
+import traceback
+from zope.interface import implementer
+
+from pyramid.compat import reraise
+from pyramid.exceptions import (
+ ConfigurationConflictError,
+ ConfigurationError,
+ ConfigurationExecutionError,
+)
+from pyramid.interfaces import IActionInfo
+from pyramid.registry import undefer
+from pyramid.util import is_nonstr_iter
+
+
+class ActionConfiguratorMixin(object):
+ @property
+ def action_info(self):
+ info = self.info # usually a ZCML action (ParserInfo) if self.info
+ if not info:
+ # Try to provide more accurate info for conflict reports
+ if self._ainfo:
+ info = self._ainfo[0]
+ else:
+ info = ActionInfo(None, 0, '', '')
+ return info
+
+ def action(
+ self,
+ discriminator,
+ callable=None,
+ args=(),
+ kw=None,
+ order=0,
+ introspectables=(),
+ **extra
+ ):
+ """ Register an action which will be executed when
+ :meth:`pyramid.config.Configurator.commit` is called (or executed
+ immediately if ``autocommit`` is ``True``).
+
+ .. warning:: This method is typically only used by :app:`Pyramid`
+ framework extension authors, not by :app:`Pyramid` application
+ developers.
+
+ The ``discriminator`` uniquely identifies the action. It must be
+ given, but it can be ``None``, to indicate that the action never
+ conflicts. It must be a hashable value.
+
+ The ``callable`` is a callable object which performs the task
+ associated with the action when the action is executed. It is
+ optional.
+
+ ``args`` and ``kw`` are tuple and dict objects respectively, which
+ are passed to ``callable`` when this action is executed. Both are
+ optional.
+
+ ``order`` is a grouping mechanism; an action with a lower order will
+ be executed before an action with a higher order (has no effect when
+ autocommit is ``True``).
+
+ ``introspectables`` is a sequence of :term:`introspectable` objects
+ (or the empty sequence if no introspectable objects are associated
+ with this action). If this configurator's ``introspection``
+ attribute is ``False``, these introspectables will be ignored.
+
+ ``extra`` provides a facility for inserting extra keys and values
+ into an action dictionary.
+ """
+ # catch nonhashable discriminators here; most unit tests use
+ # autocommit=False, which won't catch unhashable discriminators
+ assert hash(discriminator)
+
+ if kw is None:
+ kw = {}
+
+ autocommit = self.autocommit
+ action_info = self.action_info
+
+ if not self.introspection:
+ # if we're not introspecting, ignore any introspectables passed
+ # to us
+ introspectables = ()
+
+ if autocommit:
+ # callables can depend on the side effects of resolving a
+ # deferred discriminator
+ self.begin()
+ try:
+ undefer(discriminator)
+ if callable is not None:
+ callable(*args, **kw)
+ for introspectable in introspectables:
+ introspectable.register(self.introspector, action_info)
+ finally:
+ self.end()
+
+ else:
+ action = extra
+ action.update(
+ dict(
+ discriminator=discriminator,
+ callable=callable,
+ args=args,
+ kw=kw,
+ order=order,
+ info=action_info,
+ includepath=self.includepath,
+ introspectables=introspectables,
+ )
+ )
+ self.action_state.action(**action)
+
+ def _get_action_state(self):
+ registry = self.registry
+ try:
+ state = registry.action_state
+ except AttributeError:
+ state = ActionState()
+ registry.action_state = state
+ return state
+
+ def _set_action_state(self, state):
+ self.registry.action_state = state
+
+ action_state = property(_get_action_state, _set_action_state)
+
+ _ctx = action_state # bw compat
+
+ def commit(self):
+ """
+ Commit any pending configuration actions. If a configuration
+ conflict is detected in the pending configuration actions, this method
+ will raise a :exc:`ConfigurationConflictError`; within the traceback
+ of this error will be information about the source of the conflict,
+ usually including file names and line numbers of the cause of the
+ configuration conflicts.
+
+ .. warning::
+ You should think very carefully before manually invoking
+ ``commit()``. Especially not as part of any reusable configuration
+ methods. Normally it should only be done by an application author at
+ the end of configuration in order to override certain aspects of an
+ addon.
+
+ """
+ self.begin()
+ try:
+ self.action_state.execute_actions(introspector=self.introspector)
+ finally:
+ self.end()
+ self.action_state = ActionState() # old actions have been processed
+
+
+# this class is licensed under the ZPL (stolen from Zope)
+class ActionState(object):
+ def __init__(self):
+ # NB "actions" is an API, dep'd upon by pyramid_zcml's load_zcml func
+ self.actions = []
+ self._seen_files = set()
+
+ def processSpec(self, spec):
+ """Check whether a callable needs to be processed. The ``spec``
+ refers to a unique identifier for the callable.
+
+ Return True if processing is needed and False otherwise. If
+ the callable needs to be processed, it will be marked as
+ processed, assuming that the caller will procces the callable if
+ it needs to be processed.
+ """
+ if spec in self._seen_files:
+ return False
+ self._seen_files.add(spec)
+ return True
+
+ def action(
+ self,
+ discriminator,
+ callable=None,
+ args=(),
+ kw=None,
+ order=0,
+ includepath=(),
+ info=None,
+ introspectables=(),
+ **extra
+ ):
+ """Add an action with the given discriminator, callable and arguments
+ """
+ if kw is None:
+ kw = {}
+ action = extra
+ action.update(
+ dict(
+ discriminator=discriminator,
+ callable=callable,
+ args=args,
+ kw=kw,
+ includepath=includepath,
+ info=info,
+ order=order,
+ introspectables=introspectables,
+ )
+ )
+ self.actions.append(action)
+
+ def execute_actions(self, clear=True, introspector=None):
+ """Execute the configuration actions
+
+ This calls the action callables after resolving conflicts
+
+ For example:
+
+ >>> output = []
+ >>> def f(*a, **k):
+ ... output.append(('f', a, k))
+ >>> context = ActionState()
+ >>> context.actions = [
+ ... (1, f, (1,)),
+ ... (1, f, (11,), {}, ('x', )),
+ ... (2, f, (2,)),
+ ... ]
+ >>> context.execute_actions()
+ >>> output
+ [('f', (1,), {}), ('f', (2,), {})]
+
+ If the action raises an error, we convert it to a
+ ConfigurationExecutionError.
+
+ >>> output = []
+ >>> def bad():
+ ... bad.xxx
+ >>> context.actions = [
+ ... (1, f, (1,)),
+ ... (1, f, (11,), {}, ('x', )),
+ ... (2, f, (2,)),
+ ... (3, bad, (), {}, (), 'oops')
+ ... ]
+ >>> try:
+ ... v = context.execute_actions()
+ ... except ConfigurationExecutionError, v:
+ ... pass
+ >>> print(v)
+ exceptions.AttributeError: 'function' object has no attribute 'xxx'
+ in:
+ oops
+
+ Note that actions executed before the error still have an effect:
+
+ >>> output
+ [('f', (1,), {}), ('f', (2,), {})]
+
+ The execution is re-entrant such that actions may be added by other
+ actions with the one caveat that the order of any added actions must
+ be equal to or larger than the current action.
+
+ >>> output = []
+ >>> def f(*a, **k):
+ ... output.append(('f', a, k))
+ ... context.actions.append((3, g, (8,), {}))
+ >>> def g(*a, **k):
+ ... output.append(('g', a, k))
+ >>> context.actions = [
+ ... (1, f, (1,)),
+ ... ]
+ >>> context.execute_actions()
+ >>> output
+ [('f', (1,), {}), ('g', (8,), {})]
+
+ """
+ try:
+ all_actions = []
+ executed_actions = []
+ action_iter = iter([])
+ conflict_state = ConflictResolverState()
+
+ while True:
+ # We clear the actions list prior to execution so if there
+ # are some new actions then we add them to the mix and resolve
+ # conflicts again. This orders the new actions as well as
+ # ensures that the previously executed actions have no new
+ # conflicts.
+ if self.actions:
+ all_actions.extend(self.actions)
+ action_iter = resolveConflicts(
+ self.actions, state=conflict_state
+ )
+ self.actions = []
+
+ action = next(action_iter, None)
+ if action is None:
+ # we are done!
+ break
+
+ callable = action['callable']
+ args = action['args']
+ kw = action['kw']
+ info = action['info']
+ # we use "get" below in case an action was added via a ZCML
+ # directive that did not know about introspectables
+ introspectables = action.get('introspectables', ())
+
+ try:
+ if callable is not None:
+ callable(*args, **kw)
+ except Exception:
+ t, v, tb = sys.exc_info()
+ try:
+ reraise(
+ ConfigurationExecutionError,
+ ConfigurationExecutionError(t, v, info),
+ tb,
+ )
+ finally:
+ del t, v, tb
+
+ if introspector is not None:
+ for introspectable in introspectables:
+ introspectable.register(introspector, info)
+
+ executed_actions.append(action)
+
+ self.actions = all_actions
+ return executed_actions
+
+ finally:
+ if clear:
+ self.actions = []
+
+
+class ConflictResolverState(object):
+ def __init__(self):
+ # keep a set of resolved discriminators to test against to ensure
+ # that a new action does not conflict with something already executed
+ self.resolved_ainfos = {}
+
+ # actions left over from a previous iteration
+ self.remaining_actions = []
+
+ # after executing an action we memoize its order to avoid any new
+ # actions sending us backward
+ self.min_order = None
+
+ # unique tracks the index of the action so we need it to increase
+ # monotonically across invocations to resolveConflicts
+ self.start = 0
+
+
+# this function is licensed under the ZPL (stolen from Zope)
+def resolveConflicts(actions, state=None):
+ """Resolve conflicting actions
+
+ Given an actions list, identify and try to resolve conflicting actions.
+ Actions conflict if they have the same non-None discriminator.
+
+ Conflicting actions can be resolved if the include path of one of
+ the actions is a prefix of the includepaths of the other
+ conflicting actions and is unequal to the include paths in the
+ other conflicting actions.
+
+ Actions are resolved on a per-order basis because some discriminators
+ cannot be computed until earlier actions have executed. An action in an
+ earlier order may execute successfully only to find out later that it was
+ overridden by another action with a smaller include path. This will result
+ in a conflict as there is no way to revert the original action.
+
+ ``state`` may be an instance of ``ConflictResolverState`` that
+ can be used to resume execution and resolve the new actions against the
+ list of executed actions from a previous call.
+
+ """
+ if state is None:
+ state = ConflictResolverState()
+
+ # pick up where we left off last time, but track the new actions as well
+ state.remaining_actions.extend(normalize_actions(actions))
+ actions = state.remaining_actions
+
+ def orderandpos(v):
+ n, v = v
+ return (v['order'] or 0, n)
+
+ def orderonly(v):
+ n, v = v
+ return v['order'] or 0
+
+ sactions = sorted(enumerate(actions, start=state.start), key=orderandpos)
+ for order, actiongroup in itertools.groupby(sactions, orderonly):
+ # "order" is an integer grouping. Actions in a lower order will be
+ # executed before actions in a higher order. All of the actions in
+ # one grouping will be executed (its callable, if any will be called)
+ # before any of the actions in the next.
+ output = []
+ unique = {}
+
+ # error out if we went backward in order
+ if state.min_order is not None and order < state.min_order:
+ r = [
+ 'Actions were added to order={0} after execution had moved '
+ 'on to order={1}. Conflicting actions: '.format(
+ order, state.min_order
+ )
+ ]
+ for i, action in actiongroup:
+ for line in str(action['info']).rstrip().split('\n'):
+ r.append(" " + line)
+ raise ConfigurationError('\n'.join(r))
+
+ for i, action in actiongroup:
+ # Within an order, actions are executed sequentially based on
+ # original action ordering ("i").
+
+ # "ainfo" is a tuple of (i, action) where "i" is an integer
+ # expressing the relative position of this action in the action
+ # list being resolved, and "action" is an action dictionary. The
+ # purpose of an ainfo is to associate an "i" with a particular
+ # action; "i" exists for sorting after conflict resolution.
+ ainfo = (i, action)
+
+ # wait to defer discriminators until we are on their order because
+ # the discriminator may depend on state from a previous order
+ discriminator = undefer(action['discriminator'])
+ action['discriminator'] = discriminator
+
+ if discriminator is None:
+ # The discriminator is None, so this action can never conflict.
+ # We can add it directly to the result.
+ output.append(ainfo)
+ continue
+
+ L = unique.setdefault(discriminator, [])
+ L.append(ainfo)
+
+ # Check for conflicts
+ conflicts = {}
+ for discriminator, ainfos in unique.items():
+ # We use (includepath, i) as a sort key because we need to
+ # sort the actions by the paths so that the shortest path with a
+ # given prefix comes first. The "first" action is the one with the
+ # shortest include path. We break sorting ties using "i".
+ def bypath(ainfo):
+ path, i = ainfo[1]['includepath'], ainfo[0]
+ return path, order, i
+
+ ainfos.sort(key=bypath)
+ ainfo, rest = ainfos[0], ainfos[1:]
+ _, action = ainfo
+
+ # ensure this new action does not conflict with a previously
+ # resolved action from an earlier order / invocation
+ prev_ainfo = state.resolved_ainfos.get(discriminator)
+ if prev_ainfo is not None:
+ _, paction = prev_ainfo
+ basepath, baseinfo = paction['includepath'], paction['info']
+ includepath = action['includepath']
+ # if the new action conflicts with the resolved action then
+ # note the conflict, otherwise drop the action as it's
+ # effectively overriden by the previous action
+ if (
+ includepath[: len(basepath)] != basepath
+ or includepath == basepath
+ ):
+ L = conflicts.setdefault(discriminator, [baseinfo])
+ L.append(action['info'])
+
+ else:
+ output.append(ainfo)
+
+ basepath, baseinfo = action['includepath'], action['info']
+ for _, action in rest:
+ includepath = action['includepath']
+ # Test whether path is a prefix of opath
+ if (
+ includepath[: len(basepath)] != basepath
+ or includepath == basepath # not a prefix
+ ):
+ L = conflicts.setdefault(discriminator, [baseinfo])
+ L.append(action['info'])
+
+ if conflicts:
+ raise ConfigurationConflictError(conflicts)
+
+ # sort resolved actions by "i" and yield them one by one
+ for i, action in sorted(output, key=operator.itemgetter(0)):
+ # do not memoize the order until we resolve an action inside it
+ state.min_order = action['order']
+ state.start = i + 1
+ state.remaining_actions.remove(action)
+ state.resolved_ainfos[action['discriminator']] = (i, action)
+ yield action
+
+
+def normalize_actions(actions):
+ """Convert old-style tuple actions to new-style dicts."""
+ result = []
+ for v in actions:
+ if not isinstance(v, dict):
+ v = expand_action_tuple(*v)
+ result.append(v)
+ return result
+
+
+def expand_action_tuple(
+ discriminator,
+ callable=None,
+ args=(),
+ kw=None,
+ includepath=(),
+ info=None,
+ order=0,
+ introspectables=(),
+):
+ if kw is None:
+ kw = {}
+ return dict(
+ discriminator=discriminator,
+ callable=callable,
+ args=args,
+ kw=kw,
+ includepath=includepath,
+ info=info,
+ order=order,
+ introspectables=introspectables,
+ )
+
+
+@implementer(IActionInfo)
+class ActionInfo(object):
+ def __init__(self, file, line, function, src):
+ self.file = file
+ self.line = line
+ self.function = function
+ self.src = src
+
+ def __str__(self):
+ srclines = self.src.split('\n')
+ src = '\n'.join(' %s' % x for x in srclines)
+ return 'Line %s of file %s:\n%s' % (self.line, self.file, src)
+
+
+def action_method(wrapped):
+ """ Wrapper to provide the right conflict info report data when a method
+ that calls Configurator.action calls another that does the same. Not a
+ documented API but used by some external systems."""
+
+ def wrapper(self, *arg, **kw):
+ if self._ainfo is None:
+ self._ainfo = []
+ info = kw.pop('_info', None)
+ # backframes for outer decorators to actionmethods
+ backframes = kw.pop('_backframes', 0) + 2
+ if is_nonstr_iter(info) and len(info) == 4:
+ # _info permitted as extract_stack tuple
+ info = ActionInfo(*info)
+ if info is None:
+ try:
+ f = traceback.extract_stack(limit=4)
+
+ # Work around a Python 3.5 issue whereby it would insert an
+ # extra stack frame. This should no longer be necessary in
+ # Python 3.5.1
+ last_frame = ActionInfo(*f[-1])
+ if last_frame.function == 'extract_stack': # pragma: no cover
+ f.pop()
+ info = ActionInfo(*f[-backframes])
+ except Exception: # pragma: no cover
+ info = ActionInfo(None, 0, '', '')
+ self._ainfo.append(info)
+ try:
+ result = wrapped(self, *arg, **kw)
+ finally:
+ self._ainfo.pop()
+ return result
+
+ if hasattr(wrapped, '__name__'):
+ functools.update_wrapper(wrapper, wrapped)
+ wrapper.__docobj__ = wrapped
+ return wrapper
diff --git a/src/pyramid/config/adapters.py b/src/pyramid/config/adapters.py
index e5668c40e..54c239ab3 100644
--- a/src/pyramid/config/adapters.py
+++ b/src/pyramid/config/adapters.py
@@ -8,7 +8,7 @@ from pyramid.interfaces import IResponse, ITraverser, IResourceURL
from pyramid.util import takes_one_arg
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
class AdaptersConfiguratorMixin(object):
diff --git a/src/pyramid/config/assets.py b/src/pyramid/config/assets.py
index fd8b2ee49..e505fd204 100644
--- a/src/pyramid/config/assets.py
+++ b/src/pyramid/config/assets.py
@@ -9,7 +9,7 @@ from pyramid.interfaces import IPackageOverrides, PHASE1_CONFIG
from pyramid.exceptions import ConfigurationError
from pyramid.threadlocal import get_current_registry
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
class OverrideProvider(pkg_resources.DefaultProvider):
diff --git a/src/pyramid/config/factories.py b/src/pyramid/config/factories.py
index 2ec1558a6..16211989f 100644
--- a/src/pyramid/config/factories.py
+++ b/src/pyramid/config/factories.py
@@ -15,7 +15,7 @@ from pyramid.traversal import DefaultRootFactory
from pyramid.util import get_callable_name, InstancePropertyHelper
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
class FactoriesConfiguratorMixin(object):
diff --git a/src/pyramid/config/i18n.py b/src/pyramid/config/i18n.py
index 6e7334448..92c324ff7 100644
--- a/src/pyramid/config/i18n.py
+++ b/src/pyramid/config/i18n.py
@@ -3,7 +3,7 @@ from pyramid.interfaces import ILocaleNegotiator, ITranslationDirectories
from pyramid.exceptions import ConfigurationError
from pyramid.path import AssetResolver
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
class I18NConfiguratorMixin(object):
diff --git a/src/pyramid/config/predicates.py b/src/pyramid/config/predicates.py
index cdbf68ca4..8f16f74af 100644
--- a/src/pyramid/config/predicates.py
+++ b/src/pyramid/config/predicates.py
@@ -1,3 +1,256 @@
-import zope.deprecation
+from hashlib import md5
+from webob.acceptparse import Accept
-zope.deprecation.moved('pyramid.predicates', 'Pyramid 1.10')
+from pyramid.compat import bytes_, is_nonstr_iter
+from pyramid.exceptions import ConfigurationError
+from pyramid.interfaces import IPredicateList, PHASE1_CONFIG
+from pyramid.predicates import Notted
+from pyramid.registry import predvalseq
+from pyramid.util import TopologicalSorter
+
+
+MAX_ORDER = 1 << 30
+DEFAULT_PHASH = md5().hexdigest()
+
+
+class PredicateConfiguratorMixin(object):
+ def get_predlist(self, name):
+ predlist = self.registry.queryUtility(IPredicateList, name=name)
+ if predlist is None:
+ predlist = PredicateList()
+ self.registry.registerUtility(predlist, IPredicateList, name=name)
+ return predlist
+
+ def _add_predicate(
+ self, type, name, factory, weighs_more_than=None, weighs_less_than=None
+ ):
+ factory = self.maybe_dotted(factory)
+ discriminator = ('%s option' % type, name)
+ intr = self.introspectable(
+ '%s predicates' % type,
+ discriminator,
+ '%s predicate named %s' % (type, name),
+ '%s predicate' % type,
+ )
+ intr['name'] = name
+ intr['factory'] = factory
+ intr['weighs_more_than'] = weighs_more_than
+ intr['weighs_less_than'] = weighs_less_than
+
+ def register():
+ predlist = self.get_predlist(type)
+ predlist.add(
+ name,
+ factory,
+ weighs_more_than=weighs_more_than,
+ weighs_less_than=weighs_less_than,
+ )
+
+ self.action(
+ discriminator,
+ register,
+ introspectables=(intr,),
+ order=PHASE1_CONFIG,
+ ) # must be registered early
+
+
+class not_(object):
+ """
+
+ You can invert the meaning of any predicate value by wrapping it in a call
+ to :class:`pyramid.config.not_`.
+
+ .. code-block:: python
+ :linenos:
+
+ from pyramid.config import not_
+
+ config.add_view(
+ 'mypackage.views.my_view',
+ route_name='ok',
+ request_method=not_('POST')
+ )
+
+ The above example will ensure that the view is called if the request method
+ is *not* ``POST``, at least if no other view is more specific.
+
+ This technique of wrapping a predicate value in ``not_`` can be used
+ anywhere predicate values are accepted:
+
+ - :meth:`pyramid.config.Configurator.add_view`
+
+ - :meth:`pyramid.config.Configurator.add_route`
+
+ - :meth:`pyramid.config.Configurator.add_subscriber`
+
+ - :meth:`pyramid.view.view_config`
+
+ - :meth:`pyramid.events.subscriber`
+
+ .. versionadded:: 1.5
+ """
+
+ def __init__(self, value):
+ self.value = value
+
+
+# under = after
+# over = before
+
+
+class PredicateList(object):
+ def __init__(self):
+ self.sorter = TopologicalSorter()
+ self.last_added = None
+
+ def add(self, name, factory, weighs_more_than=None, weighs_less_than=None):
+ # Predicates should be added to a predicate list in (presumed)
+ # computation expense order.
+ # if weighs_more_than is None and weighs_less_than is None:
+ # weighs_more_than = self.last_added or FIRST
+ # weighs_less_than = LAST
+ self.last_added = name
+ self.sorter.add(
+ name, factory, after=weighs_more_than, before=weighs_less_than
+ )
+
+ def names(self):
+ # Return the list of valid predicate names.
+ return self.sorter.names
+
+ def make(self, config, **kw):
+ # Given a configurator and a list of keywords, a predicate list is
+ # computed. Elsewhere in the code, we evaluate predicates using a
+ # generator expression. All predicates associated with a view or
+ # route must evaluate true for the view or route to "match" during a
+ # request. The fastest predicate should be evaluated first, then the
+ # next fastest, and so on, as if one returns false, the remainder of
+ # the predicates won't need to be evaluated.
+ #
+ # While we compute predicates, we also compute a predicate hash (aka
+ # phash) that can be used by a caller to identify identical predicate
+ # lists.
+ ordered = self.sorter.sorted()
+ phash = md5()
+ weights = []
+ preds = []
+ for n, (name, predicate_factory) in enumerate(ordered):
+ vals = kw.pop(name, None)
+ if vals is None: # XXX should this be a sentinel other than None?
+ continue
+ if not isinstance(vals, predvalseq):
+ vals = (vals,)
+ for val in vals:
+ realval = val
+ notted = False
+ if isinstance(val, not_):
+ realval = val.value
+ notted = True
+ pred = predicate_factory(realval, config)
+ if notted:
+ pred = Notted(pred)
+ hashes = pred.phash()
+ if not is_nonstr_iter(hashes):
+ hashes = [hashes]
+ for h in hashes:
+ phash.update(bytes_(h))
+ weights.append(1 << n + 1)
+ preds.append(pred)
+ if kw:
+ from difflib import get_close_matches
+
+ closest = []
+ names = [name for name, _ in ordered]
+ for name in kw:
+ closest.extend(get_close_matches(name, names, 3))
+
+ raise ConfigurationError(
+ 'Unknown predicate values: %r (did you mean %s)'
+ % (kw, ','.join(closest))
+ )
+ # A "order" is computed for the predicate list. An order is
+ # a scoring.
+ #
+ # Each predicate is associated with a weight value. The weight of a
+ # predicate symbolizes the relative potential "importance" of the
+ # predicate to all other predicates. A larger weight indicates
+ # greater importance.
+ #
+ # All weights for a given predicate list are bitwise ORed together
+ # to create a "score"; this score is then subtracted from
+ # MAX_ORDER and divided by an integer representing the number of
+ # predicates+1 to determine the order.
+ #
+ # For views, the order represents the ordering in which a "multiview"
+ # ( a collection of views that share the same context/request/name
+ # triad but differ in other ways via predicates) will attempt to call
+ # its set of views. Views with lower orders will be tried first.
+ # The intent is to a) ensure that views with more predicates are
+ # always evaluated before views with fewer predicates and b) to
+ # ensure a stable call ordering of views that share the same number
+ # of predicates. Views which do not have any predicates get an order
+ # of MAX_ORDER, meaning that they will be tried very last.
+ score = 0
+ for bit in weights:
+ score = score | bit
+ order = (MAX_ORDER - score) / (len(preds) + 1)
+ return order, preds, phash.hexdigest()
+
+
+def normalize_accept_offer(offer, allow_range=False):
+ if allow_range and '*' in offer:
+ return offer.lower()
+ return str(Accept.parse_offer(offer))
+
+
+def sort_accept_offers(offers, order=None):
+ """
+ Sort a list of offers by preference.
+
+ For a given ``type/subtype`` category of offers, this algorithm will
+ always sort offers with params higher than the bare offer.
+
+ :param offers: A list of offers to be sorted.
+ :param order: A weighted list of offers where items closer to the start of
+ the list will be a preferred over items closer to the end.
+ :return: A list of offers sorted first by specificity (higher to lower)
+ then by ``order``.
+
+ """
+ if order is None:
+ order = []
+
+ max_weight = len(offers)
+
+ def find_order_index(value, default=None):
+ return next((i for i, x in enumerate(order) if x == value), default)
+
+ def offer_sort_key(value):
+ """
+ (type_weight, params_weight)
+
+ type_weight:
+ - index of specific ``type/subtype`` in order list
+ - ``max_weight * 2`` if no match is found
+
+ params_weight:
+ - index of specific ``type/subtype;params`` in order list
+ - ``max_weight`` if not found
+ - ``max_weight + 1`` if no params at all
+
+ """
+ parsed = Accept.parse_offer(value)
+
+ type_w = find_order_index(
+ parsed.type + '/' + parsed.subtype, max_weight
+ )
+
+ if parsed.params:
+ param_w = find_order_index(value, max_weight)
+
+ else:
+ param_w = max_weight + 1
+
+ return (type_w, param_w)
+
+ return sorted(offers, key=offer_sort_key)
diff --git a/src/pyramid/config/rendering.py b/src/pyramid/config/rendering.py
index 948199636..7e5b767d9 100644
--- a/src/pyramid/config/rendering.py
+++ b/src/pyramid/config/rendering.py
@@ -1,7 +1,7 @@
from pyramid.interfaces import IRendererFactory, PHASE1_CONFIG
from pyramid import renderers
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
DEFAULT_RENDERERS = (
('json', renderers.json_renderer_factory),
diff --git a/src/pyramid/config/routes.py b/src/pyramid/config/routes.py
index 7a76e9e68..a14662370 100644
--- a/src/pyramid/config/routes.py
+++ b/src/pyramid/config/routes.py
@@ -10,18 +10,14 @@ from pyramid.interfaces import (
)
from pyramid.exceptions import ConfigurationError
+import pyramid.predicates
from pyramid.request import route_request_iface
from pyramid.urldispatch import RoutesMapper
from pyramid.util import as_sorted_tuple, is_nonstr_iter
-import pyramid.predicates
-
-from pyramid.config.util import (
- action_method,
- normalize_accept_offer,
- predvalseq,
-)
+from pyramid.config.actions import action_method
+from pyramid.config.predicates import normalize_accept_offer, predvalseq
class RoutesConfiguratorMixin(object):
diff --git a/src/pyramid/config/security.py b/src/pyramid/config/security.py
index 3b55c41d7..08e7cb81a 100644
--- a/src/pyramid/config/security.py
+++ b/src/pyramid/config/security.py
@@ -14,7 +14,7 @@ from pyramid.csrf import LegacySessionCSRFStoragePolicy
from pyramid.exceptions import ConfigurationError
from pyramid.util import as_sorted_tuple
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
class SecurityConfiguratorMixin(object):
diff --git a/src/pyramid/config/testing.py b/src/pyramid/config/testing.py
index 1655df52c..bba5054e6 100644
--- a/src/pyramid/config/testing.py
+++ b/src/pyramid/config/testing.py
@@ -11,7 +11,7 @@ from pyramid.renderers import RendererHelper
from pyramid.traversal import decode_path_info, split_path_info
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
class TestingConfiguratorMixin(object):
diff --git a/src/pyramid/config/tweens.py b/src/pyramid/config/tweens.py
index b74a57adf..7fc786a97 100644
--- a/src/pyramid/config/tweens.py
+++ b/src/pyramid/config/tweens.py
@@ -10,7 +10,7 @@ from pyramid.tweens import MAIN, INGRESS, EXCVIEW
from pyramid.util import is_string_or_iterable, TopologicalSorter
-from pyramid.config.util import action_method
+from pyramid.config.actions import action_method
class TweensConfiguratorMixin(object):
diff --git a/src/pyramid/config/util.py b/src/pyramid/config/util.py
deleted file mode 100644
index 8723b7721..000000000
--- a/src/pyramid/config/util.py
+++ /dev/null
@@ -1,276 +0,0 @@
-import functools
-from hashlib import md5
-import traceback
-from webob.acceptparse import Accept
-from zope.interface import implementer
-
-from pyramid.compat import bytes_, is_nonstr_iter
-from pyramid.interfaces import IActionInfo
-
-from pyramid.exceptions import ConfigurationError
-from pyramid.predicates import Notted
-from pyramid.registry import predvalseq
-from pyramid.util import TopologicalSorter, takes_one_arg
-
-TopologicalSorter = TopologicalSorter # support bw-compat imports
-takes_one_arg = takes_one_arg # support bw-compat imports
-
-
-@implementer(IActionInfo)
-class ActionInfo(object):
- def __init__(self, file, line, function, src):
- self.file = file
- self.line = line
- self.function = function
- self.src = src
-
- def __str__(self):
- srclines = self.src.split('\n')
- src = '\n'.join(' %s' % x for x in srclines)
- return 'Line %s of file %s:\n%s' % (self.line, self.file, src)
-
-
-def action_method(wrapped):
- """ Wrapper to provide the right conflict info report data when a method
- that calls Configurator.action calls another that does the same. Not a
- documented API but used by some external systems."""
-
- def wrapper(self, *arg, **kw):
- if self._ainfo is None:
- self._ainfo = []
- info = kw.pop('_info', None)
- # backframes for outer decorators to actionmethods
- backframes = kw.pop('_backframes', 0) + 2
- if is_nonstr_iter(info) and len(info) == 4:
- # _info permitted as extract_stack tuple
- info = ActionInfo(*info)
- if info is None:
- try:
- f = traceback.extract_stack(limit=4)
-
- # Work around a Python 3.5 issue whereby it would insert an
- # extra stack frame. This should no longer be necessary in
- # Python 3.5.1
- last_frame = ActionInfo(*f[-1])
- if last_frame.function == 'extract_stack': # pragma: no cover
- f.pop()
- info = ActionInfo(*f[-backframes])
- except Exception: # pragma: no cover
- info = ActionInfo(None, 0, '', '')
- self._ainfo.append(info)
- try:
- result = wrapped(self, *arg, **kw)
- finally:
- self._ainfo.pop()
- return result
-
- if hasattr(wrapped, '__name__'):
- functools.update_wrapper(wrapper, wrapped)
- wrapper.__docobj__ = wrapped
- return wrapper
-
-
-MAX_ORDER = 1 << 30
-DEFAULT_PHASH = md5().hexdigest()
-
-
-class not_(object):
- """
-
- You can invert the meaning of any predicate value by wrapping it in a call
- to :class:`pyramid.config.not_`.
-
- .. code-block:: python
- :linenos:
-
- from pyramid.config import not_
-
- config.add_view(
- 'mypackage.views.my_view',
- route_name='ok',
- request_method=not_('POST')
- )
-
- The above example will ensure that the view is called if the request method
- is *not* ``POST``, at least if no other view is more specific.
-
- This technique of wrapping a predicate value in ``not_`` can be used
- anywhere predicate values are accepted:
-
- - :meth:`pyramid.config.Configurator.add_view`
-
- - :meth:`pyramid.config.Configurator.add_route`
-
- - :meth:`pyramid.config.Configurator.add_subscriber`
-
- - :meth:`pyramid.view.view_config`
-
- - :meth:`pyramid.events.subscriber`
-
- .. versionadded:: 1.5
- """
-
- def __init__(self, value):
- self.value = value
-
-
-# under = after
-# over = before
-
-
-class PredicateList(object):
- def __init__(self):
- self.sorter = TopologicalSorter()
- self.last_added = None
-
- def add(self, name, factory, weighs_more_than=None, weighs_less_than=None):
- # Predicates should be added to a predicate list in (presumed)
- # computation expense order.
- # if weighs_more_than is None and weighs_less_than is None:
- # weighs_more_than = self.last_added or FIRST
- # weighs_less_than = LAST
- self.last_added = name
- self.sorter.add(
- name, factory, after=weighs_more_than, before=weighs_less_than
- )
-
- def names(self):
- # Return the list of valid predicate names.
- return self.sorter.names
-
- def make(self, config, **kw):
- # Given a configurator and a list of keywords, a predicate list is
- # computed. Elsewhere in the code, we evaluate predicates using a
- # generator expression. All predicates associated with a view or
- # route must evaluate true for the view or route to "match" during a
- # request. The fastest predicate should be evaluated first, then the
- # next fastest, and so on, as if one returns false, the remainder of
- # the predicates won't need to be evaluated.
- #
- # While we compute predicates, we also compute a predicate hash (aka
- # phash) that can be used by a caller to identify identical predicate
- # lists.
- ordered = self.sorter.sorted()
- phash = md5()
- weights = []
- preds = []
- for n, (name, predicate_factory) in enumerate(ordered):
- vals = kw.pop(name, None)
- if vals is None: # XXX should this be a sentinel other than None?
- continue
- if not isinstance(vals, predvalseq):
- vals = (vals,)
- for val in vals:
- realval = val
- notted = False
- if isinstance(val, not_):
- realval = val.value
- notted = True
- pred = predicate_factory(realval, config)
- if notted:
- pred = Notted(pred)
- hashes = pred.phash()
- if not is_nonstr_iter(hashes):
- hashes = [hashes]
- for h in hashes:
- phash.update(bytes_(h))
- weights.append(1 << n + 1)
- preds.append(pred)
- if kw:
- from difflib import get_close_matches
-
- closest = []
- names = [name for name, _ in ordered]
- for name in kw:
- closest.extend(get_close_matches(name, names, 3))
-
- raise ConfigurationError(
- 'Unknown predicate values: %r (did you mean %s)'
- % (kw, ','.join(closest))
- )
- # A "order" is computed for the predicate list. An order is
- # a scoring.
- #
- # Each predicate is associated with a weight value. The weight of a
- # predicate symbolizes the relative potential "importance" of the
- # predicate to all other predicates. A larger weight indicates
- # greater importance.
- #
- # All weights for a given predicate list are bitwise ORed together
- # to create a "score"; this score is then subtracted from
- # MAX_ORDER and divided by an integer representing the number of
- # predicates+1 to determine the order.
- #
- # For views, the order represents the ordering in which a "multiview"
- # ( a collection of views that share the same context/request/name
- # triad but differ in other ways via predicates) will attempt to call
- # its set of views. Views with lower orders will be tried first.
- # The intent is to a) ensure that views with more predicates are
- # always evaluated before views with fewer predicates and b) to
- # ensure a stable call ordering of views that share the same number
- # of predicates. Views which do not have any predicates get an order
- # of MAX_ORDER, meaning that they will be tried very last.
- score = 0
- for bit in weights:
- score = score | bit
- order = (MAX_ORDER - score) / (len(preds) + 1)
- return order, preds, phash.hexdigest()
-
-
-def normalize_accept_offer(offer, allow_range=False):
- if allow_range and '*' in offer:
- return offer.lower()
- return str(Accept.parse_offer(offer))
-
-
-def sort_accept_offers(offers, order=None):
- """
- Sort a list of offers by preference.
-
- For a given ``type/subtype`` category of offers, this algorithm will
- always sort offers with params higher than the bare offer.
-
- :param offers: A list of offers to be sorted.
- :param order: A weighted list of offers where items closer to the start of
- the list will be a preferred over items closer to the end.
- :return: A list of offers sorted first by specificity (higher to lower)
- then by ``order``.
-
- """
- if order is None:
- order = []
-
- max_weight = len(offers)
-
- def find_order_index(value, default=None):
- return next((i for i, x in enumerate(order) if x == value), default)
-
- def offer_sort_key(value):
- """
- (type_weight, params_weight)
-
- type_weight:
- - index of specific ``type/subtype`` in order list
- - ``max_weight * 2`` if no match is found
-
- params_weight:
- - index of specific ``type/subtype;params`` in order list
- - ``max_weight`` if not found
- - ``max_weight + 1`` if no params at all
-
- """
- parsed = Accept.parse_offer(value)
-
- type_w = find_order_index(
- parsed.type + '/' + parsed.subtype, max_weight
- )
-
- if parsed.params:
- param_w = find_order_index(value, max_weight)
-
- else:
- param_w = max_weight + 1
-
- return (type_w, param_w)
-
- return sorted(offers, key=offer_sort_key)
diff --git a/src/pyramid/config/views.py b/src/pyramid/config/views.py
index cc5b48ecb..bd1b693ba 100644
--- a/src/pyramid/config/views.py
+++ b/src/pyramid/config/views.py
@@ -74,8 +74,8 @@ from pyramid.viewderivers import (
wraps_view,
)
-from pyramid.config.util import (
- action_method,
+from pyramid.config.actions import action_method
+from pyramid.config.predicates import (
DEFAULT_PHASH,
MAX_ORDER,
normalize_accept_offer,
diff --git a/tests/test_config/test_actions.py b/tests/test_config/test_actions.py
new file mode 100644
index 000000000..a72d0d7b1
--- /dev/null
+++ b/tests/test_config/test_actions.py
@@ -0,0 +1,1090 @@
+import unittest
+
+from pyramid.exceptions import ConfigurationConflictError
+from pyramid.exceptions import ConfigurationExecutionError
+
+from pyramid.interfaces import IRequest
+
+
+class ActionConfiguratorMixinTests(unittest.TestCase):
+ def _makeOne(self, *arg, **kw):
+ from pyramid.config import Configurator
+
+ config = Configurator(*arg, **kw)
+ return config
+
+ def _getViewCallable(
+ self,
+ config,
+ ctx_iface=None,
+ request_iface=None,
+ name='',
+ exception_view=False,
+ ):
+ from zope.interface import Interface
+ from pyramid.interfaces import IView
+ from pyramid.interfaces import IViewClassifier
+ from pyramid.interfaces import IExceptionViewClassifier
+
+ if exception_view: # pragma: no cover
+ classifier = IExceptionViewClassifier
+ else:
+ classifier = IViewClassifier
+ if ctx_iface is None:
+ ctx_iface = Interface
+ if request_iface is None:
+ request_iface = IRequest
+ return config.registry.adapters.lookup(
+ (classifier, request_iface, ctx_iface),
+ IView,
+ name=name,
+ default=None,
+ )
+
+ def test_action_branching_kw_is_None(self):
+ config = self._makeOne(autocommit=True)
+ self.assertEqual(config.action('discrim'), None)
+
+ def test_action_branching_kw_is_not_None(self):
+ config = self._makeOne(autocommit=True)
+ self.assertEqual(config.action('discrim', kw={'a': 1}), None)
+
+ def test_action_autocommit_with_introspectables(self):
+ from pyramid.config.actions import ActionInfo
+
+ config = self._makeOne(autocommit=True)
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(len(intr.registered), 1)
+ self.assertEqual(intr.registered[0][0], config.introspector)
+ self.assertEqual(intr.registered[0][1].__class__, ActionInfo)
+
+ def test_action_autocommit_with_introspectables_introspection_off(self):
+ config = self._makeOne(autocommit=True)
+ config.introspection = False
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(len(intr.registered), 0)
+
+ def test_action_branching_nonautocommit_with_config_info(self):
+ config = self._makeOne(autocommit=False)
+ config.info = 'abc'
+ state = DummyActionState()
+ state.autocommit = False
+ config.action_state = state
+ config.action('discrim', kw={'a': 1})
+ self.assertEqual(
+ state.actions,
+ [
+ (
+ (),
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': 'discrim',
+ 'includepath': (),
+ 'info': 'abc',
+ 'introspectables': (),
+ 'kw': {'a': 1},
+ 'order': 0,
+ },
+ )
+ ],
+ )
+
+ def test_action_branching_nonautocommit_without_config_info(self):
+ config = self._makeOne(autocommit=False)
+ config.info = ''
+ config._ainfo = ['z']
+ state = DummyActionState()
+ config.action_state = state
+ state.autocommit = False
+ config.action('discrim', kw={'a': 1})
+ self.assertEqual(
+ state.actions,
+ [
+ (
+ (),
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': 'discrim',
+ 'includepath': (),
+ 'info': 'z',
+ 'introspectables': (),
+ 'kw': {'a': 1},
+ 'order': 0,
+ },
+ )
+ ],
+ )
+
+ def test_action_branching_nonautocommit_with_introspectables(self):
+ config = self._makeOne(autocommit=False)
+ config.info = ''
+ config._ainfo = []
+ state = DummyActionState()
+ config.action_state = state
+ state.autocommit = False
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(state.actions[0][1]['introspectables'], (intr,))
+
+ def test_action_nonautocommit_with_introspectables_introspection_off(self):
+ config = self._makeOne(autocommit=False)
+ config.info = ''
+ config._ainfo = []
+ config.introspection = False
+ state = DummyActionState()
+ config.action_state = state
+ state.autocommit = False
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(state.actions[0][1]['introspectables'], ())
+
+ def test_commit_conflict_simple(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ config.add_view(view1)
+ config.add_view(view2)
+ self.assertRaises(ConfigurationConflictError, config.commit)
+
+ def test_commit_conflict_resolved_with_include(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def includeme(config):
+ config.add_view(view2)
+
+ config.add_view(view1)
+ config.include(includeme)
+ config.commit()
+ registeredview = self._getViewCallable(config)
+ self.assertEqual(registeredview.__name__, 'view1')
+
+ def test_commit_conflict_with_two_includes(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def includeme1(config):
+ config.add_view(view1)
+
+ def includeme2(config):
+ config.add_view(view2)
+
+ config.include(includeme1)
+ config.include(includeme2)
+ try:
+ config.commit()
+ except ConfigurationConflictError as why:
+ c1, c2 = _conflictFunctions(why)
+ self.assertEqual(c1, 'includeme1')
+ self.assertEqual(c2, 'includeme2')
+ else: # pragma: no cover
+ raise AssertionError
+
+ def test_commit_conflict_resolved_with_two_includes_and_local(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def view3(request): # pragma: no cover
+ pass
+
+ def includeme1(config):
+ config.add_view(view1)
+
+ def includeme2(config):
+ config.add_view(view2)
+
+ config.include(includeme1)
+ config.include(includeme2)
+ config.add_view(view3)
+ config.commit()
+ registeredview = self._getViewCallable(config)
+ self.assertEqual(registeredview.__name__, 'view3')
+
+ def test_autocommit_no_conflicts(self):
+ from pyramid.renderers import null_renderer
+
+ config = self._makeOne(autocommit=True)
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def view3(request): # pragma: no cover
+ pass
+
+ config.add_view(view1, renderer=null_renderer)
+ config.add_view(view2, renderer=null_renderer)
+ config.add_view(view3, renderer=null_renderer)
+ config.commit()
+ registeredview = self._getViewCallable(config)
+ self.assertEqual(registeredview.__name__, 'view3')
+
+ def test_conflict_set_notfound_view(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ config.set_notfound_view(view1)
+ config.set_notfound_view(view2)
+ try:
+ config.commit()
+ except ConfigurationConflictError as why:
+ c1, c2 = _conflictFunctions(why)
+ self.assertEqual(c1, 'test_conflict_set_notfound_view')
+ self.assertEqual(c2, 'test_conflict_set_notfound_view')
+ else: # pragma: no cover
+ raise AssertionError
+
+ def test_conflict_set_forbidden_view(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ config.set_forbidden_view(view1)
+ config.set_forbidden_view(view2)
+ try:
+ config.commit()
+ except ConfigurationConflictError as why:
+ c1, c2 = _conflictFunctions(why)
+ self.assertEqual(c1, 'test_conflict_set_forbidden_view')
+ self.assertEqual(c2, 'test_conflict_set_forbidden_view')
+ else: # pragma: no cover
+ raise AssertionError
+
+
+class TestActionState(unittest.TestCase):
+ def _makeOne(self):
+ from pyramid.config.actions import ActionState
+
+ return ActionState()
+
+ def test_it(self):
+ c = self._makeOne()
+ self.assertEqual(c.actions, [])
+
+ def test_action_simple(self):
+ from . import dummyfactory as f
+
+ c = self._makeOne()
+ c.actions = []
+ c.action(1, f, (1,), {'x': 1})
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (1,),
+ 'callable': f,
+ 'discriminator': 1,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {'x': 1},
+ 'order': 0,
+ }
+ ],
+ )
+ c.action(None)
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (1,),
+ 'callable': f,
+ 'discriminator': 1,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {'x': 1},
+ 'order': 0,
+ },
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ },
+ ],
+ )
+
+ def test_action_with_includepath(self):
+ c = self._makeOne()
+ c.actions = []
+ c.action(None, includepath=('abc',))
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': ('abc',),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_action_with_info(self):
+ c = self._makeOne()
+ c.action(None, info='abc')
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': 'abc',
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_action_with_includepath_and_info(self):
+ c = self._makeOne()
+ c.action(None, includepath=('spec',), info='bleh')
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': ('spec',),
+ 'info': 'bleh',
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_action_with_order(self):
+ c = self._makeOne()
+ c.actions = []
+ c.action(None, order=99999)
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 99999,
+ }
+ ],
+ )
+
+ def test_action_with_introspectables(self):
+ c = self._makeOne()
+ c.actions = []
+ intr = DummyIntrospectable()
+ c.action(None, introspectables=(intr,))
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (intr,),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_processSpec(self):
+ c = self._makeOne()
+ self.assertTrue(c.processSpec('spec'))
+ self.assertFalse(c.processSpec('spec'))
+
+ def test_execute_actions_tuples(self):
+ output = []
+
+ def f(*a, **k):
+ output.append((a, k))
+
+ c = self._makeOne()
+ c.actions = [
+ (1, f, (1,)),
+ (1, f, (11,), {}, ('x',)),
+ (2, f, (2,)),
+ (None, None),
+ ]
+ c.execute_actions()
+ self.assertEqual(output, [((1,), {}), ((2,), {})])
+
+ def test_execute_actions_dicts(self):
+ output = []
+
+ def f(*a, **k):
+ output.append((a, k))
+
+ c = self._makeOne()
+ c.actions = [
+ {
+ 'discriminator': 1,
+ 'callable': f,
+ 'args': (1,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ },
+ {
+ 'discriminator': 1,
+ 'callable': f,
+ 'args': (11,),
+ 'kw': {},
+ 'includepath': ('x',),
+ 'order': 0,
+ 'info': None,
+ 'introspectables': (),
+ },
+ {
+ 'discriminator': 2,
+ 'callable': f,
+ 'args': (2,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ },
+ {
+ 'discriminator': None,
+ 'callable': None,
+ 'args': (),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ },
+ ]
+ c.execute_actions()
+ self.assertEqual(output, [((1,), {}), ((2,), {})])
+
+ def test_execute_actions_with_introspectables(self):
+ output = []
+
+ def f(*a, **k):
+ output.append((a, k))
+
+ c = self._makeOne()
+ intr = DummyIntrospectable()
+ c.actions = [
+ {
+ 'discriminator': 1,
+ 'callable': f,
+ 'args': (1,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (intr,),
+ }
+ ]
+ introspector = object()
+ c.execute_actions(introspector=introspector)
+ self.assertEqual(output, [((1,), {})])
+ self.assertEqual(intr.registered, [(introspector, None)])
+
+ def test_execute_actions_with_introspectable_no_callable(self):
+ c = self._makeOne()
+ intr = DummyIntrospectable()
+ c.actions = [
+ {
+ 'discriminator': 1,
+ 'callable': None,
+ 'args': (1,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (intr,),
+ }
+ ]
+ introspector = object()
+ c.execute_actions(introspector=introspector)
+ self.assertEqual(intr.registered, [(introspector, None)])
+
+ def test_execute_actions_error(self):
+ output = []
+
+ def f(*a, **k):
+ output.append(('f', a, k))
+
+ def bad():
+ raise NotImplementedError
+
+ c = self._makeOne()
+ c.actions = [
+ (1, f, (1,)),
+ (1, f, (11,), {}, ('x',)),
+ (2, f, (2,)),
+ (3, bad, (), {}, (), 'oops'),
+ ]
+ self.assertRaises(ConfigurationExecutionError, c.execute_actions)
+ self.assertEqual(output, [('f', (1,), {}), ('f', (2,), {})])
+
+ def test_reentrant_action(self):
+ output = []
+ c = self._makeOne()
+
+ def f(*a, **k):
+ output.append(('f', a, k))
+ c.actions.append((3, g, (8,), {}))
+
+ def g(*a, **k):
+ output.append(('g', a, k))
+
+ c.actions = [(1, f, (1,))]
+ c.execute_actions()
+ self.assertEqual(output, [('f', (1,), {}), ('g', (8,), {})])
+
+ def test_reentrant_action_with_deferred_discriminator(self):
+ # see https://github.com/Pylons/pyramid/issues/2697
+ from pyramid.registry import Deferred
+
+ output = []
+ c = self._makeOne()
+
+ def f(*a, **k):
+ output.append(('f', a, k))
+ c.actions.append((4, g, (4,), {}, (), None, 2))
+
+ def g(*a, **k):
+ output.append(('g', a, k))
+
+ def h(*a, **k):
+ output.append(('h', a, k))
+
+ def discrim():
+ self.assertEqual(output, [('f', (1,), {}), ('g', (2,), {})])
+ return 3
+
+ d = Deferred(discrim)
+ c.actions = [
+ (d, h, (3,), {}, (), None, 1), # order 1
+ (1, f, (1,)), # order 0
+ (2, g, (2,)), # order 0
+ ]
+ c.execute_actions()
+ self.assertEqual(
+ output,
+ [
+ ('f', (1,), {}),
+ ('g', (2,), {}),
+ ('h', (3,), {}),
+ ('g', (4,), {}),
+ ],
+ )
+
+ def test_reentrant_action_error(self):
+ from pyramid.exceptions import ConfigurationError
+
+ c = self._makeOne()
+
+ def f(*a, **k):
+ c.actions.append((3, g, (8,), {}, (), None, -1))
+
+ def g(*a, **k): # pragma: no cover
+ pass
+
+ c.actions = [(1, f, (1,))]
+ self.assertRaises(ConfigurationError, c.execute_actions)
+
+ def test_reentrant_action_without_clear(self):
+ c = self._makeOne()
+
+ def f(*a, **k):
+ c.actions.append((3, g, (8,)))
+
+ def g(*a, **k):
+ pass
+
+ c.actions = [(1, f, (1,))]
+ c.execute_actions(clear=False)
+ self.assertEqual(c.actions, [(1, f, (1,)), (3, g, (8,))])
+
+ def test_executing_conflicting_action_across_orders(self):
+ from pyramid.exceptions import ConfigurationConflictError
+
+ c = self._makeOne()
+
+ def f(*a, **k):
+ pass
+
+ def g(*a, **k): # pragma: no cover
+ pass
+
+ c.actions = [(1, f, (1,), {}, (), None, -1), (1, g, (2,))]
+ self.assertRaises(ConfigurationConflictError, c.execute_actions)
+
+ def test_executing_conflicting_action_across_reentrant_orders(self):
+ from pyramid.exceptions import ConfigurationConflictError
+
+ c = self._makeOne()
+
+ def f(*a, **k):
+ c.actions.append((1, g, (8,)))
+
+ def g(*a, **k): # pragma: no cover
+ pass
+
+ c.actions = [(1, f, (1,), {}, (), None, -1)]
+ self.assertRaises(ConfigurationConflictError, c.execute_actions)
+
+
+class Test_reentrant_action_functional(unittest.TestCase):
+ def _makeConfigurator(self, *arg, **kw):
+ from pyramid.config import Configurator
+
+ config = Configurator(*arg, **kw)
+ return config
+
+ def test_functional(self):
+ def add_auto_route(config, name, view):
+ def register():
+ config.add_view(route_name=name, view=view)
+ config.add_route(name, '/' + name)
+
+ config.action(('auto route', name), register, order=-30)
+
+ config = self._makeConfigurator()
+ config.add_directive('add_auto_route', add_auto_route)
+
+ def my_view(request): # pragma: no cover
+ return request.response
+
+ config.add_auto_route('foo', my_view)
+ config.commit()
+ from pyramid.interfaces import IRoutesMapper
+
+ mapper = config.registry.getUtility(IRoutesMapper)
+ routes = mapper.get_routes()
+ route = routes[0]
+ self.assertEqual(len(routes), 1)
+ self.assertEqual(route.name, 'foo')
+ self.assertEqual(route.path, '/foo')
+
+ def test_deferred_discriminator(self):
+ # see https://github.com/Pylons/pyramid/issues/2697
+ from pyramid.config import PHASE0_CONFIG
+
+ config = self._makeConfigurator()
+
+ def deriver(view, info):
+ return view
+
+ deriver.options = ('foo',)
+ config.add_view_deriver(deriver, 'foo_view')
+ # add_view uses a deferred discriminator and will fail if executed
+ # prior to add_view_deriver executing its action
+ config.add_view(lambda r: r.response, name='', foo=1)
+
+ def dummy_action():
+ # trigger a re-entrant action
+ config.action(None, lambda: None)
+
+ config.action(None, dummy_action, order=PHASE0_CONFIG)
+ config.commit()
+
+
+class Test_resolveConflicts(unittest.TestCase):
+ def _callFUT(self, actions):
+ from pyramid.config.actions import resolveConflicts
+
+ return resolveConflicts(actions)
+
+ def test_it_success_tuples(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f),
+ (1, f, (1,), {}, (), 'first'),
+ (1, f, (2,), {}, ('x',), 'second'),
+ (1, f, (3,), {}, ('y',), 'third'),
+ (4, f, (4,), {}, ('y',), 'should be last', 99999),
+ (3, f, (3,), {}, ('y',)),
+ (None, f, (5,), {}, ('y',)),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': None,
+ 'args': (),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': 'first',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (3,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 3,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (5,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': 'should be last',
+ 'args': (4,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 4,
+ 'includepath': ('y',),
+ 'order': 99999,
+ },
+ ],
+ )
+
+ def test_it_success_dicts(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f),
+ (1, f, (1,), {}, (), 'first'),
+ (1, f, (2,), {}, ('x',), 'second'),
+ (1, f, (3,), {}, ('y',), 'third'),
+ (4, f, (4,), {}, ('y',), 'should be last', 99999),
+ (3, f, (3,), {}, ('y',)),
+ (None, f, (5,), {}, ('y',)),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': None,
+ 'args': (),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': 'first',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (3,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 3,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (5,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': 'should be last',
+ 'args': (4,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 4,
+ 'includepath': ('y',),
+ 'order': 99999,
+ },
+ ],
+ )
+
+ def test_it_conflict(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f),
+ (1, f, (2,), {}, ('x',), 'eek'), # will conflict
+ (1, f, (3,), {}, ('y',), 'ack'), # will conflict
+ (4, f, (4,), {}, ('y',)),
+ (3, f, (3,), {}, ('y',)),
+ (None, f, (5,), {}, ('y',)),
+ ]
+ )
+ self.assertRaises(ConfigurationConflictError, list, result)
+
+ def test_it_with_actions_grouped_by_order(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f), # X
+ (1, f, (1,), {}, (), 'third', 10), # X
+ (1, f, (2,), {}, ('x',), 'fourth', 10),
+ (1, f, (3,), {}, ('y',), 'fifth', 10),
+ (2, f, (1,), {}, (), 'sixth', 10), # X
+ (3, f, (1,), {}, (), 'seventh', 10), # X
+ (5, f, (4,), {}, ('y',), 'eighth', 99999), # X
+ (4, f, (3,), {}, (), 'first', 5), # X
+ (4, f, (5,), {}, ('y',), 'second', 5),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(len(result), 6)
+ # resolved actions should be grouped by (order, i)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': None,
+ 'args': (),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': 'first',
+ 'args': (3,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 4,
+ 'includepath': (),
+ 'order': 5,
+ },
+ {
+ 'info': 'third',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': (),
+ 'order': 10,
+ },
+ {
+ 'info': 'sixth',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 2,
+ 'includepath': (),
+ 'order': 10,
+ },
+ {
+ 'info': 'seventh',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 3,
+ 'includepath': (),
+ 'order': 10,
+ },
+ {
+ 'info': 'eighth',
+ 'args': (4,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 5,
+ 'includepath': ('y',),
+ 'order': 99999,
+ },
+ ],
+ )
+
+ def test_override_success_across_orders(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (1, f, (2,), {}, ('x',), 'eek', 0),
+ (1, f, (3,), {}, ('x', 'y'), 'ack', 10),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': 'eek',
+ 'args': (2,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': ('x',),
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_conflicts_across_orders(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (1, f, (2,), {}, ('x', 'y'), 'eek', 0),
+ (1, f, (3,), {}, ('x'), 'ack', 10),
+ ]
+ )
+ self.assertRaises(ConfigurationConflictError, list, result)
+
+
+class TestActionInfo(unittest.TestCase):
+ def _getTargetClass(self):
+ from pyramid.config.actions import ActionInfo
+
+ return ActionInfo
+
+ def _makeOne(self, filename, lineno, function, linerepr):
+ return self._getTargetClass()(filename, lineno, function, linerepr)
+
+ def test_class_conforms(self):
+ from zope.interface.verify import verifyClass
+ from pyramid.interfaces import IActionInfo
+
+ verifyClass(IActionInfo, self._getTargetClass())
+
+ def test_instance_conforms(self):
+ from zope.interface.verify import verifyObject
+ from pyramid.interfaces import IActionInfo
+
+ verifyObject(IActionInfo, self._makeOne('f', 0, 'f', 'f'))
+
+ def test_ctor(self):
+ inst = self._makeOne('filename', 10, 'function', 'src')
+ self.assertEqual(inst.file, 'filename')
+ self.assertEqual(inst.line, 10)
+ self.assertEqual(inst.function, 'function')
+ self.assertEqual(inst.src, 'src')
+
+ def test___str__(self):
+ inst = self._makeOne('filename', 0, 'function', ' linerepr ')
+ self.assertEqual(
+ str(inst), "Line 0 of file filename:\n linerepr "
+ )
+
+
+def _conflictFunctions(e):
+ conflicts = e._conflicts.values()
+ for conflict in conflicts:
+ for confinst in conflict:
+ yield confinst.function
+
+
+class DummyActionState(object):
+ autocommit = False
+ info = ''
+
+ def __init__(self):
+ self.actions = []
+
+ def action(self, *arg, **kw):
+ self.actions.append((arg, kw))
+
+
+class DummyIntrospectable(object):
+ def __init__(self):
+ self.registered = []
+
+ def register(self, introspector, action_info):
+ self.registered.append((introspector, action_info))
diff --git a/tests/test_config/test_init.py b/tests/test_config/test_init.py
index 2c92b60fb..811672fb3 100644
--- a/tests/test_config/test_init.py
+++ b/tests/test_config/test_init.py
@@ -1,7 +1,5 @@
import os
import unittest
-from zope.interface import Interface
-from zope.interface import implementer
from pyramid.compat import im_func
from pyramid.testing import skip_on
@@ -10,7 +8,6 @@ from . import dummy_tween_factory
from . import dummy_include
from . import dummy_extend
from . import dummy_extend2
-from . import IDummy
from . import DummyContext
from pyramid.exceptions import ConfigurationExecutionError
@@ -34,8 +31,6 @@ class ConfiguratorTests(unittest.TestCase):
name='',
exception_view=False,
):
- from zope.interface import Interface
- from pyramid.interfaces import IRequest
from pyramid.interfaces import IView
from pyramid.interfaces import IViewClassifier
from pyramid.interfaces import IExceptionViewClassifier
@@ -44,10 +39,6 @@ class ConfiguratorTests(unittest.TestCase):
classifier = IExceptionViewClassifier
else:
classifier = IViewClassifier
- if ctx_iface is None:
- ctx_iface = Interface
- if request_iface is None:
- request_iface = IRequest
return config.registry.adapters.lookup(
(classifier, request_iface, ctx_iface),
IView,
@@ -299,7 +290,6 @@ class ConfiguratorTests(unittest.TestCase):
def test_ctor_httpexception_view_default(self):
from pyramid.interfaces import IExceptionResponse
from pyramid.httpexceptions import default_exceptionresponse_view
- from pyramid.interfaces import IRequest
config = self._makeOne()
view = self._getViewCallable(
@@ -309,7 +299,6 @@ class ConfiguratorTests(unittest.TestCase):
def test_ctor_exceptionresponse_view_None(self):
from pyramid.interfaces import IExceptionResponse
- from pyramid.interfaces import IRequest
config = self._makeOne(exceptionresponse_view=None)
view = self._getViewCallable(
@@ -319,7 +308,6 @@ class ConfiguratorTests(unittest.TestCase):
def test_ctor_exceptionresponse_view_custom(self):
from pyramid.interfaces import IExceptionResponse
- from pyramid.interfaces import IRequest
def exceptionresponse_view(context, request): # pragma: no cover
pass
@@ -548,7 +536,6 @@ class ConfiguratorTests(unittest.TestCase):
def test_setup_registry_explicit_notfound_trumps_iexceptionresponse(self):
from pyramid.renderers import null_renderer
from zope.interface import implementedBy
- from pyramid.interfaces import IRequest
from pyramid.httpexceptions import HTTPNotFound
from pyramid.registry import Registry
@@ -975,110 +962,8 @@ test_config.dummy_include2"""
config.include(include)
self.assertTrue(stack[0] is config.registry)
- def test_action_branching_kw_is_None(self):
- config = self._makeOne(autocommit=True)
- self.assertEqual(config.action('discrim'), None)
-
- def test_action_branching_kw_is_not_None(self):
- config = self._makeOne(autocommit=True)
- self.assertEqual(config.action('discrim', kw={'a': 1}), None)
-
- def test_action_autocommit_with_introspectables(self):
- from pyramid.config.util import ActionInfo
-
- config = self._makeOne(autocommit=True)
- intr = DummyIntrospectable()
- config.action('discrim', introspectables=(intr,))
- self.assertEqual(len(intr.registered), 1)
- self.assertEqual(intr.registered[0][0], config.introspector)
- self.assertEqual(intr.registered[0][1].__class__, ActionInfo)
-
- def test_action_autocommit_with_introspectables_introspection_off(self):
- config = self._makeOne(autocommit=True)
- config.introspection = False
- intr = DummyIntrospectable()
- config.action('discrim', introspectables=(intr,))
- self.assertEqual(len(intr.registered), 0)
-
- def test_action_branching_nonautocommit_with_config_info(self):
- config = self._makeOne(autocommit=False)
- config.info = 'abc'
- state = DummyActionState()
- state.autocommit = False
- config.action_state = state
- config.action('discrim', kw={'a': 1})
- self.assertEqual(
- state.actions,
- [
- (
- (),
- {
- 'args': (),
- 'callable': None,
- 'discriminator': 'discrim',
- 'includepath': (),
- 'info': 'abc',
- 'introspectables': (),
- 'kw': {'a': 1},
- 'order': 0,
- },
- )
- ],
- )
-
- def test_action_branching_nonautocommit_without_config_info(self):
- config = self._makeOne(autocommit=False)
- config.info = ''
- config._ainfo = ['z']
- state = DummyActionState()
- config.action_state = state
- state.autocommit = False
- config.action('discrim', kw={'a': 1})
- self.assertEqual(
- state.actions,
- [
- (
- (),
- {
- 'args': (),
- 'callable': None,
- 'discriminator': 'discrim',
- 'includepath': (),
- 'info': 'z',
- 'introspectables': (),
- 'kw': {'a': 1},
- 'order': 0,
- },
- )
- ],
- )
-
- def test_action_branching_nonautocommit_with_introspectables(self):
- config = self._makeOne(autocommit=False)
- config.info = ''
- config._ainfo = []
- state = DummyActionState()
- config.action_state = state
- state.autocommit = False
- intr = DummyIntrospectable()
- config.action('discrim', introspectables=(intr,))
- self.assertEqual(state.actions[0][1]['introspectables'], (intr,))
-
- def test_action_nonautocommit_with_introspectables_introspection_off(self):
- config = self._makeOne(autocommit=False)
- config.info = ''
- config._ainfo = []
- config.introspection = False
- state = DummyActionState()
- config.action_state = state
- state.autocommit = False
- intr = DummyIntrospectable()
- config.action('discrim', introspectables=(intr,))
- self.assertEqual(state.actions[0][1]['introspectables'], ())
-
def test_scan_integration(self):
from zope.interface import alsoProvides
- from pyramid.interfaces import IRequest
from pyramid.view import render_view_to_response
import tests.test_config.pkgs.scannable as package
@@ -1185,7 +1070,6 @@ test_config.dummy_include2"""
def test_scan_integration_with_ignore(self):
from zope.interface import alsoProvides
- from pyramid.interfaces import IRequest
from pyramid.view import render_view_to_response
import tests.test_config.pkgs.scannable as package
@@ -1207,7 +1091,6 @@ test_config.dummy_include2"""
def test_scan_integration_dottedname_package(self):
from zope.interface import alsoProvides
- from pyramid.interfaces import IRequest
from pyramid.view import render_view_to_response
config = self._makeOne(autocommit=True)
@@ -1306,149 +1189,6 @@ test_config.dummy_include2"""
finally:
getSiteManager.reset()
- def test_commit_conflict_simple(self):
- config = self._makeOne()
-
- def view1(request): # pragma: no cover
- pass
-
- def view2(request): # pragma: no cover
- pass
-
- config.add_view(view1)
- config.add_view(view2)
- self.assertRaises(ConfigurationConflictError, config.commit)
-
- def test_commit_conflict_resolved_with_include(self):
- config = self._makeOne()
-
- def view1(request): # pragma: no cover
- pass
-
- def view2(request): # pragma: no cover
- pass
-
- def includeme(config):
- config.add_view(view2)
-
- config.add_view(view1)
- config.include(includeme)
- config.commit()
- registeredview = self._getViewCallable(config)
- self.assertEqual(registeredview.__name__, 'view1')
-
- def test_commit_conflict_with_two_includes(self):
- config = self._makeOne()
-
- def view1(request): # pragma: no cover
- pass
-
- def view2(request): # pragma: no cover
- pass
-
- def includeme1(config):
- config.add_view(view1)
-
- def includeme2(config):
- config.add_view(view2)
-
- config.include(includeme1)
- config.include(includeme2)
- try:
- config.commit()
- except ConfigurationConflictError as why:
- c1, c2 = _conflictFunctions(why)
- self.assertEqual(c1, 'includeme1')
- self.assertEqual(c2, 'includeme2')
- else: # pragma: no cover
- raise AssertionError
-
- def test_commit_conflict_resolved_with_two_includes_and_local(self):
- config = self._makeOne()
-
- def view1(request): # pragma: no cover
- pass
-
- def view2(request): # pragma: no cover
- pass
-
- def view3(request): # pragma: no cover
- pass
-
- def includeme1(config):
- config.add_view(view1)
-
- def includeme2(config):
- config.add_view(view2)
-
- config.include(includeme1)
- config.include(includeme2)
- config.add_view(view3)
- config.commit()
- registeredview = self._getViewCallable(config)
- self.assertEqual(registeredview.__name__, 'view3')
-
- def test_autocommit_no_conflicts(self):
- from pyramid.renderers import null_renderer
-
- config = self._makeOne(autocommit=True)
-
- def view1(request): # pragma: no cover
- pass
-
- def view2(request): # pragma: no cover
- pass
-
- def view3(request): # pragma: no cover
- pass
-
- config.add_view(view1, renderer=null_renderer)
- config.add_view(view2, renderer=null_renderer)
- config.add_view(view3, renderer=null_renderer)
- config.commit()
- registeredview = self._getViewCallable(config)
- self.assertEqual(registeredview.__name__, 'view3')
-
- def test_conflict_set_notfound_view(self):
- config = self._makeOne()
-
- def view1(request): # pragma: no cover
- pass
-
- def view2(request): # pragma: no cover
- pass
-
- config.set_notfound_view(view1)
- config.set_notfound_view(view2)
- try:
- config.commit()
- except ConfigurationConflictError as why:
- c1, c2 = _conflictFunctions(why)
- self.assertEqual(c1, 'test_conflict_set_notfound_view')
- self.assertEqual(c2, 'test_conflict_set_notfound_view')
- else: # pragma: no cover
- raise AssertionError
-
- def test_conflict_set_forbidden_view(self):
- config = self._makeOne()
-
- def view1(request): # pragma: no cover
- pass
-
- def view2(request): # pragma: no cover
- pass
-
- config.set_forbidden_view(view1)
- config.set_forbidden_view(view2)
- try:
- config.commit()
- except ConfigurationConflictError as why:
- c1, c2 = _conflictFunctions(why)
- self.assertEqual(c1, 'test_conflict_set_forbidden_view')
- self.assertEqual(c2, 'test_conflict_set_forbidden_view')
- else: # pragma: no cover
- raise AssertionError
-
def test___getattr__missing_when_directives_exist(self):
config = self._makeOne()
directives = {}
@@ -1624,749 +1364,6 @@ class TestConfigurator__add_predicate(unittest.TestCase):
)
-class TestActionState(unittest.TestCase):
- def _makeOne(self):
- from pyramid.config import ActionState
-
- return ActionState()
-
- def test_it(self):
- c = self._makeOne()
- self.assertEqual(c.actions, [])
-
- def test_action_simple(self):
- from . import dummyfactory as f
-
- c = self._makeOne()
- c.actions = []
- c.action(1, f, (1,), {'x': 1})
- self.assertEqual(
- c.actions,
- [
- {
- 'args': (1,),
- 'callable': f,
- 'discriminator': 1,
- 'includepath': (),
- 'info': None,
- 'introspectables': (),
- 'kw': {'x': 1},
- 'order': 0,
- }
- ],
- )
- c.action(None)
- self.assertEqual(
- c.actions,
- [
- {
- 'args': (1,),
- 'callable': f,
- 'discriminator': 1,
- 'includepath': (),
- 'info': None,
- 'introspectables': (),
- 'kw': {'x': 1},
- 'order': 0,
- },
- {
- 'args': (),
- 'callable': None,
- 'discriminator': None,
- 'includepath': (),
- 'info': None,
- 'introspectables': (),
- 'kw': {},
- 'order': 0,
- },
- ],
- )
-
- def test_action_with_includepath(self):
- c = self._makeOne()
- c.actions = []
- c.action(None, includepath=('abc',))
- self.assertEqual(
- c.actions,
- [
- {
- 'args': (),
- 'callable': None,
- 'discriminator': None,
- 'includepath': ('abc',),
- 'info': None,
- 'introspectables': (),
- 'kw': {},
- 'order': 0,
- }
- ],
- )
-
- def test_action_with_info(self):
- c = self._makeOne()
- c.action(None, info='abc')
- self.assertEqual(
- c.actions,
- [
- {
- 'args': (),
- 'callable': None,
- 'discriminator': None,
- 'includepath': (),
- 'info': 'abc',
- 'introspectables': (),
- 'kw': {},
- 'order': 0,
- }
- ],
- )
-
- def test_action_with_includepath_and_info(self):
- c = self._makeOne()
- c.action(None, includepath=('spec',), info='bleh')
- self.assertEqual(
- c.actions,
- [
- {
- 'args': (),
- 'callable': None,
- 'discriminator': None,
- 'includepath': ('spec',),
- 'info': 'bleh',
- 'introspectables': (),
- 'kw': {},
- 'order': 0,
- }
- ],
- )
-
- def test_action_with_order(self):
- c = self._makeOne()
- c.actions = []
- c.action(None, order=99999)
- self.assertEqual(
- c.actions,
- [
- {
- 'args': (),
- 'callable': None,
- 'discriminator': None,
- 'includepath': (),
- 'info': None,
- 'introspectables': (),
- 'kw': {},
- 'order': 99999,
- }
- ],
- )
-
- def test_action_with_introspectables(self):
- c = self._makeOne()
- c.actions = []
- intr = DummyIntrospectable()
- c.action(None, introspectables=(intr,))
- self.assertEqual(
- c.actions,
- [
- {
- 'args': (),
- 'callable': None,
- 'discriminator': None,
- 'includepath': (),
- 'info': None,
- 'introspectables': (intr,),
- 'kw': {},
- 'order': 0,
- }
- ],
- )
-
- def test_processSpec(self):
- c = self._makeOne()
- self.assertTrue(c.processSpec('spec'))
- self.assertFalse(c.processSpec('spec'))
-
- def test_execute_actions_tuples(self):
- output = []
-
- def f(*a, **k):
- output.append((a, k))
-
- c = self._makeOne()
- c.actions = [
- (1, f, (1,)),
- (1, f, (11,), {}, ('x',)),
- (2, f, (2,)),
- (None, None),
- ]
- c.execute_actions()
- self.assertEqual(output, [((1,), {}), ((2,), {})])
-
- def test_execute_actions_dicts(self):
- output = []
-
- def f(*a, **k):
- output.append((a, k))
-
- c = self._makeOne()
- c.actions = [
- {
- 'discriminator': 1,
- 'callable': f,
- 'args': (1,),
- 'kw': {},
- 'order': 0,
- 'includepath': (),
- 'info': None,
- 'introspectables': (),
- },
- {
- 'discriminator': 1,
- 'callable': f,
- 'args': (11,),
- 'kw': {},
- 'includepath': ('x',),
- 'order': 0,
- 'info': None,
- 'introspectables': (),
- },
- {
- 'discriminator': 2,
- 'callable': f,
- 'args': (2,),
- 'kw': {},
- 'order': 0,
- 'includepath': (),
- 'info': None,
- 'introspectables': (),
- },
- {
- 'discriminator': None,
- 'callable': None,
- 'args': (),
- 'kw': {},
- 'order': 0,
- 'includepath': (),
- 'info': None,
- 'introspectables': (),
- },
- ]
- c.execute_actions()
- self.assertEqual(output, [((1,), {}), ((2,), {})])
-
- def test_execute_actions_with_introspectables(self):
- output = []
-
- def f(*a, **k):
- output.append((a, k))
-
- c = self._makeOne()
- intr = DummyIntrospectable()
- c.actions = [
- {
- 'discriminator': 1,
- 'callable': f,
- 'args': (1,),
- 'kw': {},
- 'order': 0,
- 'includepath': (),
- 'info': None,
- 'introspectables': (intr,),
- }
- ]
- introspector = object()
- c.execute_actions(introspector=introspector)
- self.assertEqual(output, [((1,), {})])
- self.assertEqual(intr.registered, [(introspector, None)])
-
- def test_execute_actions_with_introspectable_no_callable(self):
- c = self._makeOne()
- intr = DummyIntrospectable()
- c.actions = [
- {
- 'discriminator': 1,
- 'callable': None,
- 'args': (1,),
- 'kw': {},
- 'order': 0,
- 'includepath': (),
- 'info': None,
- 'introspectables': (intr,),
- }
- ]
- introspector = object()
- c.execute_actions(introspector=introspector)
- self.assertEqual(intr.registered, [(introspector, None)])
-
- def test_execute_actions_error(self):
- output = []
-
- def f(*a, **k):
- output.append(('f', a, k))
-
- def bad():
- raise NotImplementedError
-
- c = self._makeOne()
- c.actions = [
- (1, f, (1,)),
- (1, f, (11,), {}, ('x',)),
- (2, f, (2,)),
- (3, bad, (), {}, (), 'oops'),
- ]
- self.assertRaises(ConfigurationExecutionError, c.execute_actions)
- self.assertEqual(output, [('f', (1,), {}), ('f', (2,), {})])
-
- def test_reentrant_action(self):
- output = []
- c = self._makeOne()
-
- def f(*a, **k):
- output.append(('f', a, k))
- c.actions.append((3, g, (8,), {}))
-
- def g(*a, **k):
- output.append(('g', a, k))
-
- c.actions = [(1, f, (1,))]
- c.execute_actions()
- self.assertEqual(output, [('f', (1,), {}), ('g', (8,), {})])
-
- def test_reentrant_action_with_deferred_discriminator(self):
- # see https://github.com/Pylons/pyramid/issues/2697
- from pyramid.registry import Deferred
-
- output = []
- c = self._makeOne()
-
- def f(*a, **k):
- output.append(('f', a, k))
- c.actions.append((4, g, (4,), {}, (), None, 2))
-
- def g(*a, **k):
- output.append(('g', a, k))
-
- def h(*a, **k):
- output.append(('h', a, k))
-
- def discrim():
- self.assertEqual(output, [('f', (1,), {}), ('g', (2,), {})])
- return 3
-
- d = Deferred(discrim)
- c.actions = [
- (d, h, (3,), {}, (), None, 1), # order 1
- (1, f, (1,)), # order 0
- (2, g, (2,)), # order 0
- ]
- c.execute_actions()
- self.assertEqual(
- output,
- [
- ('f', (1,), {}),
- ('g', (2,), {}),
- ('h', (3,), {}),
- ('g', (4,), {}),
- ],
- )
-
- def test_reentrant_action_error(self):
- from pyramid.exceptions import ConfigurationError
-
- c = self._makeOne()
-
- def f(*a, **k):
- c.actions.append((3, g, (8,), {}, (), None, -1))
-
- def g(*a, **k): # pragma: no cover
- pass
-
- c.actions = [(1, f, (1,))]
- self.assertRaises(ConfigurationError, c.execute_actions)
-
- def test_reentrant_action_without_clear(self):
- c = self._makeOne()
-
- def f(*a, **k):
- c.actions.append((3, g, (8,)))
-
- def g(*a, **k):
- pass
-
- c.actions = [(1, f, (1,))]
- c.execute_actions(clear=False)
- self.assertEqual(c.actions, [(1, f, (1,)), (3, g, (8,))])
-
- def test_executing_conflicting_action_across_orders(self):
- from pyramid.exceptions import ConfigurationConflictError
-
- c = self._makeOne()
-
- def f(*a, **k):
- pass
-
- def g(*a, **k): # pragma: no cover
- pass
-
- c.actions = [(1, f, (1,), {}, (), None, -1), (1, g, (2,))]
- self.assertRaises(ConfigurationConflictError, c.execute_actions)
-
- def test_executing_conflicting_action_across_reentrant_orders(self):
- from pyramid.exceptions import ConfigurationConflictError
-
- c = self._makeOne()
-
- def f(*a, **k):
- c.actions.append((1, g, (8,)))
-
- def g(*a, **k): # pragma: no cover
- pass
-
- c.actions = [(1, f, (1,), {}, (), None, -1)]
- self.assertRaises(ConfigurationConflictError, c.execute_actions)
-
-
-class Test_reentrant_action_functional(unittest.TestCase):
- def _makeConfigurator(self, *arg, **kw):
- from pyramid.config import Configurator
-
- config = Configurator(*arg, **kw)
- return config
-
- def test_functional(self):
- def add_auto_route(config, name, view):
- def register():
- config.add_view(route_name=name, view=view)
- config.add_route(name, '/' + name)
-
- config.action(('auto route', name), register, order=-30)
-
- config = self._makeConfigurator()
- config.add_directive('add_auto_route', add_auto_route)
-
- def my_view(request): # pragma: no cover
- return request.response
-
- config.add_auto_route('foo', my_view)
- config.commit()
- from pyramid.interfaces import IRoutesMapper
-
- mapper = config.registry.getUtility(IRoutesMapper)
- routes = mapper.get_routes()
- route = routes[0]
- self.assertEqual(len(routes), 1)
- self.assertEqual(route.name, 'foo')
- self.assertEqual(route.path, '/foo')
-
- def test_deferred_discriminator(self):
- # see https://github.com/Pylons/pyramid/issues/2697
- from pyramid.config import PHASE0_CONFIG
-
- config = self._makeConfigurator()
-
- def deriver(view, info):
- return view
-
- deriver.options = ('foo',)
- config.add_view_deriver(deriver, 'foo_view')
- # add_view uses a deferred discriminator and will fail if executed
- # prior to add_view_deriver executing its action
- config.add_view(lambda r: r.response, name='', foo=1)
-
- def dummy_action():
- # trigger a re-entrant action
- config.action(None, lambda: None)
-
- config.action(None, dummy_action, order=PHASE0_CONFIG)
- config.commit()
-
-
-class Test_resolveConflicts(unittest.TestCase):
- def _callFUT(self, actions):
- from pyramid.config import resolveConflicts
-
- return resolveConflicts(actions)
-
- def test_it_success_tuples(self):
- from . import dummyfactory as f
-
- result = self._callFUT(
- [
- (None, f),
- (1, f, (1,), {}, (), 'first'),
- (1, f, (2,), {}, ('x',), 'second'),
- (1, f, (3,), {}, ('y',), 'third'),
- (4, f, (4,), {}, ('y',), 'should be last', 99999),
- (3, f, (3,), {}, ('y',)),
- (None, f, (5,), {}, ('y',)),
- ]
- )
- result = list(result)
- self.assertEqual(
- result,
- [
- {
- 'info': None,
- 'args': (),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': None,
- 'includepath': (),
- 'order': 0,
- },
- {
- 'info': 'first',
- 'args': (1,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 1,
- 'includepath': (),
- 'order': 0,
- },
- {
- 'info': None,
- 'args': (3,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 3,
- 'includepath': ('y',),
- 'order': 0,
- },
- {
- 'info': None,
- 'args': (5,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': None,
- 'includepath': ('y',),
- 'order': 0,
- },
- {
- 'info': 'should be last',
- 'args': (4,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 4,
- 'includepath': ('y',),
- 'order': 99999,
- },
- ],
- )
-
- def test_it_success_dicts(self):
- from . import dummyfactory as f
-
- result = self._callFUT(
- [
- (None, f),
- (1, f, (1,), {}, (), 'first'),
- (1, f, (2,), {}, ('x',), 'second'),
- (1, f, (3,), {}, ('y',), 'third'),
- (4, f, (4,), {}, ('y',), 'should be last', 99999),
- (3, f, (3,), {}, ('y',)),
- (None, f, (5,), {}, ('y',)),
- ]
- )
- result = list(result)
- self.assertEqual(
- result,
- [
- {
- 'info': None,
- 'args': (),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': None,
- 'includepath': (),
- 'order': 0,
- },
- {
- 'info': 'first',
- 'args': (1,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 1,
- 'includepath': (),
- 'order': 0,
- },
- {
- 'info': None,
- 'args': (3,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 3,
- 'includepath': ('y',),
- 'order': 0,
- },
- {
- 'info': None,
- 'args': (5,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': None,
- 'includepath': ('y',),
- 'order': 0,
- },
- {
- 'info': 'should be last',
- 'args': (4,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 4,
- 'includepath': ('y',),
- 'order': 99999,
- },
- ],
- )
-
- def test_it_conflict(self):
- from . import dummyfactory as f
-
- result = self._callFUT(
- [
- (None, f),
- (1, f, (2,), {}, ('x',), 'eek'), # will conflict
- (1, f, (3,), {}, ('y',), 'ack'), # will conflict
- (4, f, (4,), {}, ('y',)),
- (3, f, (3,), {}, ('y',)),
- (None, f, (5,), {}, ('y',)),
- ]
- )
- self.assertRaises(ConfigurationConflictError, list, result)
-
- def test_it_with_actions_grouped_by_order(self):
- from . import dummyfactory as f
-
- result = self._callFUT(
- [
- (None, f), # X
- (1, f, (1,), {}, (), 'third', 10), # X
- (1, f, (2,), {}, ('x',), 'fourth', 10),
- (1, f, (3,), {}, ('y',), 'fifth', 10),
- (2, f, (1,), {}, (), 'sixth', 10), # X
- (3, f, (1,), {}, (), 'seventh', 10), # X
- (5, f, (4,), {}, ('y',), 'eighth', 99999), # X
- (4, f, (3,), {}, (), 'first', 5), # X
- (4, f, (5,), {}, ('y',), 'second', 5),
- ]
- )
- result = list(result)
- self.assertEqual(len(result), 6)
- # resolved actions should be grouped by (order, i)
- self.assertEqual(
- result,
- [
- {
- 'info': None,
- 'args': (),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': None,
- 'includepath': (),
- 'order': 0,
- },
- {
- 'info': 'first',
- 'args': (3,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 4,
- 'includepath': (),
- 'order': 5,
- },
- {
- 'info': 'third',
- 'args': (1,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 1,
- 'includepath': (),
- 'order': 10,
- },
- {
- 'info': 'sixth',
- 'args': (1,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 2,
- 'includepath': (),
- 'order': 10,
- },
- {
- 'info': 'seventh',
- 'args': (1,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 3,
- 'includepath': (),
- 'order': 10,
- },
- {
- 'info': 'eighth',
- 'args': (4,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 5,
- 'includepath': ('y',),
- 'order': 99999,
- },
- ],
- )
-
- def test_override_success_across_orders(self):
- from . import dummyfactory as f
-
- result = self._callFUT(
- [
- (1, f, (2,), {}, ('x',), 'eek', 0),
- (1, f, (3,), {}, ('x', 'y'), 'ack', 10),
- ]
- )
- result = list(result)
- self.assertEqual(
- result,
- [
- {
- 'info': 'eek',
- 'args': (2,),
- 'callable': f,
- 'introspectables': (),
- 'kw': {},
- 'discriminator': 1,
- 'includepath': ('x',),
- 'order': 0,
- }
- ],
- )
-
- def test_conflicts_across_orders(self):
- from . import dummyfactory as f
-
- result = self._callFUT(
- [
- (1, f, (2,), {}, ('x', 'y'), 'eek', 0),
- (1, f, (3,), {}, ('x'), 'ack', 10),
- ]
- )
- self.assertRaises(ConfigurationConflictError, list, result)
-
-
class TestGlobalRegistriesIntegration(unittest.TestCase):
def setUp(self):
from pyramid.config import global_registries
@@ -2430,11 +1427,6 @@ class DummyThreadLocalManager(object):
self.popped = True
-@implementer(IDummy)
-class DummyEvent:
- pass
-
-
class DummyRegistry(object):
def __init__(self, adaptation=None, util=None):
self.utilities = []
@@ -2459,35 +1451,5 @@ class DummyRegistry(object):
return self.util
-class IOther(Interface):
- pass
-
-
-def _conflictFunctions(e):
- conflicts = e._conflicts.values()
- for conflict in conflicts:
- for confinst in conflict:
- yield confinst.function
-
-
-class DummyActionState(object):
- autocommit = False
- info = ''
-
- def __init__(self):
- self.actions = []
-
- def action(self, *arg, **kw):
- self.actions.append((arg, kw))
-
-
-class DummyIntrospectable(object):
- def __init__(self):
- self.registered = []
-
- def register(self, introspector, action_info):
- self.registered.append((introspector, action_info))
-
-
class DummyPredicate(object):
pass
diff --git a/tests/test_config/test_util.py b/tests/test_config/test_predicates.py
index 50d143b2d..079652b39 100644
--- a/tests/test_config/test_util.py
+++ b/tests/test_config/test_predicates.py
@@ -3,44 +3,9 @@ import unittest
from pyramid.compat import text_
-class TestActionInfo(unittest.TestCase):
- def _getTargetClass(self):
- from pyramid.config.util import ActionInfo
-
- return ActionInfo
-
- def _makeOne(self, filename, lineno, function, linerepr):
- return self._getTargetClass()(filename, lineno, function, linerepr)
-
- def test_class_conforms(self):
- from zope.interface.verify import verifyClass
- from pyramid.interfaces import IActionInfo
-
- verifyClass(IActionInfo, self._getTargetClass())
-
- def test_instance_conforms(self):
- from zope.interface.verify import verifyObject
- from pyramid.interfaces import IActionInfo
-
- verifyObject(IActionInfo, self._makeOne('f', 0, 'f', 'f'))
-
- def test_ctor(self):
- inst = self._makeOne('filename', 10, 'function', 'src')
- self.assertEqual(inst.file, 'filename')
- self.assertEqual(inst.line, 10)
- self.assertEqual(inst.function, 'function')
- self.assertEqual(inst.src, 'src')
-
- def test___str__(self):
- inst = self._makeOne('filename', 0, 'function', ' linerepr ')
- self.assertEqual(
- str(inst), "Line 0 of file filename:\n linerepr "
- )
-
-
class TestPredicateList(unittest.TestCase):
def _makeOne(self):
- from pyramid.config.util import PredicateList
+ from pyramid.config.predicates import PredicateList
from pyramid import predicates
inst = PredicateList()
@@ -71,7 +36,7 @@ class TestPredicateList(unittest.TestCase):
self.assertTrue(order1 < order2)
def test_ordering_number_of_predicates(self):
- from pyramid.config.util import predvalseq
+ from pyramid.config.predicates import predvalseq
order1, _, _ = self._callFUT(
xhr='xhr',
@@ -169,7 +134,7 @@ class TestPredicateList(unittest.TestCase):
self.assertTrue(order12 > order10)
def test_ordering_importance_of_predicates(self):
- from pyramid.config.util import predvalseq
+ from pyramid.config.predicates import predvalseq
order1, _, _ = self._callFUT(xhr='xhr')
order2, _, _ = self._callFUT(request_method='request_method')
@@ -194,7 +159,7 @@ class TestPredicateList(unittest.TestCase):
self.assertTrue(order9 > order10)
def test_ordering_importance_and_number(self):
- from pyramid.config.util import predvalseq
+ from pyramid.config.predicates import predvalseq
order1, _, _ = self._callFUT(
xhr='xhr', request_method='request_method'
@@ -233,7 +198,7 @@ class TestPredicateList(unittest.TestCase):
self.assertTrue(order1 > order2)
def test_different_custom_predicates_with_same_hash(self):
- from pyramid.config.util import predvalseq
+ from pyramid.config.predicates import predvalseq
class PredicateWithHash(object):
def __hash__(self):
@@ -286,7 +251,7 @@ class TestPredicateList(unittest.TestCase):
)
def test_custom_predicates_can_affect_traversal(self):
- from pyramid.config.util import predvalseq
+ from pyramid.config.predicates import predvalseq
def custom(info, request):
m = info['match']
@@ -312,7 +277,7 @@ class TestPredicateList(unittest.TestCase):
)
def test_predicate_text_is_correct(self):
- from pyramid.config.util import predvalseq
+ from pyramid.config.predicates import predvalseq
_, predicates, _ = self._callFUT(
xhr='xhr',
@@ -420,20 +385,9 @@ class TestPredicateList(unittest.TestCase):
self.assertEqual(predicates[2](None, request), True)
-class TestDeprecatedPredicates(unittest.TestCase):
- def test_it(self):
- import warnings
-
- with warnings.catch_warnings(record=True) as w:
- warnings.filterwarnings('always')
- from pyramid.config.predicates import XHRPredicate # noqa: F401
-
- self.assertEqual(len(w), 1)
-
-
class Test_sort_accept_offers(unittest.TestCase):
def _callFUT(self, offers, order=None):
- from pyramid.config.util import sort_accept_offers
+ from pyramid.config.predicates import sort_accept_offers
return sort_accept_offers(offers, order)
diff --git a/tests/test_config/test_testing.py b/tests/test_config/test_testing.py
index ce6659667..ede31e1b6 100644
--- a/tests/test_config/test_testing.py
+++ b/tests/test_config/test_testing.py
@@ -104,7 +104,7 @@ class TestingConfiguratorMixinTests(unittest.TestCase):
def test_testing_add_subscriber_dottedname(self):
config = self._makeOne(autocommit=True)
- L = config.testing_add_subscriber('tests.test_config.test_init.IDummy')
+ L = config.testing_add_subscriber('tests.test_config.IDummy')
event = DummyEvent()
config.registry.notify(event)
self.assertEqual(len(L), 1)
diff --git a/tests/test_config/test_views.py b/tests/test_config/test_views.py
index 977944fdd..5e722c9a0 100644
--- a/tests/test_config/test_views.py
+++ b/tests/test_config/test_views.py
@@ -664,7 +664,7 @@ class TestViewsConfigurationMixin(unittest.TestCase):
self.assertEqual(wrapper(None, request), 'OK')
def test_add_view_default_phash_overrides_default_phash(self):
- from pyramid.config.util import DEFAULT_PHASH
+ from pyramid.config.predicates import DEFAULT_PHASH
from pyramid.renderers import null_renderer
from zope.interface import Interface
from pyramid.interfaces import IRequest
@@ -690,7 +690,7 @@ class TestViewsConfigurationMixin(unittest.TestCase):
self.assertEqual(wrapper(None, request), 'OK')
def test_add_view_exc_default_phash_overrides_default_phash(self):
- from pyramid.config.util import DEFAULT_PHASH
+ from pyramid.config.predicates import DEFAULT_PHASH
from pyramid.renderers import null_renderer
from zope.interface import implementedBy
from pyramid.interfaces import IRequest
diff --git a/tests/test_viewderivers.py b/tests/test_viewderivers.py
index a1455ed92..f01cb490e 100644
--- a/tests/test_viewderivers.py
+++ b/tests/test_viewderivers.py
@@ -1265,7 +1265,7 @@ class TestDeriveView(unittest.TestCase):
self.assertEqual(result(None, None), response)
def test_attr_wrapped_view_branching_default_phash(self):
- from pyramid.config.util import DEFAULT_PHASH
+ from pyramid.config.predicates import DEFAULT_PHASH
def view(context, request): # pragma: no cover
pass