summaryrefslogtreecommitdiff
path: root/tests/test_config/test_actions.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_config/test_actions.py')
-rw-r--r--tests/test_config/test_actions.py1055
1 files changed, 1055 insertions, 0 deletions
diff --git a/tests/test_config/test_actions.py b/tests/test_config/test_actions.py
new file mode 100644
index 000000000..ba8b8c124
--- /dev/null
+++ b/tests/test_config/test_actions.py
@@ -0,0 +1,1055 @@
+import unittest
+
+from pyramid.exceptions import ConfigurationConflictError
+from pyramid.exceptions import ConfigurationExecutionError
+
+from pyramid.interfaces import IRequest
+
+
+class ConfiguratorTests(unittest.TestCase):
+ def _makeOne(self, *arg, **kw):
+ from pyramid.config import Configurator
+
+ config = Configurator(*arg, **kw)
+ return config
+
+ def _getViewCallable(
+ self,
+ config,
+ ctx_iface=None,
+ request_iface=None,
+ name='',
+ exception_view=False,
+ ):
+ from zope.interface import Interface
+ from pyramid.interfaces import IView
+ from pyramid.interfaces import IViewClassifier
+ from pyramid.interfaces import IExceptionViewClassifier
+
+ if exception_view: # pragma: no cover
+ classifier = IExceptionViewClassifier
+ else:
+ classifier = IViewClassifier
+ if ctx_iface is None:
+ ctx_iface = Interface
+ if request_iface is None:
+ request_iface = IRequest
+ return config.registry.adapters.lookup(
+ (classifier, request_iface, ctx_iface),
+ IView,
+ name=name,
+ default=None,
+ )
+
+ def test_action_branching_kw_is_None(self):
+ config = self._makeOne(autocommit=True)
+ self.assertEqual(config.action('discrim'), None)
+
+ def test_action_branching_kw_is_not_None(self):
+ config = self._makeOne(autocommit=True)
+ self.assertEqual(config.action('discrim', kw={'a': 1}), None)
+
+ def test_action_autocommit_with_introspectables(self):
+ from pyramid.config.util import ActionInfo
+
+ config = self._makeOne(autocommit=True)
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(len(intr.registered), 1)
+ self.assertEqual(intr.registered[0][0], config.introspector)
+ self.assertEqual(intr.registered[0][1].__class__, ActionInfo)
+
+ def test_action_autocommit_with_introspectables_introspection_off(self):
+ config = self._makeOne(autocommit=True)
+ config.introspection = False
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(len(intr.registered), 0)
+
+ def test_action_branching_nonautocommit_with_config_info(self):
+ config = self._makeOne(autocommit=False)
+ config.info = 'abc'
+ state = DummyActionState()
+ state.autocommit = False
+ config.action_state = state
+ config.action('discrim', kw={'a': 1})
+ self.assertEqual(
+ state.actions,
+ [
+ (
+ (),
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': 'discrim',
+ 'includepath': (),
+ 'info': 'abc',
+ 'introspectables': (),
+ 'kw': {'a': 1},
+ 'order': 0,
+ },
+ )
+ ],
+ )
+
+ def test_action_branching_nonautocommit_without_config_info(self):
+ config = self._makeOne(autocommit=False)
+ config.info = ''
+ config._ainfo = ['z']
+ state = DummyActionState()
+ config.action_state = state
+ state.autocommit = False
+ config.action('discrim', kw={'a': 1})
+ self.assertEqual(
+ state.actions,
+ [
+ (
+ (),
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': 'discrim',
+ 'includepath': (),
+ 'info': 'z',
+ 'introspectables': (),
+ 'kw': {'a': 1},
+ 'order': 0,
+ },
+ )
+ ],
+ )
+
+ def test_action_branching_nonautocommit_with_introspectables(self):
+ config = self._makeOne(autocommit=False)
+ config.info = ''
+ config._ainfo = []
+ state = DummyActionState()
+ config.action_state = state
+ state.autocommit = False
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(state.actions[0][1]['introspectables'], (intr,))
+
+ def test_action_nonautocommit_with_introspectables_introspection_off(self):
+ config = self._makeOne(autocommit=False)
+ config.info = ''
+ config._ainfo = []
+ config.introspection = False
+ state = DummyActionState()
+ config.action_state = state
+ state.autocommit = False
+ intr = DummyIntrospectable()
+ config.action('discrim', introspectables=(intr,))
+ self.assertEqual(state.actions[0][1]['introspectables'], ())
+
+ def test_commit_conflict_simple(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ config.add_view(view1)
+ config.add_view(view2)
+ self.assertRaises(ConfigurationConflictError, config.commit)
+
+ def test_commit_conflict_resolved_with_include(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def includeme(config):
+ config.add_view(view2)
+
+ config.add_view(view1)
+ config.include(includeme)
+ config.commit()
+ registeredview = self._getViewCallable(config)
+ self.assertEqual(registeredview.__name__, 'view1')
+
+ def test_commit_conflict_with_two_includes(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def includeme1(config):
+ config.add_view(view1)
+
+ def includeme2(config):
+ config.add_view(view2)
+
+ config.include(includeme1)
+ config.include(includeme2)
+ try:
+ config.commit()
+ except ConfigurationConflictError as why:
+ c1, c2 = _conflictFunctions(why)
+ self.assertEqual(c1, 'includeme1')
+ self.assertEqual(c2, 'includeme2')
+ else: # pragma: no cover
+ raise AssertionError
+
+ def test_commit_conflict_resolved_with_two_includes_and_local(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def view3(request): # pragma: no cover
+ pass
+
+ def includeme1(config):
+ config.add_view(view1)
+
+ def includeme2(config):
+ config.add_view(view2)
+
+ config.include(includeme1)
+ config.include(includeme2)
+ config.add_view(view3)
+ config.commit()
+ registeredview = self._getViewCallable(config)
+ self.assertEqual(registeredview.__name__, 'view3')
+
+ def test_autocommit_no_conflicts(self):
+ from pyramid.renderers import null_renderer
+
+ config = self._makeOne(autocommit=True)
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ def view3(request): # pragma: no cover
+ pass
+
+ config.add_view(view1, renderer=null_renderer)
+ config.add_view(view2, renderer=null_renderer)
+ config.add_view(view3, renderer=null_renderer)
+ config.commit()
+ registeredview = self._getViewCallable(config)
+ self.assertEqual(registeredview.__name__, 'view3')
+
+ def test_conflict_set_notfound_view(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ config.set_notfound_view(view1)
+ config.set_notfound_view(view2)
+ try:
+ config.commit()
+ except ConfigurationConflictError as why:
+ c1, c2 = _conflictFunctions(why)
+ self.assertEqual(c1, 'test_conflict_set_notfound_view')
+ self.assertEqual(c2, 'test_conflict_set_notfound_view')
+ else: # pragma: no cover
+ raise AssertionError
+
+ def test_conflict_set_forbidden_view(self):
+ config = self._makeOne()
+
+ def view1(request): # pragma: no cover
+ pass
+
+ def view2(request): # pragma: no cover
+ pass
+
+ config.set_forbidden_view(view1)
+ config.set_forbidden_view(view2)
+ try:
+ config.commit()
+ except ConfigurationConflictError as why:
+ c1, c2 = _conflictFunctions(why)
+ self.assertEqual(c1, 'test_conflict_set_forbidden_view')
+ self.assertEqual(c2, 'test_conflict_set_forbidden_view')
+ else: # pragma: no cover
+ raise AssertionError
+
+
+class TestActionState(unittest.TestCase):
+ def _makeOne(self):
+ from pyramid.config.actions import ActionState
+
+ return ActionState()
+
+ def test_it(self):
+ c = self._makeOne()
+ self.assertEqual(c.actions, [])
+
+ def test_action_simple(self):
+ from . import dummyfactory as f
+
+ c = self._makeOne()
+ c.actions = []
+ c.action(1, f, (1,), {'x': 1})
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (1,),
+ 'callable': f,
+ 'discriminator': 1,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {'x': 1},
+ 'order': 0,
+ }
+ ],
+ )
+ c.action(None)
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (1,),
+ 'callable': f,
+ 'discriminator': 1,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {'x': 1},
+ 'order': 0,
+ },
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ },
+ ],
+ )
+
+ def test_action_with_includepath(self):
+ c = self._makeOne()
+ c.actions = []
+ c.action(None, includepath=('abc',))
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': ('abc',),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_action_with_info(self):
+ c = self._makeOne()
+ c.action(None, info='abc')
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': 'abc',
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_action_with_includepath_and_info(self):
+ c = self._makeOne()
+ c.action(None, includepath=('spec',), info='bleh')
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': ('spec',),
+ 'info': 'bleh',
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_action_with_order(self):
+ c = self._makeOne()
+ c.actions = []
+ c.action(None, order=99999)
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ 'kw': {},
+ 'order': 99999,
+ }
+ ],
+ )
+
+ def test_action_with_introspectables(self):
+ c = self._makeOne()
+ c.actions = []
+ intr = DummyIntrospectable()
+ c.action(None, introspectables=(intr,))
+ self.assertEqual(
+ c.actions,
+ [
+ {
+ 'args': (),
+ 'callable': None,
+ 'discriminator': None,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (intr,),
+ 'kw': {},
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_processSpec(self):
+ c = self._makeOne()
+ self.assertTrue(c.processSpec('spec'))
+ self.assertFalse(c.processSpec('spec'))
+
+ def test_execute_actions_tuples(self):
+ output = []
+
+ def f(*a, **k):
+ output.append((a, k))
+
+ c = self._makeOne()
+ c.actions = [
+ (1, f, (1,)),
+ (1, f, (11,), {}, ('x',)),
+ (2, f, (2,)),
+ (None, None),
+ ]
+ c.execute_actions()
+ self.assertEqual(output, [((1,), {}), ((2,), {})])
+
+ def test_execute_actions_dicts(self):
+ output = []
+
+ def f(*a, **k):
+ output.append((a, k))
+
+ c = self._makeOne()
+ c.actions = [
+ {
+ 'discriminator': 1,
+ 'callable': f,
+ 'args': (1,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ },
+ {
+ 'discriminator': 1,
+ 'callable': f,
+ 'args': (11,),
+ 'kw': {},
+ 'includepath': ('x',),
+ 'order': 0,
+ 'info': None,
+ 'introspectables': (),
+ },
+ {
+ 'discriminator': 2,
+ 'callable': f,
+ 'args': (2,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ },
+ {
+ 'discriminator': None,
+ 'callable': None,
+ 'args': (),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (),
+ },
+ ]
+ c.execute_actions()
+ self.assertEqual(output, [((1,), {}), ((2,), {})])
+
+ def test_execute_actions_with_introspectables(self):
+ output = []
+
+ def f(*a, **k):
+ output.append((a, k))
+
+ c = self._makeOne()
+ intr = DummyIntrospectable()
+ c.actions = [
+ {
+ 'discriminator': 1,
+ 'callable': f,
+ 'args': (1,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (intr,),
+ }
+ ]
+ introspector = object()
+ c.execute_actions(introspector=introspector)
+ self.assertEqual(output, [((1,), {})])
+ self.assertEqual(intr.registered, [(introspector, None)])
+
+ def test_execute_actions_with_introspectable_no_callable(self):
+ c = self._makeOne()
+ intr = DummyIntrospectable()
+ c.actions = [
+ {
+ 'discriminator': 1,
+ 'callable': None,
+ 'args': (1,),
+ 'kw': {},
+ 'order': 0,
+ 'includepath': (),
+ 'info': None,
+ 'introspectables': (intr,),
+ }
+ ]
+ introspector = object()
+ c.execute_actions(introspector=introspector)
+ self.assertEqual(intr.registered, [(introspector, None)])
+
+ def test_execute_actions_error(self):
+ output = []
+
+ def f(*a, **k):
+ output.append(('f', a, k))
+
+ def bad():
+ raise NotImplementedError
+
+ c = self._makeOne()
+ c.actions = [
+ (1, f, (1,)),
+ (1, f, (11,), {}, ('x',)),
+ (2, f, (2,)),
+ (3, bad, (), {}, (), 'oops'),
+ ]
+ self.assertRaises(ConfigurationExecutionError, c.execute_actions)
+ self.assertEqual(output, [('f', (1,), {}), ('f', (2,), {})])
+
+ def test_reentrant_action(self):
+ output = []
+ c = self._makeOne()
+
+ def f(*a, **k):
+ output.append(('f', a, k))
+ c.actions.append((3, g, (8,), {}))
+
+ def g(*a, **k):
+ output.append(('g', a, k))
+
+ c.actions = [(1, f, (1,))]
+ c.execute_actions()
+ self.assertEqual(output, [('f', (1,), {}), ('g', (8,), {})])
+
+ def test_reentrant_action_with_deferred_discriminator(self):
+ # see https://github.com/Pylons/pyramid/issues/2697
+ from pyramid.registry import Deferred
+
+ output = []
+ c = self._makeOne()
+
+ def f(*a, **k):
+ output.append(('f', a, k))
+ c.actions.append((4, g, (4,), {}, (), None, 2))
+
+ def g(*a, **k):
+ output.append(('g', a, k))
+
+ def h(*a, **k):
+ output.append(('h', a, k))
+
+ def discrim():
+ self.assertEqual(output, [('f', (1,), {}), ('g', (2,), {})])
+ return 3
+
+ d = Deferred(discrim)
+ c.actions = [
+ (d, h, (3,), {}, (), None, 1), # order 1
+ (1, f, (1,)), # order 0
+ (2, g, (2,)), # order 0
+ ]
+ c.execute_actions()
+ self.assertEqual(
+ output,
+ [
+ ('f', (1,), {}),
+ ('g', (2,), {}),
+ ('h', (3,), {}),
+ ('g', (4,), {}),
+ ],
+ )
+
+ def test_reentrant_action_error(self):
+ from pyramid.exceptions import ConfigurationError
+
+ c = self._makeOne()
+
+ def f(*a, **k):
+ c.actions.append((3, g, (8,), {}, (), None, -1))
+
+ def g(*a, **k): # pragma: no cover
+ pass
+
+ c.actions = [(1, f, (1,))]
+ self.assertRaises(ConfigurationError, c.execute_actions)
+
+ def test_reentrant_action_without_clear(self):
+ c = self._makeOne()
+
+ def f(*a, **k):
+ c.actions.append((3, g, (8,)))
+
+ def g(*a, **k):
+ pass
+
+ c.actions = [(1, f, (1,))]
+ c.execute_actions(clear=False)
+ self.assertEqual(c.actions, [(1, f, (1,)), (3, g, (8,))])
+
+ def test_executing_conflicting_action_across_orders(self):
+ from pyramid.exceptions import ConfigurationConflictError
+
+ c = self._makeOne()
+
+ def f(*a, **k):
+ pass
+
+ def g(*a, **k): # pragma: no cover
+ pass
+
+ c.actions = [(1, f, (1,), {}, (), None, -1), (1, g, (2,))]
+ self.assertRaises(ConfigurationConflictError, c.execute_actions)
+
+ def test_executing_conflicting_action_across_reentrant_orders(self):
+ from pyramid.exceptions import ConfigurationConflictError
+
+ c = self._makeOne()
+
+ def f(*a, **k):
+ c.actions.append((1, g, (8,)))
+
+ def g(*a, **k): # pragma: no cover
+ pass
+
+ c.actions = [(1, f, (1,), {}, (), None, -1)]
+ self.assertRaises(ConfigurationConflictError, c.execute_actions)
+
+
+class Test_reentrant_action_functional(unittest.TestCase):
+ def _makeConfigurator(self, *arg, **kw):
+ from pyramid.config import Configurator
+
+ config = Configurator(*arg, **kw)
+ return config
+
+ def test_functional(self):
+ def add_auto_route(config, name, view):
+ def register():
+ config.add_view(route_name=name, view=view)
+ config.add_route(name, '/' + name)
+
+ config.action(('auto route', name), register, order=-30)
+
+ config = self._makeConfigurator()
+ config.add_directive('add_auto_route', add_auto_route)
+
+ def my_view(request): # pragma: no cover
+ return request.response
+
+ config.add_auto_route('foo', my_view)
+ config.commit()
+ from pyramid.interfaces import IRoutesMapper
+
+ mapper = config.registry.getUtility(IRoutesMapper)
+ routes = mapper.get_routes()
+ route = routes[0]
+ self.assertEqual(len(routes), 1)
+ self.assertEqual(route.name, 'foo')
+ self.assertEqual(route.path, '/foo')
+
+ def test_deferred_discriminator(self):
+ # see https://github.com/Pylons/pyramid/issues/2697
+ from pyramid.config import PHASE0_CONFIG
+
+ config = self._makeConfigurator()
+
+ def deriver(view, info):
+ return view
+
+ deriver.options = ('foo',)
+ config.add_view_deriver(deriver, 'foo_view')
+ # add_view uses a deferred discriminator and will fail if executed
+ # prior to add_view_deriver executing its action
+ config.add_view(lambda r: r.response, name='', foo=1)
+
+ def dummy_action():
+ # trigger a re-entrant action
+ config.action(None, lambda: None)
+
+ config.action(None, dummy_action, order=PHASE0_CONFIG)
+ config.commit()
+
+
+class Test_resolveConflicts(unittest.TestCase):
+ def _callFUT(self, actions):
+ from pyramid.config.actions import resolveConflicts
+
+ return resolveConflicts(actions)
+
+ def test_it_success_tuples(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f),
+ (1, f, (1,), {}, (), 'first'),
+ (1, f, (2,), {}, ('x',), 'second'),
+ (1, f, (3,), {}, ('y',), 'third'),
+ (4, f, (4,), {}, ('y',), 'should be last', 99999),
+ (3, f, (3,), {}, ('y',)),
+ (None, f, (5,), {}, ('y',)),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': None,
+ 'args': (),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': 'first',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (3,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 3,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (5,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': 'should be last',
+ 'args': (4,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 4,
+ 'includepath': ('y',),
+ 'order': 99999,
+ },
+ ],
+ )
+
+ def test_it_success_dicts(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f),
+ (1, f, (1,), {}, (), 'first'),
+ (1, f, (2,), {}, ('x',), 'second'),
+ (1, f, (3,), {}, ('y',), 'third'),
+ (4, f, (4,), {}, ('y',), 'should be last', 99999),
+ (3, f, (3,), {}, ('y',)),
+ (None, f, (5,), {}, ('y',)),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': None,
+ 'args': (),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': 'first',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (3,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 3,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': None,
+ 'args': (5,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': ('y',),
+ 'order': 0,
+ },
+ {
+ 'info': 'should be last',
+ 'args': (4,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 4,
+ 'includepath': ('y',),
+ 'order': 99999,
+ },
+ ],
+ )
+
+ def test_it_conflict(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f),
+ (1, f, (2,), {}, ('x',), 'eek'), # will conflict
+ (1, f, (3,), {}, ('y',), 'ack'), # will conflict
+ (4, f, (4,), {}, ('y',)),
+ (3, f, (3,), {}, ('y',)),
+ (None, f, (5,), {}, ('y',)),
+ ]
+ )
+ self.assertRaises(ConfigurationConflictError, list, result)
+
+ def test_it_with_actions_grouped_by_order(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (None, f), # X
+ (1, f, (1,), {}, (), 'third', 10), # X
+ (1, f, (2,), {}, ('x',), 'fourth', 10),
+ (1, f, (3,), {}, ('y',), 'fifth', 10),
+ (2, f, (1,), {}, (), 'sixth', 10), # X
+ (3, f, (1,), {}, (), 'seventh', 10), # X
+ (5, f, (4,), {}, ('y',), 'eighth', 99999), # X
+ (4, f, (3,), {}, (), 'first', 5), # X
+ (4, f, (5,), {}, ('y',), 'second', 5),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(len(result), 6)
+ # resolved actions should be grouped by (order, i)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': None,
+ 'args': (),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': None,
+ 'includepath': (),
+ 'order': 0,
+ },
+ {
+ 'info': 'first',
+ 'args': (3,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 4,
+ 'includepath': (),
+ 'order': 5,
+ },
+ {
+ 'info': 'third',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': (),
+ 'order': 10,
+ },
+ {
+ 'info': 'sixth',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 2,
+ 'includepath': (),
+ 'order': 10,
+ },
+ {
+ 'info': 'seventh',
+ 'args': (1,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 3,
+ 'includepath': (),
+ 'order': 10,
+ },
+ {
+ 'info': 'eighth',
+ 'args': (4,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 5,
+ 'includepath': ('y',),
+ 'order': 99999,
+ },
+ ],
+ )
+
+ def test_override_success_across_orders(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (1, f, (2,), {}, ('x',), 'eek', 0),
+ (1, f, (3,), {}, ('x', 'y'), 'ack', 10),
+ ]
+ )
+ result = list(result)
+ self.assertEqual(
+ result,
+ [
+ {
+ 'info': 'eek',
+ 'args': (2,),
+ 'callable': f,
+ 'introspectables': (),
+ 'kw': {},
+ 'discriminator': 1,
+ 'includepath': ('x',),
+ 'order': 0,
+ }
+ ],
+ )
+
+ def test_conflicts_across_orders(self):
+ from . import dummyfactory as f
+
+ result = self._callFUT(
+ [
+ (1, f, (2,), {}, ('x', 'y'), 'eek', 0),
+ (1, f, (3,), {}, ('x'), 'ack', 10),
+ ]
+ )
+ self.assertRaises(ConfigurationConflictError, list, result)
+
+
+def _conflictFunctions(e):
+ conflicts = e._conflicts.values()
+ for conflict in conflicts:
+ for confinst in conflict:
+ yield confinst.function
+
+
+class DummyActionState(object):
+ autocommit = False
+ info = ''
+
+ def __init__(self):
+ self.actions = []
+
+ def action(self, *arg, **kw):
+ self.actions.append((arg, kw))
+
+
+class DummyIntrospectable(object):
+ def __init__(self):
+ self.registered = []
+
+ def register(self, introspector, action_info):
+ self.registered.append((introspector, action_info))