diff options
| -rw-r--r-- | repoze/bfg/interfaces.py | 5 | ||||
| -rw-r--r-- | repoze/bfg/router.py | 19 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_router.py | 51 |
3 files changed, 57 insertions, 18 deletions
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py index 803cf237a..b0b769b02 100644 --- a/repoze/bfg/interfaces.py +++ b/repoze/bfg/interfaces.py @@ -11,6 +11,11 @@ class IRootPolicy(Interface): class ITraversalPolicy(Interface): def __call__(environ, root): """ Return a tuple in the form (context, name, subpath) """ + +class ISecurityPolicy(Interface): + def __call__(environ, context, name): + """ Return a WSGI app on unauthorized or None to signify that + the request is allowed to continue """ class ITraverser(Interface): def __init__(context): diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py index 626080bdc..fde50b528 100644 --- a/repoze/bfg/router.py +++ b/repoze/bfg/router.py @@ -8,18 +8,21 @@ from repoze.bfg.interfaces import IWSGIApplicationFactory from repoze.bfg.interfaces import IWebObRequest class Router: - def __init__(self, root_policy, traversal_policy): + def __init__(self, root_policy, traversal_policy, security_policy): self.root_policy = root_policy self.traversal_policy = traversal_policy + self.security_policy = security_policy def __call__(self, environ, start_response): root = self.root_policy(environ) - context, name, subpath = self.traversal_policy(root, environ) - environ['repoze.bfg.subpath'] = subpath - request = Request(environ) - directlyProvides(request, IWebObRequest) - app = queryMultiAdapter((context, request), - IWSGIApplicationFactory, name=name) + context, name, subpath = self.traversal_policy(environ, root) + app = self.security_policy(environ, context, name) if app is None: - app = HTTPNotFound(request.url) + environ['repoze.bfg.subpath'] = subpath + request = Request(environ) + directlyProvides(request, IWebObRequest) + app = queryMultiAdapter((context, request), + IWSGIApplicationFactory, name=name) + if app is None: + app = HTTPNotFound(request.url) return app(environ, start_response) diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py index 8f4b2ff95..c684e06b5 100644 --- a/repoze/bfg/tests/test_router.py +++ b/repoze/bfg/tests/test_router.py @@ -38,13 +38,15 @@ class RouterTests(unittest.TestCase, PlacelessSetup): headerii = [] def rootpolicy(environ): return None - def traversalpolicy(root, environ): + def traversalpolicy(environ, root): return DummyContext(), 'foo', [] + def securitypolicy(environ, context, name): + return None def start_response(status, headers): statii[:] = [status] headerii[:] = [headers] environ = self._makeEnviron() - router = self._makeOne(rootpolicy, traversalpolicy) + router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) result = router(environ, start_response) headers = headerii[0] self.assertEqual(len(headers), 2) @@ -52,17 +54,40 @@ class RouterTests(unittest.TestCase, PlacelessSetup): self.assertEqual(status, '404 Not Found') self.failUnless('http://localhost:8080' in result[0], result) + def test_call_securitypolicy_denies(self): + statii = [] + headerii = [] + def rootpolicy(environ): + return None + def traversalpolicy(environ, root): + return DummyContext(), 'foo', [] + def securitypolicy(environ, context, name): + from webob.exc import HTTPUnauthorized + return HTTPUnauthorized() + def start_response(status, headers): + statii[:] = [status] + headerii[:] = [headers] + environ = self._makeEnviron() + router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) + result = router(environ, start_response) + headers = headerii[0] + self.assertEqual(len(headers), 2) + status = statii[0] + self.assertEqual(status, '401 Unauthorized') + def test_call_app_registered_nonspecific_default_path(self): def rootpolicy(environ): return None context = DummyContext() - def traversalpolicy(root, environ): + def traversalpolicy(environ, root): return context, '', [] def start_response(status, headers): pass + def securitypolicy(environ, context, name): + return None environ = self._makeEnviron() self._registerFactory(DummyWSGIApplicationFactory, '', None, None) - router = self._makeOne(rootpolicy, traversalpolicy) + router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) result = router(environ, start_response) self.failUnless(result[0] is context) import webob @@ -73,13 +98,15 @@ class RouterTests(unittest.TestCase, PlacelessSetup): def rootpolicy(environ): return None context = DummyContext() - def traversalpolicy(root, environ): + def traversalpolicy(environ, root): return context, 'foo', ['bar', 'baz'] def start_response(status, headers): pass + def securitypolicy(environ, context, name): + return None environ = self._makeEnviron() self._registerFactory(DummyWSGIApplicationFactory, 'foo', None, None) - router = self._makeOne(rootpolicy, traversalpolicy) + router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) result = router(environ, start_response) self.failUnless(result[0] is context) import webob @@ -95,15 +122,17 @@ class RouterTests(unittest.TestCase, PlacelessSetup): class IContext(Interface): pass directlyProvides(context, IContext) - def traversalpolicy(root, environ): + def traversalpolicy(environ, root): return context, 'foo', ['bar', 'baz'] def start_response(status, headers): pass + def securitypolicy(environ, context, name): + return None environ = self._makeEnviron() from repoze.bfg.interfaces import IWebObRequest self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext, IWebObRequest) - router = self._makeOne(rootpolicy, traversalpolicy) + router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) result = router(environ, start_response) self.failUnless(result[0] is context) import webob @@ -123,16 +152,18 @@ class RouterTests(unittest.TestCase, PlacelessSetup): headerii = [] def rootpolicy(environ): return None - def traversalpolicy(root, environ): + def traversalpolicy(environ, root): return context, 'foo', [] def start_response(status, headers): statii[:] = [status] headerii[:] = [headers] + def securitypolicy(environ, context, name): + return None environ = self._makeEnviron() from repoze.bfg.interfaces import IWebObRequest self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext, IWebObRequest) - router = self._makeOne(rootpolicy, traversalpolicy) + router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) result = router(environ, start_response) headers = headerii[0] self.assertEqual(len(headers), 2) |
