diff options
| author | Chris McDonough <chrism@agendaless.com> | 2008-07-16 10:32:08 +0000 |
|---|---|---|
| committer | Chris McDonough <chrism@agendaless.com> | 2008-07-16 10:32:08 +0000 |
| commit | 2466f6eaa2246598dc6cb3c962364773eb4cc64a (patch) | |
| tree | 80954892ad8e12cffb534f3ae92cd321d4d870f5 | |
| parent | 23aa82c4963dc75737d7dc8a84d7639775c3b282 (diff) | |
| download | pyramid-2466f6eaa2246598dc6cb3c962364773eb4cc64a.tar.gz pyramid-2466f6eaa2246598dc6cb3c962364773eb4cc64a.tar.bz2 pyramid-2466f6eaa2246598dc6cb3c962364773eb4cc64a.zip | |
Add security.
| -rw-r--r-- | repoze/bfg/configure.zcml | 6 | ||||
| -rw-r--r-- | repoze/bfg/interfaces.py | 19 | ||||
| -rw-r--r-- | repoze/bfg/router.py | 25 | ||||
| -rw-r--r-- | repoze/bfg/sampleapp/configure.zcml | 13 | ||||
| -rw-r--r-- | repoze/bfg/sampleapp/models.py | 25 | ||||
| -rw-r--r-- | repoze/bfg/sampleapp/run.py | 11 | ||||
| -rw-r--r-- | repoze/bfg/sampleapp/views.py | 2 | ||||
| -rw-r--r-- | repoze/bfg/security.py | 150 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_router.py | 111 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_security.py | 362 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_wsgiadapter.py | 52 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_zcml.py | 38 | ||||
| -rw-r--r-- | repoze/bfg/wsgiadapter.py | 16 | ||||
| -rw-r--r-- | repoze/bfg/zcml.py | 28 |
14 files changed, 741 insertions, 117 deletions
diff --git a/repoze/bfg/configure.zcml b/repoze/bfg/configure.zcml index 0223e25ba..c518adc85 100644 --- a/repoze/bfg/configure.zcml +++ b/repoze/bfg/configure.zcml @@ -2,12 +2,6 @@ i18n_domain="repoze.bfg"> <include package="z3c.pt" /> - <include package="zope.security" file="meta.zcml"/> - - <permission - id="repoze.view" - title="View" - /> <adapter factory=".traversal.NaivePublishTraverser" diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py index 005d30027..f51e23cbf 100644 --- a/repoze/bfg/interfaces.py +++ b/repoze/bfg/interfaces.py @@ -43,3 +43,22 @@ class ITemplateFactory(Interface): def __call__(template_path): """ Return an IView given a template path """ +class ISecurityPolicy(Interface): + """ A utility that provides a mechanism to check authorization + using authentication data """ + def permits(context, request, permission): + """ Returns True if the combination of the authorization + information in the context and the authentication data in + the request allow the action implied by the permission """ + +class NoAuthorizationInformation(Exception): + pass + +class IViewPermission(Interface): + def __call__(security_policy): + """ Return True if the permission allows, return False if it denies. """ + +class IViewPermissionFactory(Interface): + def __call__(context, request): + """ Return an IViewPermission """ + diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py index 00966dfd1..becf3e3e4 100644 --- a/repoze/bfg/router.py +++ b/repoze/bfg/router.py @@ -1,19 +1,21 @@ from zope.component import getMultiAdapter from zope.component import queryMultiAdapter +from zope.component import queryUtility from zope.interface import directlyProvides from webob import Request from webob.exc import HTTPNotFound +from webob.exc import HTTPUnauthorized from repoze.bfg.interfaces import IPublishTraverserFactory from repoze.bfg.interfaces import IViewFactory +from repoze.bfg.interfaces import IViewPermission +from repoze.bfg.interfaces import ISecurityPolicy from repoze.bfg.interfaces import IWSGIApplicationFactory from repoze.bfg.interfaces import IRequest from repoze.bfg.registry import registry_manager -_marker = () - class Router: def __init__(self, root_policy, registry): self.root_policy = root_policy @@ -25,14 +27,23 @@ class Router: directlyProvides(request, IRequest) root = self.root_policy(environ) path = environ.get('PATH_INFO', '/') - traverser = getMultiAdapter((root, request), - IPublishTraverserFactory) + traverser = getMultiAdapter((root, request), IPublishTraverserFactory) context, name, subpath = traverser(path) request.subpath = subpath request.view_name = name - app = queryMultiAdapter((context, request), IViewFactory, name=name, - default=_marker) - if app is _marker: + + security_policy = queryUtility(ISecurityPolicy) + if security_policy: + permission = queryMultiAdapter((context, request), IViewPermission, + name=name) + if permission is not None: + if not permission(security_policy): + app = HTTPUnauthorized() + app.explanation = repr(permission) + return app(environ, start_response) + + app = queryMultiAdapter((context, request), IViewFactory, name=name) + if app is None: app = HTTPNotFound(request.url) else: app = getMultiAdapter((context, request, app), diff --git a/repoze/bfg/sampleapp/configure.zcml b/repoze/bfg/sampleapp/configure.zcml index 374a93090..bd1b46d6a 100644 --- a/repoze/bfg/sampleapp/configure.zcml +++ b/repoze/bfg/sampleapp/configure.zcml @@ -4,18 +4,23 @@ <include package="repoze.bfg" /> + <utility + provides="repoze.bfg.interfaces.ISecurityPolicy" + factory="repoze.bfg.security.RemoteUserACLSecurityPolicy" + /> + <!-- the default view for a Blog --> <bfg:view for=".models.IBlog" factory=".views.blog_default_view" - permission="repoze.view" + permission="view" /> <!-- the default view for a BlogEntry --> <bfg:view for=".models.IBlogEntry" factory=".views.blog_entry_default_view" - permission="repoze.view" + permission="view" /> <!-- the add view for a BlogEntry --> @@ -23,7 +28,7 @@ for=".models.IBlog" factory=".views.blog_entry_add_view" name="add_entry.html" - permission="repoze.view" + permission="add" /> <!-- the contents view for any mapping (shows dict members) --> @@ -31,7 +36,7 @@ for=".models.IMapping" factory=".views.contents_view" name="contents.html" - permission="repoze.view" + permission="manage" /> </configure> diff --git a/repoze/bfg/sampleapp/models.py b/repoze/bfg/sampleapp/models.py index 975d79142..d07110e83 100644 --- a/repoze/bfg/sampleapp/models.py +++ b/repoze/bfg/sampleapp/models.py @@ -1,5 +1,10 @@ from zope.interface import Interface from zope.interface import implements +from zope.location.interfaces import ILocation +from zope.location.location import Location + +from repoze.bfg.security import Everyone +from repoze.bfg.security import Allow import datetime @@ -9,20 +14,26 @@ class IMapping(Interface): class IBlog(Interface): pass -class Blog(dict): - implements(IBlog, IMapping) - def __init__(self, name): - self.__name__ = name - dict.__init__(self) +class Blog(dict, Location): + __acl__ = [ (Allow, Everyone, 'view'), (Allow, 'group:editors', 'add'), + (Allow, 'group:managers', 'manage') ] + implements(IBlog, IMapping, ILocation) class IBlogEntry(Interface): pass class BlogEntry(object): implements(IBlogEntry) - def __init__(self, name, title, body, author): - self.__name__ = name + def __init__(self, title, body, author): self.title = title self.body = body self.author = author self.created = datetime.datetime.now() + +blog = Blog() +blog['sample'] = BlogEntry('Sample Blog Entry', + '<p>This is a sample blog entry</p>', + 'chrism') +def get_root(environ): + return blog + diff --git a/repoze/bfg/sampleapp/run.py b/repoze/bfg/sampleapp/run.py index c6fbeed78..6c36c430e 100644 --- a/repoze/bfg/sampleapp/run.py +++ b/repoze/bfg/sampleapp/run.py @@ -1,17 +1,8 @@ from repoze.bfg import make_app from repoze.bfg import sampleapp - -from repoze.bfg.sampleapp.models import Blog -from repoze.bfg.sampleapp.models import BlogEntry +from repoze.bfg.sampleapp.models import get_root def main(): - blog = Blog('Sample blog') - blog['sample'] = BlogEntry('sample', 'Sample Blog Entry', - '<p>This is a sample blog entry</p>', - 'chrism') - def get_root(environ): - return blog - app = make_app(get_root, sampleapp) from paste import httpserver httpserver.serve(app, host='0.0.0.0', port='5432') diff --git a/repoze/bfg/sampleapp/views.py b/repoze/bfg/sampleapp/views.py index 41ab69061..ce591dec2 100644 --- a/repoze/bfg/sampleapp/views.py +++ b/repoze/bfg/sampleapp/views.py @@ -63,8 +63,8 @@ def blog_entry_add_view(context, request): author = form['author'] body = form['body'] title = form['title'] + new_entry = BlogEntry(title, body, author) name = str(time.time()) - new_entry = BlogEntry(name, title, body, author) context[name] = new_entry return HTTPFound(location='/') diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py new file mode 100644 index 000000000..6f2c858e3 --- /dev/null +++ b/repoze/bfg/security.py @@ -0,0 +1,150 @@ +from zope.interface import implements +from zope.location.location import LocationIterator + +from repoze.bfg.interfaces import ISecurityPolicy +from repoze.bfg.interfaces import IViewPermission +from repoze.bfg.interfaces import IViewPermissionFactory +from repoze.bfg.interfaces import NoAuthorizationInformation + +Everyone = 'system.Everyone' +Authenticated = 'system.Authenticated' +Allow = 'Allow' +Deny = 'Deny' + +class ACLAuthorizer(object): + + def __init__(self, context, logger=None): + self.context = context + self.logger = logger + + def get_acl(self, default=None): + return getattr(self.context, '__acl__', default) + + def permits(self, permission, *principals): + acl = self.get_acl() + if acl is None: + raise NoAuthorizationInformation('%s item has no __acl__' % acl) + + for ace in acl: + ace_action, ace_principal, ace_permissions = ace + for principal in flatten(principals): + if ace_principal == principal: + permissions = flatten(ace_permissions) + if permission in permissions: + action = ace_action + if action == Allow: + result = Allowed(ace, acl, permission, principals, + self.context) + self.logger and self.logger.debug(str(result)) + return result + result = Denied(ace, acl, permission, principals, + self.context) + self.logger and self.logger.debug(str(result)) + return result + result = Denied(None, acl, permission, principals, self.context) + self.logger and self.logger.debug(str(result)) + return result + + +class RemoteUserACLSecurityPolicy(object): + implements(ISecurityPolicy) + authorizer_factory = ACLAuthorizer + + def __init__(self, logger=None): + self.logger = logger + + def permits(self, context, request, permission): + userid = request.environ.get('REMOTE_USER', None) + effective_principals = [Everyone] + + if userid is not None: + effective_principals.append(Authenticated) + effective_principals.append(userid) + + for location in LocationIterator(context): + authorizer = self.authorizer_factory(context, self.logger) + try: + return authorizer.permits(permission, *effective_principals) + except NoAuthorizationInformation: + continue + + return False + +class PermitsResult: + def __init__(self, ace, acl, permission, principals, context): + self.acl = acl + self.ace = ace + self.permission = permission + self.principals = principals + self.context_repr = repr(context) + + def __str__(self): + msg = '%s: %r via ace %r in acl %s or principals %r in context %s' + msg = msg % (self.__class__.__name__, + self.permission, self.ace, self.acl, self.principals, + self.context_repr) + return msg + +class Denied(PermitsResult): + def __nonzero__(self): + return False + + def __eq__(self, other): + if bool(other) is False: + return True + +class Allowed(PermitsResult): + def __nonzero__(self): + return True + + def __eq__(self, other): + if bool(other) is True: + return True + +def flatten(x): + """flatten(sequence) -> list + + Returns a single, flat list which contains all elements retrieved + from the sequence and all recursively contained sub-sequences + (iterables). + + Examples: + >>> [1, 2, [3,4], (5,6)] + [1, 2, [3, 4], (5, 6)] + >>> flatten([[[1,2,3], (42,None)], [4,5], [6], 7, MyVector(8,9,10)]) + [1, 2, 3, 42, None, 4, 5, 6, 7, 8, 9, 10]""" + if isinstance(x, basestring): + return [x] + result = [] + for el in x: + if hasattr(el, "__iter__") and not isinstance(el, basestring): + result.extend(flatten(el)) + else: + result.append(el) + return result + +class ViewPermission(object): + implements(IViewPermission) + def __init__(self, context, request, permission_name): + self.context = context + self.request = request + self.permission_name = permission_name + + def __call__(self, security_policy): + return security_policy.permits(self.context, self.request, + self.permission_name) + + def __repr__(self): + return '<Permission at %s named %r for %r>' % (id(self), + self.permission_name, + self.request.view_name) + +class ViewPermissionFactory(object): + implements(IViewPermissionFactory) + def __init__(self, permission_name): + self.permission_name = permission_name + + def __call__(self, context, request): + return ViewPermission(context, request, self.permission_name) + + diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py index 8bb579054..148b72427 100644 --- a/repoze/bfg/tests/test_router.py +++ b/repoze/bfg/tests/test_router.py @@ -21,12 +21,24 @@ class RouterTests(unittest.TestCase, PlacelessSetup): from repoze.bfg.interfaces import IViewFactory gsm.registerAdapter(app, for_, IViewFactory, name) + def _registerPermission(self, permission, name, *for_): + import zope.component + gsm = zope.component.getGlobalSiteManager() + from repoze.bfg.interfaces import IViewPermission + gsm.registerAdapter(permission, for_, IViewPermission, name) + def _registerWSGIFactory(self, app, name, *for_): import zope.component gsm = zope.component.getGlobalSiteManager() from repoze.bfg.interfaces import IWSGIApplicationFactory gsm.registerAdapter(app, for_, IWSGIApplicationFactory, name) + def _registerSecurityPolicy(self, secpol): + import zope.component + gsm = zope.component.getGlobalSiteManager() + from repoze.bfg.interfaces import ISecurityPolicy + gsm.registerUtility(secpol, ISecurityPolicy) + def _getTargetClass(self): from repoze.bfg.router import Router return Router @@ -155,6 +167,88 @@ class RouterTests(unittest.TestCase, PlacelessSetup): self.failUnless('404' in result[0]) self.assertEqual(start_response.status, '404 Not Found') + def test_call_view_registered_security_policy_permission_none(self): + rootpolicy = make_rootpolicy(None) + from zope.interface import Interface + from zope.interface import directlyProvides + class IContext(Interface): + pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() + directlyProvides(context, IContext) + traversalfactory = make_traversal_factory(context, '', ['']) + response = DummyResponse() + viewfactory = make_view_factory(response) + wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world']) + environ = self._makeEnviron() + self._registerTraverserFactory(traversalfactory, '', None, None) + self._registerViewFactory(viewfactory, '', IContext, IRequest) + self._registerWSGIFactory(wsgifactory, '', None, None, None) + secpol = DummySecurityPolicy() + self._registerSecurityPolicy(secpol) + app_context = make_appcontext() + router = self._makeOne(rootpolicy, None) + start_response = DummyStartResponse() + result = router(environ, start_response) + self.assertEqual(start_response.status, '200 OK') + + def test_call_view_registered_security_policy_permission_succeeds(self): + rootpolicy = make_rootpolicy(None) + from zope.interface import Interface + from zope.interface import directlyProvides + class IContext(Interface): + pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() + directlyProvides(context, IContext) + traversalfactory = make_traversal_factory(context, '', ['']) + response = DummyResponse() + viewfactory = make_view_factory(response) + wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world']) + secpol = DummySecurityPolicy() + permissionfactory = make_permission_factory(True) + environ = self._makeEnviron() + self._registerTraverserFactory(traversalfactory, '', None, None) + self._registerViewFactory(viewfactory, '', IContext, IRequest) + self._registerWSGIFactory(wsgifactory, '', None, None, None) + self._registerSecurityPolicy(secpol) + self._registerPermission(permissionfactory, '', IContext, IRequest) + app_context = make_appcontext() + router = self._makeOne(rootpolicy, None) + start_response = DummyStartResponse() + result = router(environ, start_response) + self.assertEqual(start_response.status, '200 OK') + self.assertEqual(permissionfactory.checked_with, secpol) + + def test_call_view_registered_security_policy_permission_failss(self): + rootpolicy = make_rootpolicy(None) + from zope.interface import Interface + from zope.interface import directlyProvides + class IContext(Interface): + pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() + directlyProvides(context, IContext) + traversalfactory = make_traversal_factory(context, '', ['']) + response = DummyResponse() + viewfactory = make_view_factory(response) + wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world']) + secpol = DummySecurityPolicy() + permissionfactory = make_permission_factory(False) + environ = self._makeEnviron() + self._registerTraverserFactory(traversalfactory, '', None, None) + self._registerViewFactory(viewfactory, '', IContext, IRequest) + self._registerWSGIFactory(wsgifactory, '', None, None, None) + self._registerSecurityPolicy(secpol) + self._registerPermission(permissionfactory, '', IContext, IRequest) + app_context = make_appcontext() + router = self._makeOne(rootpolicy, None) + start_response = DummyStartResponse() + result = router(environ, start_response) + self.assertEqual(start_response.status, '401 Unauthorized') + self.failUnless('permission' in result[0]) + self.assertEqual(permissionfactory.checked_with, secpol) + class MakeAppTests(unittest.TestCase, PlacelessSetup): def setUp(self): PlacelessSetup.setUp(self) @@ -213,6 +307,20 @@ def make_traversal_factory(context, name, subpath): return context, name, subpath return DummyTraversalFactory +def make_permission_factory(result): + class DummyPermissionFactory: + def __init__(self, context, request): + self.context = context + self.request = request + + def __call__(self, secpol): + self.__class__.checked_with = secpol + return result + + def __repr__(self): + return 'permission' + return DummyPermissionFactory + def make_rootpolicy(root): def rootpolicy(environ): return root @@ -237,3 +345,6 @@ class DummyResponse: headerlist = () app_iter = () +class DummySecurityPolicy: + pass + diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py new file mode 100644 index 000000000..8a4c624e9 --- /dev/null +++ b/repoze/bfg/tests/test_security.py @@ -0,0 +1,362 @@ +import unittest + +from zope.component.testing import PlacelessSetup + +class TestACLAuthorizer(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.security import ACLAuthorizer + return ACLAuthorizer + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_permits_no_acl_raises(self): + context = DummyContext() + logger = DummyLogger() + authorizer = self._makeOne(context, logger) + from repoze.bfg.interfaces import NoAuthorizationInformation + self.assertRaises(NoAuthorizationInformation, + authorizer.permits, (), None) + + def test_permits_deny_implicit_empty_acl(self): + context = DummyContext() + logger = DummyLogger() + context.__acl__ = [] + authorizer = self._makeOne(context, logger) + result = authorizer.permits((), None) + self.assertEqual(result, False) + self.assertEqual(result.ace, None) + + def test_permits_deny_no_principals_implicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Allow + from repoze.bfg.security import Everyone + acl = [(Allow, Everyone, 'view')] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + result = authorizer.permits(None) + self.assertEqual(result, False) + self.assertEqual(result.ace, None) + + def test_permits_deny_oneacl_implicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Allow + acl = [(Allow, 'somebody', 'view')] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + result = authorizer.permits('view', 'somebodyelse') + self.assertEqual(result, False) + self.assertEqual(result.ace, None) + + def test_permits_deny_twoacl_implicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Allow + acl = [(Allow, 'somebody', 'view'), (Allow, 'somebody', 'write')] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + result = authorizer.permits('view', 'somebodyelse') + self.assertEqual(result, False) + self.assertEqual(result.ace, None) + + def test_permits_deny_oneacl_explcit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Deny + ace = (Deny, 'somebody', 'view') + acl = [ace] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + result = authorizer.permits('view', 'somebody') + self.assertEqual(result, False) + self.assertEqual(result.ace, ace) + + def test_permits_deny_oneacl_multiperm_explcit(self): + context = DummyContext() + logger = DummyLogger() + acl = [] + from repoze.bfg.security import Deny + from repoze.bfg.security import Allow + deny = (Deny, 'somebody', ('view', 'read')) + allow = (Allow, 'somebody', 'view') + acl = [deny, allow] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + result = authorizer.permits('view', 'somebody') + self.assertEqual(result, False) + self.assertEqual(result.ace, deny) + + def test_permits_deny_twoacl_explicit(self): + context = DummyContext() + logger = DummyLogger() + acl = [] + from repoze.bfg.security import Deny + from repoze.bfg.security import Allow + allow = (Allow, 'somebody', 'read') + deny = (Deny, 'somebody', 'view') + acl = [allow, deny] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + result = authorizer.permits('view', 'somebody') + self.assertEqual(result, False) + self.assertEqual(result.ace, deny) + + def test_permits_allow_twoacl_explicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Deny + from repoze.bfg.security import Allow + allow = (Allow, 'somebody', 'read') + deny = (Deny, 'somebody', 'view') + acl = [allow, deny] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + result = authorizer.permits('read', 'somebody') + self.assertEqual(result, True) + self.assertEqual(result.ace, allow) + + def test_permits_nested_principals_list_allow(self): + context = DummyContext() + logger = DummyLogger() + acl = [] + from repoze.bfg.security import Allow + ace = (Allow, 'larry', 'read') + acl = [ace] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + principals = (['fred', ['jim', ['bob', 'larry']]]) + result = authorizer.permits('read', *principals) + self.assertEqual(result, True) + self.assertEqual(result.ace, ace) + + def test_permits_nested_principals_list_deny_explicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Deny + ace = (Deny, 'larry', 'read') + acl = [ace] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + principals = (['fred', ['jim', ['bob', 'larry']]]) + result = authorizer.permits('read', *principals) + self.assertEqual(result, False) + self.assertEqual(result.ace, ace) + + def test_permits_nested_principals_list_deny_implicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Allow + ace = (Allow, 'somebodyelse', 'read') + acl = [ace] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + principals = (['fred', ['jim', ['bob', 'larry']]]) + result = authorizer.permits('read', *principals) + self.assertEqual(result, False) + + def test_logging_deny_implicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Allow + ace = (Allow, 'somebodyelse', 'read') + acl = [ace] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + principals = ['fred'] + result = authorizer.permits('read', *principals) + self.assertEqual(len(logger.messages), 1) + + def test_logging_deny_explicit(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Deny + ace = (Deny, 'somebodyelse', 'read') + acl = [ace] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + principals = ['somebodyelse'] + result = authorizer.permits('read', *principals) + self.assertEqual(len(logger.messages), 1) + + def test_logging_allow(self): + context = DummyContext() + logger = DummyLogger() + from repoze.bfg.security import Allow + ace = (Allow, 'somebodyelse', 'read') + acl = [ace] + context.__acl__ = acl + authorizer = self._makeOne(context, logger) + principals = ['somebodyelse'] + result = authorizer.permits('read', *principals) + self.assertEqual(len(logger.messages), 1) + +class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): + def _getTargetClass(self): + from repoze.bfg.security import RemoteUserACLSecurityPolicy + return RemoteUserACLSecurityPolicy + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def setUp(self): + PlacelessSetup.setUp(self) + + def tearDown(self): + PlacelessSetup.tearDown(self) + + def test_permits_no_remote_user_no_acl_info_on_context(self): + context = DummyContext() + request = DummyRequest({}) + logger = DummyLogger() + policy = self._makeOne(logger) + authorizer_factory = make_authorizer_factory(None) + policy.authorizer_factory = authorizer_factory + result = policy.permits(context, request, 'view') + self.assertEqual(result, False) + from repoze.bfg.security import Everyone + self.assertEqual(authorizer_factory.principals, (Everyone,)) + self.assertEqual(authorizer_factory.permission, 'view') + self.assertEqual(authorizer_factory.context, context) + + def test_permits_no_remote_user_acl_info_on_context(self): + context = DummyContext() + context.__acl__ = [] + request = DummyRequest({}) + logger = DummyLogger() + policy = self._makeOne(logger) + authorizer_factory = make_authorizer_factory(None) + policy.authorizer_factory = authorizer_factory + result = policy.permits(context, request, 'view') + self.assertEqual(result, False) + from repoze.bfg.security import Everyone + self.assertEqual(authorizer_factory.principals, (Everyone,)) + self.assertEqual(authorizer_factory.permission, 'view') + self.assertEqual(authorizer_factory.context, context) + + def test_permits_no_remote_user_withparents_root_has_acl_info(self): + context = DummyContext() + context.__name__ = None + context.__parent__ = None + context2 = DummyContext() + context2.__name__ = 'context2' + context2.__parent__ = context + context.__acl__ = [] + request = DummyRequest({}) + logger = DummyLogger() + policy = self._makeOne(logger) + authorizer_factory = make_authorizer_factory(None) + policy.authorizer_factory = authorizer_factory + result = policy.permits(context, request, 'view') + self.assertEqual(result, False) + from repoze.bfg.security import Everyone + self.assertEqual(authorizer_factory.principals, (Everyone,)) + self.assertEqual(authorizer_factory.permission, 'view') + self.assertEqual(authorizer_factory.context, context) + + def test_permits_no_remote_user_withparents_root_allows_everyone(self): + context = DummyContext() + context.__name__ = None + context.__parent__ = None + context2 = DummyContext() + context2.__name__ = 'context2' + context2.__parent__ = context + request = DummyRequest({}) + logger = DummyLogger() + policy = self._makeOne(logger) + authorizer_factory = make_authorizer_factory(context) + policy.authorizer_factory = authorizer_factory + result = policy.permits(context, request, 'view') + self.assertEqual(result, True) + from repoze.bfg.security import Everyone + self.assertEqual(authorizer_factory.principals, (Everyone,)) + self.assertEqual(authorizer_factory.permission, 'view') + self.assertEqual(authorizer_factory.context, context) + + +class TestViewPermission(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.security import ViewPermission + return ViewPermission + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_call(self): + context = DummyContext() + request = DummyRequest({}) + secpol = DummySecurityPolicy(True) + permission = self._makeOne(context, request, 'repoze.view') + result = permission(secpol) + self.assertEqual(result, True) + self.assertEqual(secpol.checked, (context, request, 'repoze.view')) + +class TestViewPermissionFactory(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.security import ViewPermissionFactory + return ViewPermissionFactory + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_call(self): + context = DummyContext() + request = DummyRequest({}) + factory = self._makeOne('repoze.view') + result = factory(context, request) + self.assertEqual(result.permission_name, 'repoze.view') + self.assertEqual(result.context, context) + self.assertEqual(result.request, request) + +class DummyContext: + pass + +class DummyRequest: + def __init__(self, environ): + self.environ = environ + +class DummySecurityPolicy: + def __init__(self, result): + self.result = result + + def permits(self, *args): + self.checked = args + return self.result + +class DummyLogger: + def __init__(self): + self.messages = [] + def debug(self, msg): + self.messages.append(msg) + +class make_authorizer_factory: + def __init__(self, expected_context, intermediates_raise=False): + self.expected_context = expected_context + self.intermediates_raise = intermediates_raise + + def __call__(self, context, logger): + authorizer = self + class Authorizer: + def permits(self, permission, *principals): + authorizer.permission = permission + authorizer.principals = principals + authorizer.context = context + result = authorizer.expected_context == context + if not result and authorizer.intermediates_raise: + from repoze.bfg.interfaces import NoAuthorizationInformation + raise NoAuthorizationInformation() + return result + return Authorizer() + + + + + + + + diff --git a/repoze/bfg/tests/test_wsgiadapter.py b/repoze/bfg/tests/test_wsgiadapter.py index 500c7b138..217d43427 100644 --- a/repoze/bfg/tests/test_wsgiadapter.py +++ b/repoze/bfg/tests/test_wsgiadapter.py @@ -91,58 +91,6 @@ class NaiveWSGIAdapterTests(unittest.TestCase, PlacelessSetup): start_response = DummyStartResponse() self.assertRaises(ValueError, adapter, environ, start_response) - def test_view_fails_security_policy(self): - import zope.component - gsm = zope.component.getGlobalSiteManager() - from repoze.bfg.wsgiadapter import IViewSecurityPolicy - def failed(context, request): - def view(): - response = DummyResponse() - response.app_iter = ['failed'] - response.status = '401 Unauthorized' - response.headerlist = () - return response - return view - gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy) - request = DummyRequest() - response = DummyResponse() - response.app_iter = ['Hello world'] - def view(request): - response.request = request - return response - context = DummyContext() - adapter = self._makeOne(context, request, view) - environ = {} - start_response = DummyStartResponse() - result = adapter(environ, start_response) - self.assertEqual(result, ['failed']) - self.assertEqual(start_response.headers, ()) - self.assertEqual(start_response.status, '401 Unauthorized') - - def test_view_passes_security_policy(self): - import zope.component - gsm = zope.component.getGlobalSiteManager() - from repoze.bfg.wsgiadapter import IViewSecurityPolicy - def failed(context, request): - def view(): - return None - return view - gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy) - request = DummyRequest() - response = DummyResponse() - response.app_iter = ['Hello world'] - def view(request): - response.request = request - return response - context = DummyContext() - adapter = self._makeOne(context, request, view) - environ = {} - start_response = DummyStartResponse() - result = adapter(environ, start_response) - self.assertEqual(result, ['Hello world']) - self.assertEqual(start_response.headers, ()) - self.assertEqual(start_response.status, '200 OK') - self.assertEqual(response.request, request) class DummyContext: pass diff --git a/repoze/bfg/tests/test_zcml.py b/repoze/bfg/tests/test_zcml.py index db826b687..a2602b9c1 100644 --- a/repoze/bfg/tests/test_zcml.py +++ b/repoze/bfg/tests/test_zcml.py @@ -36,10 +36,12 @@ class TestViewDirective(unittest.TestCase, PlacelessSetup): from repoze.bfg.interfaces import IView from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IViewFactory + from repoze.bfg.interfaces import IViewPermission + from repoze.bfg.security import ViewPermissionFactory from zope.component.zcml import handler from zope.component.interface import provideInterface - self.assertEqual(len(actions), 3) + self.assertEqual(len(actions), 4) regutil_discriminator = ('utility', IView, context.path('minimal.pt')) regutil = actions[0] @@ -57,7 +59,20 @@ class TestViewDirective(unittest.TestCase, PlacelessSetup): self.assertEqual(provide['args'][0], '') self.assertEqual(provide['args'][1], IFoo) - regadapt = actions[2] + permission = actions[2] + permission_discriminator = ('permission', IFoo, '', IRequest, + IViewPermission) + self.assertEqual(permission['discriminator'], permission_discriminator) + self.assertEqual(permission['callable'], handler) + self.assertEqual(permission['args'][0], 'registerAdapter') + self.failUnless(isinstance(permission['args'][1],ViewPermissionFactory)) + self.assertEqual(permission['args'][1].permission_name, 'repoze.view') + self.assertEqual(permission['args'][2], (IFoo, IRequest)) + self.assertEqual(permission['args'][3], IViewPermission) + self.assertEqual(permission['args'][4], '') + self.assertEqual(permission['args'][5], None) + + regadapt = actions[3] regadapt_discriminator = ('view', IFoo, '', IRequest, IViewFactory) self.assertEqual(regadapt['discriminator'], regadapt_discriminator) self.assertEqual(regadapt['callable'], handler) @@ -78,18 +93,33 @@ class TestViewDirective(unittest.TestCase, PlacelessSetup): actions = context.actions from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IViewFactory + from repoze.bfg.interfaces import IViewPermission + from repoze.bfg.security import ViewPermissionFactory from zope.component.zcml import handler from zope.component.interface import provideInterface - self.assertEqual(len(actions), 2) + self.assertEqual(len(actions), 3) provide = actions[0] self.assertEqual(provide['discriminator'], None) self.assertEqual(provide['callable'], provideInterface) self.assertEqual(provide['args'][0], '') self.assertEqual(provide['args'][1], IFoo) + + permission = actions[1] + permission_discriminator = ('permission', IFoo, '', IRequest, + IViewPermission) + self.assertEqual(permission['discriminator'], permission_discriminator) + self.assertEqual(permission['callable'], handler) + self.assertEqual(permission['args'][0], 'registerAdapter') + self.failUnless(isinstance(permission['args'][1],ViewPermissionFactory)) + self.assertEqual(permission['args'][1].permission_name, 'repoze.view') + self.assertEqual(permission['args'][2], (IFoo, IRequest)) + self.assertEqual(permission['args'][3], IViewPermission) + self.assertEqual(permission['args'][4], '') + self.assertEqual(permission['args'][5], None) - regadapt = actions[1] + regadapt = actions[2] regadapt_discriminator = ('view', IFoo, '', IRequest, IViewFactory) self.assertEqual(regadapt['discriminator'], regadapt_discriminator) self.assertEqual(regadapt['callable'], handler) diff --git a/repoze/bfg/wsgiadapter.py b/repoze/bfg/wsgiadapter.py index c2d51a799..d114dd67e 100644 --- a/repoze/bfg/wsgiadapter.py +++ b/repoze/bfg/wsgiadapter.py @@ -1,20 +1,14 @@ from zope.component import queryMultiAdapter +from zope.component import queryUtility from zope.interface import classProvides from zope.interface import implements from zope.interface import Interface from repoze.bfg.interfaces import IWSGIApplicationFactory from repoze.bfg.interfaces import IWSGIApplication +from repoze.bfg.interfaces import ISecurityPolicy from repoze.bfg.mapply import mapply -class IViewSecurityPolicy(Interface): - """ Marker interface for a view security policy; a view security - policy. """ - def __call__(): - """ Return None if the security check succeeded, - otherwise it should return a WSGI application representing an - unauthorized view""" - def isResponse(ob): if ( hasattr(ob, 'app_iter') and hasattr(ob, 'headerlist') and hasattr(ob, 'status') ): @@ -36,12 +30,6 @@ class NaiveWSGIViewAdapter: context = self.context request = self.request view = self.view - security_policy = queryMultiAdapter((context, request), - IViewSecurityPolicy) - if security_policy: - failed_view = security_policy() - if failed_view: - view = failed_view catch_response = [] def replace_start_response(status, headers): diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py index 1133b3d0b..309d9b5c5 100644 --- a/repoze/bfg/zcml.py +++ b/repoze/bfg/zcml.py @@ -11,15 +11,17 @@ from zope.interface import implements from zope.interface import classProvides from zope.schema import TextLine -from zope.security.zcml import Permission from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IViewFactory +from repoze.bfg.interfaces import IViewPermission from repoze.bfg.interfaces import IView from repoze.bfg.template import Z3CPTTemplateFactory from repoze.bfg.template import render_template +from repoze.bfg.security import ViewPermissionFactory + class TemplateOnlyView(object): implements(IView) classProvides(IViewFactory) @@ -58,7 +60,7 @@ class TemplateOnlyViewFactory(object): return factory def view(_context, - permission, + permission=None, for_=None, factory=None, name="", @@ -95,6 +97,16 @@ def view(_context, args = ('', for_) ) + if permission: + pfactory = ViewPermissionFactory(permission) + _context.action( + discriminator = ('permission', for_,name, IRequest,IViewPermission), + callable = handler, + args = ('registerAdapter', + pfactory, (for_, IRequest), IViewPermission, name, + _context.info), + ) + _context.action( discriminator = ('view', for_, name, IRequest, IViewFactory), callable = handler, @@ -104,23 +116,15 @@ def view(_context, ) class IViewDirective(Interface): - """ - The page directive is used to create views that provide a single - url or page. - - The page directive creates a new view class from a given template - and/or class and registers it. - """ - for_ = GlobalObject( title=u"The interface or class this view is for.", required=False ) - permission = Permission( + permission = TextLine( title=u"Permission", description=u"The permission needed to use the view.", - required=True + required=False ) factory = GlobalObject( |
