summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2008-07-11 23:30:35 +0000
committerChris McDonough <chrism@agendaless.com>2008-07-11 23:30:35 +0000
commit2d74688f6564c325077044c4b870c6f966baad91 (patch)
treea536611d8ad9a816afd139b1f140bde34e61647a
parent3011338a75fe905a150ab93c97830b39c55b4ca1 (diff)
downloadpyramid-2d74688f6564c325077044c4b870c6f966baad91.tar.gz
pyramid-2d74688f6564c325077044c4b870c6f966baad91.tar.bz2
pyramid-2d74688f6564c325077044c4b870c6f966baad91.zip
Add security policy checks.
-rw-r--r--repoze/bfg/configure.zcml2
-rw-r--r--repoze/bfg/interfaces.py3
-rw-r--r--repoze/bfg/router.py4
-rw-r--r--repoze/bfg/tests/test_router.py18
-rw-r--r--repoze/bfg/tests/test_wsgiadapter.py75
-rw-r--r--repoze/bfg/wsgiadapter.py30
6 files changed, 113 insertions, 19 deletions
diff --git a/repoze/bfg/configure.zcml b/repoze/bfg/configure.zcml
index 15a49b2c0..0223e25ba 100644
--- a/repoze/bfg/configure.zcml
+++ b/repoze/bfg/configure.zcml
@@ -18,7 +18,7 @@
<adapter
factory=".wsgiadapter.NaiveWSGIViewAdapter"
provides=".interfaces.IWSGIApplicationFactory"
- for="* *"
+ for="* * *"
/>
<include file="meta.zcml" />
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py
index a0409ba89..b4c222418 100644
--- a/repoze/bfg/interfaces.py
+++ b/repoze/bfg/interfaces.py
@@ -36,10 +36,11 @@ class IWSGIApplication(Interface):
""" A PEP 333 application """
class IWSGIApplicationFactory(Interface):
- def __call__(view, request):
+ def __call__(context, request, view):
""" Return an object that implements IWSGIApplication """
class ILocation(Interface):
"""Objects that have a structural location"""
__parent__ = Attribute("The parent in the location hierarchy")
__name__ = Attribute("The name of the object")
+
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index 3e51832d5..bd7df2c94 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -33,12 +33,14 @@ class Router:
app = HTTPFound(add_slash=True)
else:
request.subpath = subpath
+ request.view_name = name
app = queryMultiAdapter((context, request), IViewFactory, name=name,
default=_marker)
if app is _marker:
app = HTTPNotFound(request.url)
else:
- app = getMultiAdapter((app, request), IWSGIApplicationFactory)
+ app = getMultiAdapter((context, request, app),
+ IWSGIApplicationFactory)
return app(environ, start_response)
def make_app(root_policy, package=None, default_redirects=True,
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index 25d3bfdcf..5d866523e 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -70,7 +70,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
environ = self._makeEnviron(PATH_INFO='/doesnt/end/in/slash')
self._registerTraverserFactory(traversalfactory, '', None, None)
self._registerViewFactory(viewfactory, '', None, None)
- self._registerWSGIFactory(wsgifactory, '', None, None)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
router = self._makeOne(rootpolicy)
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -94,7 +94,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
environ = self._makeEnviron()
self._registerTraverserFactory(traversalfactory, '', None, None)
self._registerViewFactory(viewfactory, '', None, None)
- self._registerWSGIFactory(wsgifactory, '', None, None)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
router = self._makeOne(rootpolicy)
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -115,7 +115,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
environ = self._makeEnviron()
self._registerTraverserFactory(traversalfactory, '', None, None)
self._registerViewFactory(viewfactory, 'foo', None, None)
- self._registerWSGIFactory(wsgifactory, '', None, None)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
router = self._makeOne(rootpolicy)
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -142,7 +142,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
environ = self._makeEnviron()
self._registerTraverserFactory(traversalfactory, '', None, None)
self._registerViewFactory(viewfactory, '', IContext, IRequest)
- self._registerWSGIFactory(wsgifactory, '', None, None)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
router = self._makeOne(rootpolicy)
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -171,7 +171,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
environ = self._makeEnviron()
self._registerTraverserFactory(traversalfactory, '', None, None)
self._registerViewFactory(viewfactory, '', IContext, IRequest)
- self._registerWSGIFactory(wsgifactory, '', None, None)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
router = self._makeOne(rootpolicy)
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -183,13 +183,15 @@ class DummyContext:
def make_wsgi_factory(status, headers, app_iter):
class DummyWSGIApplicationFactory:
- def __init__(self, view, request):
- self.view = view
+ def __init__(self, context, request, view):
+ self.context = context
self.request = request
+ self.view = view
def __call__(self, environ, start_response):
- environ['view'] = self.view
+ environ['context'] = self.context
environ['request'] = self.request
+ environ['view'] = self.view
start_response(status, headers)
return app_iter
diff --git a/repoze/bfg/tests/test_wsgiadapter.py b/repoze/bfg/tests/test_wsgiadapter.py
index 71e2f5c81..db07ab071 100644
--- a/repoze/bfg/tests/test_wsgiadapter.py
+++ b/repoze/bfg/tests/test_wsgiadapter.py
@@ -1,6 +1,14 @@
import unittest
-class NaiveWSGIAdapterTests(unittest.TestCase):
+from zope.component.testing import PlacelessSetup
+
+class NaiveWSGIAdapterTests(unittest.TestCase, PlacelessSetup):
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+
+ def tearDown(self):
+ PlacelessSetup.tearDown(self)
+
def _getTargetClass(self):
from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter
return NaiveWSGIViewAdapter
@@ -15,7 +23,8 @@ class NaiveWSGIAdapterTests(unittest.TestCase):
def view():
return response
request = DummyRequest()
- adapter = self._makeOne(view, request)
+ context = DummyContext()
+ adapter = self._makeOne(context, request, view)
environ = {}
start_response = DummyStartResponse()
result = adapter(environ, start_response)
@@ -31,7 +40,8 @@ class NaiveWSGIAdapterTests(unittest.TestCase):
response.start_response = start_response
return response
request = DummyRequest()
- adapter = self._makeOne(view, request)
+ context = DummyContext()
+ adapter = self._makeOne(context, request, view)
environ = {}
start_response = DummyStartResponse()
result = adapter(environ, start_response)
@@ -48,7 +58,61 @@ class NaiveWSGIAdapterTests(unittest.TestCase):
def view(request):
response.request = request
return response
- adapter = self._makeOne(view, request)
+ context = DummyContext()
+ adapter = self._makeOne(context, request, view)
+ environ = {}
+ start_response = DummyStartResponse()
+ result = adapter(environ, start_response)
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(response.request, request)
+
+ def test_view_fails_security_policy(self):
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ from repoze.bfg.wsgiadapter import IViewSecurityPolicy
+ def failed(context, request):
+ def view():
+ response = DummyResponse()
+ response.app_iter = ['failed']
+ response.status = '401 Unauthorized'
+ response.headerlist = ()
+ return response
+ return view
+ gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy)
+ request = DummyRequest()
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ def view(request):
+ response.request = request
+ return response
+ context = DummyContext()
+ adapter = self._makeOne(context, request, view)
+ environ = {}
+ start_response = DummyStartResponse()
+ result = adapter(environ, start_response)
+ self.assertEqual(result, ['failed'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '401 Unauthorized')
+
+ def test_view_passes_security_policy(self):
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ from repoze.bfg.wsgiadapter import IViewSecurityPolicy
+ def failed(context, request):
+ def view():
+ return None
+ return view
+ gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy)
+ request = DummyRequest()
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ def view(request):
+ response.request = request
+ return response
+ context = DummyContext()
+ adapter = self._makeOne(context, request, view)
environ = {}
start_response = DummyStartResponse()
result = adapter(environ, start_response)
@@ -57,6 +121,9 @@ class NaiveWSGIAdapterTests(unittest.TestCase):
self.assertEqual(start_response.status, '200 OK')
self.assertEqual(response.request, request)
+class DummyContext:
+ pass
+
class DummyRequest:
pass
diff --git a/repoze/bfg/wsgiadapter.py b/repoze/bfg/wsgiadapter.py
index f76360c27..16effe1a5 100644
--- a/repoze/bfg/wsgiadapter.py
+++ b/repoze/bfg/wsgiadapter.py
@@ -1,19 +1,40 @@
-from zope.interface import implements
+from zope.component import queryMultiAdapter
from zope.interface import classProvides
+from zope.interface import implements
+from zope.interface import Interface
from repoze.bfg.interfaces import IWSGIApplicationFactory
from repoze.bfg.interfaces import IWSGIApplication
from repoze.bfg.mapply import mapply
+class IViewSecurityPolicy(Interface):
+ """ Marker interface for a view security policy; a view security
+ policy. """
+ def __call__():
+ """ Return None if the security check succeeded,
+ otherwise it should return a WSGI application representing an
+ unauthorized view"""
+
class NaiveWSGIViewAdapter:
classProvides(IWSGIApplicationFactory)
implements(IWSGIApplication)
- def __init__(self, view, request):
- self.view = view
+ def __init__(self, context, request, view):
+ self.context = context
self.request = request
+ self.view = view
def __call__(self, environ, start_response):
+ context = self.context
+ request = self.request
+ view = self.view
+ security_policy = queryMultiAdapter((context, request),
+ IViewSecurityPolicy)
+ if security_policy:
+ failed_view = security_policy()
+ if failed_view:
+ view = failed_view
+
catch_response = []
def replace_start_response(status, headers):
catch_response[:] = (status, headers)
@@ -22,7 +43,8 @@ class NaiveWSGIViewAdapter:
'environ':environ,
'start_response':start_response,
}
- response = mapply(self.view, positional = (), keyword = kwdict)
+
+ response = mapply(view, positional = (), keyword = kwdict)
if not catch_response:
catch_response = (response.status, response.headerlist)
start_response(*catch_response)