summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--repoze/bfg/interfaces.py5
-rw-r--r--repoze/bfg/router.py19
-rw-r--r--repoze/bfg/tests/test_router.py51
3 files changed, 57 insertions, 18 deletions
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py
index 803cf237a..b0b769b02 100644
--- a/repoze/bfg/interfaces.py
+++ b/repoze/bfg/interfaces.py
@@ -11,6 +11,11 @@ class IRootPolicy(Interface):
class ITraversalPolicy(Interface):
def __call__(environ, root):
""" Return a tuple in the form (context, name, subpath) """
+
+class ISecurityPolicy(Interface):
+ def __call__(environ, context, name):
+ """ Return a WSGI app on unauthorized or None to signify that
+ the request is allowed to continue """
class ITraverser(Interface):
def __init__(context):
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index 626080bdc..fde50b528 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -8,18 +8,21 @@ from repoze.bfg.interfaces import IWSGIApplicationFactory
from repoze.bfg.interfaces import IWebObRequest
class Router:
- def __init__(self, root_policy, traversal_policy):
+ def __init__(self, root_policy, traversal_policy, security_policy):
self.root_policy = root_policy
self.traversal_policy = traversal_policy
+ self.security_policy = security_policy
def __call__(self, environ, start_response):
root = self.root_policy(environ)
- context, name, subpath = self.traversal_policy(root, environ)
- environ['repoze.bfg.subpath'] = subpath
- request = Request(environ)
- directlyProvides(request, IWebObRequest)
- app = queryMultiAdapter((context, request),
- IWSGIApplicationFactory, name=name)
+ context, name, subpath = self.traversal_policy(environ, root)
+ app = self.security_policy(environ, context, name)
if app is None:
- app = HTTPNotFound(request.url)
+ environ['repoze.bfg.subpath'] = subpath
+ request = Request(environ)
+ directlyProvides(request, IWebObRequest)
+ app = queryMultiAdapter((context, request),
+ IWSGIApplicationFactory, name=name)
+ if app is None:
+ app = HTTPNotFound(request.url)
return app(environ, start_response)
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index 8f4b2ff95..c684e06b5 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -38,13 +38,15 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
headerii = []
def rootpolicy(environ):
return None
- def traversalpolicy(root, environ):
+ def traversalpolicy(environ, root):
return DummyContext(), 'foo', []
+ def securitypolicy(environ, context, name):
+ return None
def start_response(status, headers):
statii[:] = [status]
headerii[:] = [headers]
environ = self._makeEnviron()
- router = self._makeOne(rootpolicy, traversalpolicy)
+ router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
result = router(environ, start_response)
headers = headerii[0]
self.assertEqual(len(headers), 2)
@@ -52,17 +54,40 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
self.assertEqual(status, '404 Not Found')
self.failUnless('http://localhost:8080' in result[0], result)
+ def test_call_securitypolicy_denies(self):
+ statii = []
+ headerii = []
+ def rootpolicy(environ):
+ return None
+ def traversalpolicy(environ, root):
+ return DummyContext(), 'foo', []
+ def securitypolicy(environ, context, name):
+ from webob.exc import HTTPUnauthorized
+ return HTTPUnauthorized()
+ def start_response(status, headers):
+ statii[:] = [status]
+ headerii[:] = [headers]
+ environ = self._makeEnviron()
+ router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+ result = router(environ, start_response)
+ headers = headerii[0]
+ self.assertEqual(len(headers), 2)
+ status = statii[0]
+ self.assertEqual(status, '401 Unauthorized')
+
def test_call_app_registered_nonspecific_default_path(self):
def rootpolicy(environ):
return None
context = DummyContext()
- def traversalpolicy(root, environ):
+ def traversalpolicy(environ, root):
return context, '', []
def start_response(status, headers):
pass
+ def securitypolicy(environ, context, name):
+ return None
environ = self._makeEnviron()
self._registerFactory(DummyWSGIApplicationFactory, '', None, None)
- router = self._makeOne(rootpolicy, traversalpolicy)
+ router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
result = router(environ, start_response)
self.failUnless(result[0] is context)
import webob
@@ -73,13 +98,15 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
def rootpolicy(environ):
return None
context = DummyContext()
- def traversalpolicy(root, environ):
+ def traversalpolicy(environ, root):
return context, 'foo', ['bar', 'baz']
def start_response(status, headers):
pass
+ def securitypolicy(environ, context, name):
+ return None
environ = self._makeEnviron()
self._registerFactory(DummyWSGIApplicationFactory, 'foo', None, None)
- router = self._makeOne(rootpolicy, traversalpolicy)
+ router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
result = router(environ, start_response)
self.failUnless(result[0] is context)
import webob
@@ -95,15 +122,17 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
class IContext(Interface):
pass
directlyProvides(context, IContext)
- def traversalpolicy(root, environ):
+ def traversalpolicy(environ, root):
return context, 'foo', ['bar', 'baz']
def start_response(status, headers):
pass
+ def securitypolicy(environ, context, name):
+ return None
environ = self._makeEnviron()
from repoze.bfg.interfaces import IWebObRequest
self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
IWebObRequest)
- router = self._makeOne(rootpolicy, traversalpolicy)
+ router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
result = router(environ, start_response)
self.failUnless(result[0] is context)
import webob
@@ -123,16 +152,18 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
headerii = []
def rootpolicy(environ):
return None
- def traversalpolicy(root, environ):
+ def traversalpolicy(environ, root):
return context, 'foo', []
def start_response(status, headers):
statii[:] = [status]
headerii[:] = [headers]
+ def securitypolicy(environ, context, name):
+ return None
environ = self._makeEnviron()
from repoze.bfg.interfaces import IWebObRequest
self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
IWebObRequest)
- router = self._makeOne(rootpolicy, traversalpolicy)
+ router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
result = router(environ, start_response)
headers = headerii[0]
self.assertEqual(len(headers), 2)