diff options
| author | Chris McDonough <chrism@agendaless.com> | 2008-07-11 23:30:35 +0000 |
|---|---|---|
| committer | Chris McDonough <chrism@agendaless.com> | 2008-07-11 23:30:35 +0000 |
| commit | 2d74688f6564c325077044c4b870c6f966baad91 (patch) | |
| tree | a536611d8ad9a816afd139b1f140bde34e61647a | |
| parent | 3011338a75fe905a150ab93c97830b39c55b4ca1 (diff) | |
| download | pyramid-2d74688f6564c325077044c4b870c6f966baad91.tar.gz pyramid-2d74688f6564c325077044c4b870c6f966baad91.tar.bz2 pyramid-2d74688f6564c325077044c4b870c6f966baad91.zip | |
Add security policy checks.
| -rw-r--r-- | repoze/bfg/configure.zcml | 2 | ||||
| -rw-r--r-- | repoze/bfg/interfaces.py | 3 | ||||
| -rw-r--r-- | repoze/bfg/router.py | 4 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_router.py | 18 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_wsgiadapter.py | 75 | ||||
| -rw-r--r-- | repoze/bfg/wsgiadapter.py | 30 |
6 files changed, 113 insertions, 19 deletions
diff --git a/repoze/bfg/configure.zcml b/repoze/bfg/configure.zcml index 15a49b2c0..0223e25ba 100644 --- a/repoze/bfg/configure.zcml +++ b/repoze/bfg/configure.zcml @@ -18,7 +18,7 @@ <adapter factory=".wsgiadapter.NaiveWSGIViewAdapter" provides=".interfaces.IWSGIApplicationFactory" - for="* *" + for="* * *" /> <include file="meta.zcml" /> diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py index a0409ba89..b4c222418 100644 --- a/repoze/bfg/interfaces.py +++ b/repoze/bfg/interfaces.py @@ -36,10 +36,11 @@ class IWSGIApplication(Interface): """ A PEP 333 application """ class IWSGIApplicationFactory(Interface): - def __call__(view, request): + def __call__(context, request, view): """ Return an object that implements IWSGIApplication """ class ILocation(Interface): """Objects that have a structural location""" __parent__ = Attribute("The parent in the location hierarchy") __name__ = Attribute("The name of the object") + diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py index 3e51832d5..bd7df2c94 100644 --- a/repoze/bfg/router.py +++ b/repoze/bfg/router.py @@ -33,12 +33,14 @@ class Router: app = HTTPFound(add_slash=True) else: request.subpath = subpath + request.view_name = name app = queryMultiAdapter((context, request), IViewFactory, name=name, default=_marker) if app is _marker: app = HTTPNotFound(request.url) else: - app = getMultiAdapter((app, request), IWSGIApplicationFactory) + app = getMultiAdapter((context, request, app), + IWSGIApplicationFactory) return app(environ, start_response) def make_app(root_policy, package=None, default_redirects=True, diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py index 25d3bfdcf..5d866523e 100644 --- a/repoze/bfg/tests/test_router.py +++ b/repoze/bfg/tests/test_router.py @@ -70,7 +70,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup): environ = self._makeEnviron(PATH_INFO='/doesnt/end/in/slash') self._registerTraverserFactory(traversalfactory, '', None, None) self._registerViewFactory(viewfactory, '', None, None) - self._registerWSGIFactory(wsgifactory, '', None, None) + self._registerWSGIFactory(wsgifactory, '', None, None, None) router = self._makeOne(rootpolicy) start_response = DummyStartResponse() result = router(environ, start_response) @@ -94,7 +94,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup): environ = self._makeEnviron() self._registerTraverserFactory(traversalfactory, '', None, None) self._registerViewFactory(viewfactory, '', None, None) - self._registerWSGIFactory(wsgifactory, '', None, None) + self._registerWSGIFactory(wsgifactory, '', None, None, None) router = self._makeOne(rootpolicy) start_response = DummyStartResponse() result = router(environ, start_response) @@ -115,7 +115,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup): environ = self._makeEnviron() self._registerTraverserFactory(traversalfactory, '', None, None) self._registerViewFactory(viewfactory, 'foo', None, None) - self._registerWSGIFactory(wsgifactory, '', None, None) + self._registerWSGIFactory(wsgifactory, '', None, None, None) router = self._makeOne(rootpolicy) start_response = DummyStartResponse() result = router(environ, start_response) @@ -142,7 +142,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup): environ = self._makeEnviron() self._registerTraverserFactory(traversalfactory, '', None, None) self._registerViewFactory(viewfactory, '', IContext, IRequest) - self._registerWSGIFactory(wsgifactory, '', None, None) + self._registerWSGIFactory(wsgifactory, '', None, None, None) router = self._makeOne(rootpolicy) start_response = DummyStartResponse() result = router(environ, start_response) @@ -171,7 +171,7 @@ class RouterTests(unittest.TestCase, PlacelessSetup): environ = self._makeEnviron() self._registerTraverserFactory(traversalfactory, '', None, None) self._registerViewFactory(viewfactory, '', IContext, IRequest) - self._registerWSGIFactory(wsgifactory, '', None, None) + self._registerWSGIFactory(wsgifactory, '', None, None, None) router = self._makeOne(rootpolicy) start_response = DummyStartResponse() result = router(environ, start_response) @@ -183,13 +183,15 @@ class DummyContext: def make_wsgi_factory(status, headers, app_iter): class DummyWSGIApplicationFactory: - def __init__(self, view, request): - self.view = view + def __init__(self, context, request, view): + self.context = context self.request = request + self.view = view def __call__(self, environ, start_response): - environ['view'] = self.view + environ['context'] = self.context environ['request'] = self.request + environ['view'] = self.view start_response(status, headers) return app_iter diff --git a/repoze/bfg/tests/test_wsgiadapter.py b/repoze/bfg/tests/test_wsgiadapter.py index 71e2f5c81..db07ab071 100644 --- a/repoze/bfg/tests/test_wsgiadapter.py +++ b/repoze/bfg/tests/test_wsgiadapter.py @@ -1,6 +1,14 @@ import unittest -class NaiveWSGIAdapterTests(unittest.TestCase): +from zope.component.testing import PlacelessSetup + +class NaiveWSGIAdapterTests(unittest.TestCase, PlacelessSetup): + def setUp(self): + PlacelessSetup.setUp(self) + + def tearDown(self): + PlacelessSetup.tearDown(self) + def _getTargetClass(self): from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter return NaiveWSGIViewAdapter @@ -15,7 +23,8 @@ class NaiveWSGIAdapterTests(unittest.TestCase): def view(): return response request = DummyRequest() - adapter = self._makeOne(view, request) + context = DummyContext() + adapter = self._makeOne(context, request, view) environ = {} start_response = DummyStartResponse() result = adapter(environ, start_response) @@ -31,7 +40,8 @@ class NaiveWSGIAdapterTests(unittest.TestCase): response.start_response = start_response return response request = DummyRequest() - adapter = self._makeOne(view, request) + context = DummyContext() + adapter = self._makeOne(context, request, view) environ = {} start_response = DummyStartResponse() result = adapter(environ, start_response) @@ -48,7 +58,61 @@ class NaiveWSGIAdapterTests(unittest.TestCase): def view(request): response.request = request return response - adapter = self._makeOne(view, request) + context = DummyContext() + adapter = self._makeOne(context, request, view) + environ = {} + start_response = DummyStartResponse() + result = adapter(environ, start_response) + self.assertEqual(result, ['Hello world']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '200 OK') + self.assertEqual(response.request, request) + + def test_view_fails_security_policy(self): + import zope.component + gsm = zope.component.getGlobalSiteManager() + from repoze.bfg.wsgiadapter import IViewSecurityPolicy + def failed(context, request): + def view(): + response = DummyResponse() + response.app_iter = ['failed'] + response.status = '401 Unauthorized' + response.headerlist = () + return response + return view + gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy) + request = DummyRequest() + response = DummyResponse() + response.app_iter = ['Hello world'] + def view(request): + response.request = request + return response + context = DummyContext() + adapter = self._makeOne(context, request, view) + environ = {} + start_response = DummyStartResponse() + result = adapter(environ, start_response) + self.assertEqual(result, ['failed']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '401 Unauthorized') + + def test_view_passes_security_policy(self): + import zope.component + gsm = zope.component.getGlobalSiteManager() + from repoze.bfg.wsgiadapter import IViewSecurityPolicy + def failed(context, request): + def view(): + return None + return view + gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy) + request = DummyRequest() + response = DummyResponse() + response.app_iter = ['Hello world'] + def view(request): + response.request = request + return response + context = DummyContext() + adapter = self._makeOne(context, request, view) environ = {} start_response = DummyStartResponse() result = adapter(environ, start_response) @@ -57,6 +121,9 @@ class NaiveWSGIAdapterTests(unittest.TestCase): self.assertEqual(start_response.status, '200 OK') self.assertEqual(response.request, request) +class DummyContext: + pass + class DummyRequest: pass diff --git a/repoze/bfg/wsgiadapter.py b/repoze/bfg/wsgiadapter.py index f76360c27..16effe1a5 100644 --- a/repoze/bfg/wsgiadapter.py +++ b/repoze/bfg/wsgiadapter.py @@ -1,19 +1,40 @@ -from zope.interface import implements +from zope.component import queryMultiAdapter from zope.interface import classProvides +from zope.interface import implements +from zope.interface import Interface from repoze.bfg.interfaces import IWSGIApplicationFactory from repoze.bfg.interfaces import IWSGIApplication from repoze.bfg.mapply import mapply +class IViewSecurityPolicy(Interface): + """ Marker interface for a view security policy; a view security + policy. """ + def __call__(): + """ Return None if the security check succeeded, + otherwise it should return a WSGI application representing an + unauthorized view""" + class NaiveWSGIViewAdapter: classProvides(IWSGIApplicationFactory) implements(IWSGIApplication) - def __init__(self, view, request): - self.view = view + def __init__(self, context, request, view): + self.context = context self.request = request + self.view = view def __call__(self, environ, start_response): + context = self.context + request = self.request + view = self.view + security_policy = queryMultiAdapter((context, request), + IViewSecurityPolicy) + if security_policy: + failed_view = security_policy() + if failed_view: + view = failed_view + catch_response = [] def replace_start_response(status, headers): catch_response[:] = (status, headers) @@ -22,7 +43,8 @@ class NaiveWSGIViewAdapter: 'environ':environ, 'start_response':start_response, } - response = mapply(self.view, positional = (), keyword = kwdict) + + response = mapply(view, positional = (), keyword = kwdict) if not catch_response: catch_response = (response.status, response.headerlist) start_response(*catch_response) |
