import unittest
from repoze.bfg import testing
class TestRouter(unittest.TestCase):
def setUp(self):
testing.setUp()
from repoze.bfg.threadlocal import get_current_registry
self.registry = get_current_registry()
def tearDown(self):
testing.tearDown()
def _registerRouteRequest(self, name):
from repoze.bfg.interfaces import IRouteRequest
from zope.interface import Interface
class IRequest(Interface):
""" """
self.registry.registerUtility(IRequest, IRouteRequest, name=name)
return IRequest
def _connectRoute(self, path, name, factory=None):
from repoze.bfg.interfaces import IRoutesMapper
from repoze.bfg.urldispatch import RoutesMapper
mapper = self.registry.queryUtility(IRoutesMapper)
if mapper is None:
mapper = RoutesMapper()
self.registry.registerUtility(mapper, IRoutesMapper)
mapper.connect(path, name, factory)
def _registerLogger(self):
from repoze.bfg.interfaces import IDebugLogger
logger = DummyLogger()
self.registry.registerUtility(logger, IDebugLogger)
return logger
def _registerSettings(self, **kw):
from repoze.bfg.interfaces import ISettings
settings = {'debug_authorization':False, 'debug_notfound':False}
settings.update(kw)
self.registry.registerUtility(settings, ISettings)
def _registerTraverserFactory(self, context, view_name='', subpath=None,
traversed=None, virtual_root=None,
virtual_root_path=None, **kw):
from repoze.bfg.interfaces import ITraverser
if virtual_root is None:
virtual_root = context
if subpath is None:
subpath = []
if traversed is None:
traversed = []
if virtual_root_path is None:
virtual_root_path = []
class DummyTraverserFactory:
def __init__(self, root):
self.root = root
def __call__(self, request):
values = {'root':self.root,
'context':context,
'view_name':view_name,
'subpath':subpath,
'traversed':traversed,
'virtual_root':virtual_root,
'virtual_root_path':virtual_root_path}
kw.update(values)
return kw
self.registry.registerAdapter(DummyTraverserFactory, (None,),
ITraverser, name='')
def _registerView(self, app, name, *for_):
from repoze.bfg.interfaces import IView
self.registry.registerAdapter(app, for_, IView, name)
def _registerEventListener(self, iface):
L = []
def listener(event):
L.append(event)
self.registry.registerHandler(listener, (iface,))
return L
def _registerRootFactory(self, val):
rootfactory = DummyRootFactory(val)
from repoze.bfg.interfaces import IRootFactory
self.registry.registerUtility(rootfactory, IRootFactory)
return rootfactory
def _getTargetClass(self):
from repoze.bfg.router import Router
return Router
def _makeOne(self):
klass = self._getTargetClass()
return klass(self.registry)
def _makeEnviron(self, **extras):
environ = {
'wsgi.url_scheme':'http',
'SERVER_NAME':'localhost',
'SERVER_PORT':'8080',
'REQUEST_METHOD':'GET',
'PATH_INFO':'/',
}
environ.update(extras)
return environ
def test_root_policy(self):
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
rootfactory = self._registerRootFactory('abc')
router = self._makeOne()
self.assertEqual(router.root_policy, rootfactory)
def test_iforbiddenview_override(self):
from repoze.bfg.interfaces import IForbiddenView
def app():
""" """
self.registry.registerUtility(app, IForbiddenView)
router = self._makeOne()
self.assertEqual(router.forbidden_view, app)
def test_iforbiddenview_nooverride(self):
context = DummyContext()
router = self._makeOne()
from repoze.bfg.view import default_forbidden_view
self.assertEqual(router.forbidden_view, default_forbidden_view)
def test_inotfoundview_override(self):
from repoze.bfg.interfaces import INotFoundView
def app():
""" """
self.registry.registerUtility(app, INotFoundView)
router = self._makeOne()
self.assertEqual(router.notfound_view, app)
def test_inotfoundview_nooverride(self):
context = DummyContext()
router = self._makeOne()
from repoze.bfg.view import default_notfound_view
self.assertEqual(router.notfound_view, default_notfound_view)
def test_call_traverser_default(self):
environ = self._makeEnviron()
context = DummyContext()
logger = self._registerLogger()
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
headers = start_response.headers
self.assertEqual(len(headers), 2)
status = start_response.status
self.assertEqual(status, '404 Not Found')
self.failUnless('/' in result[0], result)
self.failIf('debug_notfound' in result[0])
self.assertEqual(len(logger.messages), 0)
def test_call_no_view_registered_no_isettings(self):
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
logger = self._registerLogger()
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
headers = start_response.headers
self.assertEqual(len(headers), 2)
status = start_response.status
self.assertEqual(status, '404 Not Found')
self.failUnless('/' in result[0], result)
self.failIf('debug_notfound' in result[0])
self.assertEqual(len(logger.messages), 0)
def test_call_no_view_registered_debug_notfound_false(self):
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
logger = self._registerLogger()
self._registerSettings(debug_notfound=False)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
headers = start_response.headers
self.assertEqual(len(headers), 2)
status = start_response.status
self.assertEqual(status, '404 Not Found')
self.failUnless('/' in result[0], result)
self.failIf('debug_notfound' in result[0])
self.assertEqual(len(logger.messages), 0)
def test_call_no_view_registered_debug_notfound_true(self):
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
self._registerSettings(debug_notfound=True)
logger = self._registerLogger()
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
headers = start_response.headers
self.assertEqual(len(headers), 2)
status = start_response.status
self.assertEqual(status, '404 Not Found')
self.failUnless(
"debug_notfound of url http://localhost:8080/; path_info: '/', "
"context:" in result[0])
self.failUnless(
"view_name: '', subpath: []" in result[0])
self.failUnless('http://localhost:8080' in result[0], result)
self.assertEqual(len(logger.messages), 1)
message = logger.messages[0]
self.failUnless('of url http://localhost:8080' in message)
self.failUnless("path_info: '/'" in message)
self.failUnless('DummyContext instance at' in message)
self.failUnless("view_name: ''" in message)
self.failUnless("subpath: []" in message)
def test_call_view_returns_nonresponse(self):
context = DummyContext()
self._registerTraverserFactory(context)
environ = self._makeEnviron()
view = DummyView('abc')
self._registerView(view, '', None, None)
router = self._makeOne()
start_response = DummyStartResponse()
self.assertRaises(ValueError, router, environ, start_response)
def test_inotfoundview_returns_nonresponse(self):
from repoze.bfg.interfaces import INotFoundView
context = DummyContext()
environ = self._makeEnviron()
self._registerTraverserFactory(context)
def app(context, request):
""" """
self.registry.registerUtility(app, INotFoundView)
router = self._makeOne()
start_response = DummyStartResponse()
self.assertRaises(ValueError, router, environ, start_response)
def test_call_view_registered_nonspecific_default_path(self):
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
self._registerView(view, '', None, None)
rootfactory = self._registerRootFactory(context)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(result, ['Hello world'])
self.assertEqual(start_response.headers, ())
self.assertEqual(start_response.status, '200 OK')
request = view.request
self.assertEqual(request.view_name, '')
self.assertEqual(request.subpath, [])
self.assertEqual(request.context, context)
self.assertEqual(request.root, context)
def test_call_view_registered_nonspecific_nondefault_path_and_subpath(self):
context = DummyContext()
self._registerTraverserFactory(context, view_name='foo',
subpath=['bar'],
traversed=['context'])
rootfactory = self._registerRootFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
self._registerView(view, 'foo', None, None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(result, ['Hello world'])
self.assertEqual(start_response.headers, ())
self.assertEqual(start_response.status, '200 OK')
request = view.request
self.assertEqual(request.view_name, 'foo')
self.assertEqual(request.subpath, ['bar'])
self.assertEqual(request.context, context)
self.assertEqual(request.root, context)
def test_call_view_registered_specific_success(self):
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context)
rootfactory = self._registerRootFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(result, ['Hello world'])
self.assertEqual(start_response.headers, ())
self.assertEqual(start_response.status, '200 OK')
request = view.request
self.assertEqual(request.view_name, '')
self.assertEqual(request.subpath, [])
self.assertEqual(request.context, context)
self.assertEqual(request.root, context)
def test_call_view_registered_specific_fail(self):
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
pass
class INotContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
context = DummyContext()
directlyProvides(context, INotContext)
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse()
view = DummyView(response)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(start_response.status, '404 Not Found')
self.failUnless('404' in result[0])
def test_call_view_raises_forbidden(self):
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse()
view = DummyView(response, raise_unauthorized=True)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
router = self._makeOne()
start_response = DummyStartResponse()
response = router(environ, start_response)
self.assertEqual(start_response.status, '401 Unauthorized')
self.assertEqual(environ['repoze.bfg.message'], 'unauthorized')
def test_call_view_raises_notfound(self):
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse()
view = DummyView(response, raise_notfound=True)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
router = self._makeOne()
start_response = DummyStartResponse()
response = router(environ, start_response)
self.assertEqual(start_response.status, '404 Not Found')
self.assertEqual(environ['repoze.bfg.message'], 'notfound')
def test_call_request_has_global_response_headers(self):
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
pass
from repoze.bfg.interfaces import IRequest
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse('200 OK')
response.headerlist = [('a', 1)]
def view(context, request):
request.global_response_headers = [('b', 2)]
return response
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
router = self._makeOne()
start_response = DummyStartResponse()
router(environ, start_response)
self.assertEqual(start_response.status, '200 OK')
self.assertEqual(start_response.headers, [('a', 1), ('b', 2)])
def test_call_eventsends(self):
from repoze.bfg.interfaces import INewRequest
from repoze.bfg.interfaces import INewResponse
from repoze.bfg.interfaces import IAfterTraversal
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
self._registerView(view, '', None, None)
request_events = self._registerEventListener(INewRequest)
aftertraversal_events = self._registerEventListener(IAfterTraversal)
response_events = self._registerEventListener(INewResponse)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(len(request_events), 1)
self.assertEqual(request_events[0].request.environ, environ)
self.assertEqual(len(aftertraversal_events), 1)
self.assertEqual(aftertraversal_events[0].request.environ, environ)
self.assertEqual(len(response_events), 1)
self.assertEqual(response_events[0].response, response)
def test_call_pushes_and_pops_threadlocal_manager(self):
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron()
self._registerView(view, '', None, None)
router = self._makeOne()
start_response = DummyStartResponse()
router.threadlocal_manager = DummyThreadLocalManager()
result = router(environ, start_response)
self.assertEqual(len(router.threadlocal_manager.pushed), 1)
self.assertEqual(len(router.threadlocal_manager.popped), 1)
def test_call_route_matches_and_has_factory(self):
req_iface = self._registerRouteRequest('foo')
root = object()
def factory(request):
return root
self._connectRoute('archives/:action/:article', 'foo', factory)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
self._registerView(view, '', None, None)
rootfactory = self._registerRootFactory(context)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(result, ['Hello world'])
self.assertEqual(start_response.headers, ())
self.assertEqual(start_response.status, '200 OK')
request = view.request
self.assertEqual(request.view_name, '')
self.assertEqual(request.subpath, [])
self.assertEqual(request.context, context)
self.assertEqual(request.root, root)
routing_args = environ['wsgiorg.routing_args'][1]
self.assertEqual(routing_args['action'], 'action1')
self.assertEqual(routing_args['article'], 'article1')
self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
self.assertEqual(environ['bfg.routes.route'].name, 'foo')
self.assertEqual(request.matchdict, routing_args)
self.failUnless(req_iface.providedBy(request))
def test_call_route_matches_doesnt_overwrite_subscriber_iface(self):
from repoze.bfg.interfaces import INewRequest
from zope.interface import alsoProvides
from zope.interface import Interface
req_iface = self._registerRouteRequest('foo')
class IFoo(Interface):
pass
def listener(event):
alsoProvides(event.request, IFoo)
self.registry.registerHandler(listener, (INewRequest,))
root = object()
def factory(request):
return root
self._connectRoute('archives/:action/:article', 'foo', factory)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
response.app_iter = ['Hello world']
view = DummyView(response)
environ = self._makeEnviron(PATH_INFO='/archives/action1/article1')
self._registerView(view, '', None, None)
rootfactory = self._registerRootFactory(context)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(result, ['Hello world'])
self.assertEqual(start_response.headers, ())
self.assertEqual(start_response.status, '200 OK')
request = view.request
self.assertEqual(request.view_name, '')
self.assertEqual(request.subpath, [])
self.assertEqual(request.context, context)
self.assertEqual(request.root, root)
routing_args = environ['wsgiorg.routing_args'][1]
self.assertEqual(routing_args['action'], 'action1')
self.assertEqual(routing_args['article'], 'article1')
self.assertEqual(environ['bfg.routes.matchdict'], routing_args)
self.assertEqual(environ['bfg.routes.route'].name, 'foo')
self.assertEqual(request.matchdict, routing_args)
self.failUnless(req_iface.providedBy(request))
self.failUnless(IFoo.providedBy(request))
class DummyContext:
pass
class DummyView:
def __init__(self, response, raise_unauthorized=False,
raise_notfound=False):
self.response = response
self.raise_unauthorized = raise_unauthorized
self.raise_notfound = raise_notfound
def __call__(self, context, request):
self.context = context
self.request = request
if self.raise_unauthorized:
from repoze.bfg.exceptions import Forbidden
raise Forbidden('unauthorized')
if self.raise_notfound:
from repoze.bfg.exceptions import NotFound
raise NotFound('notfound')
return self.response
class DummyRootFactory:
def __init__(self, root):
self.root = root
def __call__(self, environ):
return self.root
class DummyStartResponse:
status = ()
headers = ()
def __call__(self, status, headers):
self.status = status
self.headers = headers
class DummyResponse:
headerlist = ()
app_iter = ()
def __init__(self, status='200 OK'):
self.status = status
class DummyThreadLocalManager:
def __init__(self):
self.pushed = []
self.popped = []
def push(self, val):
self.pushed.append(val)
def pop(self):
self.popped.append(True)
class DummyAuthenticationPolicy:
pass
class DummyLogger:
def __init__(self):
self.messages = []
def info(self, msg):
self.messages.append(msg)
warn = info
debug = info