summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2008-07-16 10:32:08 +0000
committerChris McDonough <chrism@agendaless.com>2008-07-16 10:32:08 +0000
commit2466f6eaa2246598dc6cb3c962364773eb4cc64a (patch)
tree80954892ad8e12cffb534f3ae92cd321d4d870f5
parent23aa82c4963dc75737d7dc8a84d7639775c3b282 (diff)
downloadpyramid-2466f6eaa2246598dc6cb3c962364773eb4cc64a.tar.gz
pyramid-2466f6eaa2246598dc6cb3c962364773eb4cc64a.tar.bz2
pyramid-2466f6eaa2246598dc6cb3c962364773eb4cc64a.zip
Add security.
-rw-r--r--repoze/bfg/configure.zcml6
-rw-r--r--repoze/bfg/interfaces.py19
-rw-r--r--repoze/bfg/router.py25
-rw-r--r--repoze/bfg/sampleapp/configure.zcml13
-rw-r--r--repoze/bfg/sampleapp/models.py25
-rw-r--r--repoze/bfg/sampleapp/run.py11
-rw-r--r--repoze/bfg/sampleapp/views.py2
-rw-r--r--repoze/bfg/security.py150
-rw-r--r--repoze/bfg/tests/test_router.py111
-rw-r--r--repoze/bfg/tests/test_security.py362
-rw-r--r--repoze/bfg/tests/test_wsgiadapter.py52
-rw-r--r--repoze/bfg/tests/test_zcml.py38
-rw-r--r--repoze/bfg/wsgiadapter.py16
-rw-r--r--repoze/bfg/zcml.py28
14 files changed, 741 insertions, 117 deletions
diff --git a/repoze/bfg/configure.zcml b/repoze/bfg/configure.zcml
index 0223e25ba..c518adc85 100644
--- a/repoze/bfg/configure.zcml
+++ b/repoze/bfg/configure.zcml
@@ -2,12 +2,6 @@
i18n_domain="repoze.bfg">
<include package="z3c.pt" />
- <include package="zope.security" file="meta.zcml"/>
-
- <permission
- id="repoze.view"
- title="View"
- />
<adapter
factory=".traversal.NaivePublishTraverser"
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py
index 005d30027..f51e23cbf 100644
--- a/repoze/bfg/interfaces.py
+++ b/repoze/bfg/interfaces.py
@@ -43,3 +43,22 @@ class ITemplateFactory(Interface):
def __call__(template_path):
""" Return an IView given a template path """
+class ISecurityPolicy(Interface):
+ """ A utility that provides a mechanism to check authorization
+ using authentication data """
+ def permits(context, request, permission):
+ """ Returns True if the combination of the authorization
+ information in the context and the authentication data in
+ the request allow the action implied by the permission """
+
+class NoAuthorizationInformation(Exception):
+ pass
+
+class IViewPermission(Interface):
+ def __call__(security_policy):
+ """ Return True if the permission allows, return False if it denies. """
+
+class IViewPermissionFactory(Interface):
+ def __call__(context, request):
+ """ Return an IViewPermission """
+
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index 00966dfd1..becf3e3e4 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -1,19 +1,21 @@
from zope.component import getMultiAdapter
from zope.component import queryMultiAdapter
+from zope.component import queryUtility
from zope.interface import directlyProvides
from webob import Request
from webob.exc import HTTPNotFound
+from webob.exc import HTTPUnauthorized
from repoze.bfg.interfaces import IPublishTraverserFactory
from repoze.bfg.interfaces import IViewFactory
+from repoze.bfg.interfaces import IViewPermission
+from repoze.bfg.interfaces import ISecurityPolicy
from repoze.bfg.interfaces import IWSGIApplicationFactory
from repoze.bfg.interfaces import IRequest
from repoze.bfg.registry import registry_manager
-_marker = ()
-
class Router:
def __init__(self, root_policy, registry):
self.root_policy = root_policy
@@ -25,14 +27,23 @@ class Router:
directlyProvides(request, IRequest)
root = self.root_policy(environ)
path = environ.get('PATH_INFO', '/')
- traverser = getMultiAdapter((root, request),
- IPublishTraverserFactory)
+ traverser = getMultiAdapter((root, request), IPublishTraverserFactory)
context, name, subpath = traverser(path)
request.subpath = subpath
request.view_name = name
- app = queryMultiAdapter((context, request), IViewFactory, name=name,
- default=_marker)
- if app is _marker:
+
+ security_policy = queryUtility(ISecurityPolicy)
+ if security_policy:
+ permission = queryMultiAdapter((context, request), IViewPermission,
+ name=name)
+ if permission is not None:
+ if not permission(security_policy):
+ app = HTTPUnauthorized()
+ app.explanation = repr(permission)
+ return app(environ, start_response)
+
+ app = queryMultiAdapter((context, request), IViewFactory, name=name)
+ if app is None:
app = HTTPNotFound(request.url)
else:
app = getMultiAdapter((context, request, app),
diff --git a/repoze/bfg/sampleapp/configure.zcml b/repoze/bfg/sampleapp/configure.zcml
index 374a93090..bd1b46d6a 100644
--- a/repoze/bfg/sampleapp/configure.zcml
+++ b/repoze/bfg/sampleapp/configure.zcml
@@ -4,18 +4,23 @@
<include package="repoze.bfg" />
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.RemoteUserACLSecurityPolicy"
+ />
+
<!-- the default view for a Blog -->
<bfg:view
for=".models.IBlog"
factory=".views.blog_default_view"
- permission="repoze.view"
+ permission="view"
/>
<!-- the default view for a BlogEntry -->
<bfg:view
for=".models.IBlogEntry"
factory=".views.blog_entry_default_view"
- permission="repoze.view"
+ permission="view"
/>
<!-- the add view for a BlogEntry -->
@@ -23,7 +28,7 @@
for=".models.IBlog"
factory=".views.blog_entry_add_view"
name="add_entry.html"
- permission="repoze.view"
+ permission="add"
/>
<!-- the contents view for any mapping (shows dict members) -->
@@ -31,7 +36,7 @@
for=".models.IMapping"
factory=".views.contents_view"
name="contents.html"
- permission="repoze.view"
+ permission="manage"
/>
</configure>
diff --git a/repoze/bfg/sampleapp/models.py b/repoze/bfg/sampleapp/models.py
index 975d79142..d07110e83 100644
--- a/repoze/bfg/sampleapp/models.py
+++ b/repoze/bfg/sampleapp/models.py
@@ -1,5 +1,10 @@
from zope.interface import Interface
from zope.interface import implements
+from zope.location.interfaces import ILocation
+from zope.location.location import Location
+
+from repoze.bfg.security import Everyone
+from repoze.bfg.security import Allow
import datetime
@@ -9,20 +14,26 @@ class IMapping(Interface):
class IBlog(Interface):
pass
-class Blog(dict):
- implements(IBlog, IMapping)
- def __init__(self, name):
- self.__name__ = name
- dict.__init__(self)
+class Blog(dict, Location):
+ __acl__ = [ (Allow, Everyone, 'view'), (Allow, 'group:editors', 'add'),
+ (Allow, 'group:managers', 'manage') ]
+ implements(IBlog, IMapping, ILocation)
class IBlogEntry(Interface):
pass
class BlogEntry(object):
implements(IBlogEntry)
- def __init__(self, name, title, body, author):
- self.__name__ = name
+ def __init__(self, title, body, author):
self.title = title
self.body = body
self.author = author
self.created = datetime.datetime.now()
+
+blog = Blog()
+blog['sample'] = BlogEntry('Sample Blog Entry',
+ '<p>This is a sample blog entry</p>',
+ 'chrism')
+def get_root(environ):
+ return blog
+
diff --git a/repoze/bfg/sampleapp/run.py b/repoze/bfg/sampleapp/run.py
index c6fbeed78..6c36c430e 100644
--- a/repoze/bfg/sampleapp/run.py
+++ b/repoze/bfg/sampleapp/run.py
@@ -1,17 +1,8 @@
from repoze.bfg import make_app
from repoze.bfg import sampleapp
-
-from repoze.bfg.sampleapp.models import Blog
-from repoze.bfg.sampleapp.models import BlogEntry
+from repoze.bfg.sampleapp.models import get_root
def main():
- blog = Blog('Sample blog')
- blog['sample'] = BlogEntry('sample', 'Sample Blog Entry',
- '<p>This is a sample blog entry</p>',
- 'chrism')
- def get_root(environ):
- return blog
-
app = make_app(get_root, sampleapp)
from paste import httpserver
httpserver.serve(app, host='0.0.0.0', port='5432')
diff --git a/repoze/bfg/sampleapp/views.py b/repoze/bfg/sampleapp/views.py
index 41ab69061..ce591dec2 100644
--- a/repoze/bfg/sampleapp/views.py
+++ b/repoze/bfg/sampleapp/views.py
@@ -63,8 +63,8 @@ def blog_entry_add_view(context, request):
author = form['author']
body = form['body']
title = form['title']
+ new_entry = BlogEntry(title, body, author)
name = str(time.time())
- new_entry = BlogEntry(name, title, body, author)
context[name] = new_entry
return HTTPFound(location='/')
diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py
new file mode 100644
index 000000000..6f2c858e3
--- /dev/null
+++ b/repoze/bfg/security.py
@@ -0,0 +1,150 @@
+from zope.interface import implements
+from zope.location.location import LocationIterator
+
+from repoze.bfg.interfaces import ISecurityPolicy
+from repoze.bfg.interfaces import IViewPermission
+from repoze.bfg.interfaces import IViewPermissionFactory
+from repoze.bfg.interfaces import NoAuthorizationInformation
+
+Everyone = 'system.Everyone'
+Authenticated = 'system.Authenticated'
+Allow = 'Allow'
+Deny = 'Deny'
+
+class ACLAuthorizer(object):
+
+ def __init__(self, context, logger=None):
+ self.context = context
+ self.logger = logger
+
+ def get_acl(self, default=None):
+ return getattr(self.context, '__acl__', default)
+
+ def permits(self, permission, *principals):
+ acl = self.get_acl()
+ if acl is None:
+ raise NoAuthorizationInformation('%s item has no __acl__' % acl)
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ for principal in flatten(principals):
+ if ace_principal == principal:
+ permissions = flatten(ace_permissions)
+ if permission in permissions:
+ action = ace_action
+ if action == Allow:
+ result = Allowed(ace, acl, permission, principals,
+ self.context)
+ self.logger and self.logger.debug(str(result))
+ return result
+ result = Denied(ace, acl, permission, principals,
+ self.context)
+ self.logger and self.logger.debug(str(result))
+ return result
+ result = Denied(None, acl, permission, principals, self.context)
+ self.logger and self.logger.debug(str(result))
+ return result
+
+
+class RemoteUserACLSecurityPolicy(object):
+ implements(ISecurityPolicy)
+ authorizer_factory = ACLAuthorizer
+
+ def __init__(self, logger=None):
+ self.logger = logger
+
+ def permits(self, context, request, permission):
+ userid = request.environ.get('REMOTE_USER', None)
+ effective_principals = [Everyone]
+
+ if userid is not None:
+ effective_principals.append(Authenticated)
+ effective_principals.append(userid)
+
+ for location in LocationIterator(context):
+ authorizer = self.authorizer_factory(context, self.logger)
+ try:
+ return authorizer.permits(permission, *effective_principals)
+ except NoAuthorizationInformation:
+ continue
+
+ return False
+
+class PermitsResult:
+ def __init__(self, ace, acl, permission, principals, context):
+ self.acl = acl
+ self.ace = ace
+ self.permission = permission
+ self.principals = principals
+ self.context_repr = repr(context)
+
+ def __str__(self):
+ msg = '%s: %r via ace %r in acl %s or principals %r in context %s'
+ msg = msg % (self.__class__.__name__,
+ self.permission, self.ace, self.acl, self.principals,
+ self.context_repr)
+ return msg
+
+class Denied(PermitsResult):
+ def __nonzero__(self):
+ return False
+
+ def __eq__(self, other):
+ if bool(other) is False:
+ return True
+
+class Allowed(PermitsResult):
+ def __nonzero__(self):
+ return True
+
+ def __eq__(self, other):
+ if bool(other) is True:
+ return True
+
+def flatten(x):
+ """flatten(sequence) -> list
+
+ Returns a single, flat list which contains all elements retrieved
+ from the sequence and all recursively contained sub-sequences
+ (iterables).
+
+ Examples:
+ >>> [1, 2, [3,4], (5,6)]
+ [1, 2, [3, 4], (5, 6)]
+ >>> flatten([[[1,2,3], (42,None)], [4,5], [6], 7, MyVector(8,9,10)])
+ [1, 2, 3, 42, None, 4, 5, 6, 7, 8, 9, 10]"""
+ if isinstance(x, basestring):
+ return [x]
+ result = []
+ for el in x:
+ if hasattr(el, "__iter__") and not isinstance(el, basestring):
+ result.extend(flatten(el))
+ else:
+ result.append(el)
+ return result
+
+class ViewPermission(object):
+ implements(IViewPermission)
+ def __init__(self, context, request, permission_name):
+ self.context = context
+ self.request = request
+ self.permission_name = permission_name
+
+ def __call__(self, security_policy):
+ return security_policy.permits(self.context, self.request,
+ self.permission_name)
+
+ def __repr__(self):
+ return '<Permission at %s named %r for %r>' % (id(self),
+ self.permission_name,
+ self.request.view_name)
+
+class ViewPermissionFactory(object):
+ implements(IViewPermissionFactory)
+ def __init__(self, permission_name):
+ self.permission_name = permission_name
+
+ def __call__(self, context, request):
+ return ViewPermission(context, request, self.permission_name)
+
+
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index 8bb579054..148b72427 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -21,12 +21,24 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
from repoze.bfg.interfaces import IViewFactory
gsm.registerAdapter(app, for_, IViewFactory, name)
+ def _registerPermission(self, permission, name, *for_):
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ from repoze.bfg.interfaces import IViewPermission
+ gsm.registerAdapter(permission, for_, IViewPermission, name)
+
def _registerWSGIFactory(self, app, name, *for_):
import zope.component
gsm = zope.component.getGlobalSiteManager()
from repoze.bfg.interfaces import IWSGIApplicationFactory
gsm.registerAdapter(app, for_, IWSGIApplicationFactory, name)
+ def _registerSecurityPolicy(self, secpol):
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ from repoze.bfg.interfaces import ISecurityPolicy
+ gsm.registerUtility(secpol, ISecurityPolicy)
+
def _getTargetClass(self):
from repoze.bfg.router import Router
return Router
@@ -155,6 +167,88 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
self.failUnless('404' in result[0])
self.assertEqual(start_response.status, '404 Not Found')
+ def test_call_view_registered_security_policy_permission_none(self):
+ rootpolicy = make_rootpolicy(None)
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ traversalfactory = make_traversal_factory(context, '', [''])
+ response = DummyResponse()
+ viewfactory = make_view_factory(response)
+ wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
+ environ = self._makeEnviron()
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ self._registerViewFactory(viewfactory, '', IContext, IRequest)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
+ secpol = DummySecurityPolicy()
+ self._registerSecurityPolicy(secpol)
+ app_context = make_appcontext()
+ router = self._makeOne(rootpolicy, None)
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(start_response.status, '200 OK')
+
+ def test_call_view_registered_security_policy_permission_succeeds(self):
+ rootpolicy = make_rootpolicy(None)
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ traversalfactory = make_traversal_factory(context, '', [''])
+ response = DummyResponse()
+ viewfactory = make_view_factory(response)
+ wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
+ secpol = DummySecurityPolicy()
+ permissionfactory = make_permission_factory(True)
+ environ = self._makeEnviron()
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ self._registerViewFactory(viewfactory, '', IContext, IRequest)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
+ self._registerSecurityPolicy(secpol)
+ self._registerPermission(permissionfactory, '', IContext, IRequest)
+ app_context = make_appcontext()
+ router = self._makeOne(rootpolicy, None)
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(permissionfactory.checked_with, secpol)
+
+ def test_call_view_registered_security_policy_permission_failss(self):
+ rootpolicy = make_rootpolicy(None)
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ traversalfactory = make_traversal_factory(context, '', [''])
+ response = DummyResponse()
+ viewfactory = make_view_factory(response)
+ wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
+ secpol = DummySecurityPolicy()
+ permissionfactory = make_permission_factory(False)
+ environ = self._makeEnviron()
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ self._registerViewFactory(viewfactory, '', IContext, IRequest)
+ self._registerWSGIFactory(wsgifactory, '', None, None, None)
+ self._registerSecurityPolicy(secpol)
+ self._registerPermission(permissionfactory, '', IContext, IRequest)
+ app_context = make_appcontext()
+ router = self._makeOne(rootpolicy, None)
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(start_response.status, '401 Unauthorized')
+ self.failUnless('permission' in result[0])
+ self.assertEqual(permissionfactory.checked_with, secpol)
+
class MakeAppTests(unittest.TestCase, PlacelessSetup):
def setUp(self):
PlacelessSetup.setUp(self)
@@ -213,6 +307,20 @@ def make_traversal_factory(context, name, subpath):
return context, name, subpath
return DummyTraversalFactory
+def make_permission_factory(result):
+ class DummyPermissionFactory:
+ def __init__(self, context, request):
+ self.context = context
+ self.request = request
+
+ def __call__(self, secpol):
+ self.__class__.checked_with = secpol
+ return result
+
+ def __repr__(self):
+ return 'permission'
+ return DummyPermissionFactory
+
def make_rootpolicy(root):
def rootpolicy(environ):
return root
@@ -237,3 +345,6 @@ class DummyResponse:
headerlist = ()
app_iter = ()
+class DummySecurityPolicy:
+ pass
+
diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py
new file mode 100644
index 000000000..8a4c624e9
--- /dev/null
+++ b/repoze/bfg/tests/test_security.py
@@ -0,0 +1,362 @@
+import unittest
+
+from zope.component.testing import PlacelessSetup
+
+class TestACLAuthorizer(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.security import ACLAuthorizer
+ return ACLAuthorizer
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_permits_no_acl_raises(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ authorizer = self._makeOne(context, logger)
+ from repoze.bfg.interfaces import NoAuthorizationInformation
+ self.assertRaises(NoAuthorizationInformation,
+ authorizer.permits, (), None)
+
+ def test_permits_deny_implicit_empty_acl(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ context.__acl__ = []
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits((), None)
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, None)
+
+ def test_permits_deny_no_principals_implicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Allow
+ from repoze.bfg.security import Everyone
+ acl = [(Allow, Everyone, 'view')]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits(None)
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, None)
+
+ def test_permits_deny_oneacl_implicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Allow
+ acl = [(Allow, 'somebody', 'view')]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits('view', 'somebodyelse')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, None)
+
+ def test_permits_deny_twoacl_implicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Allow
+ acl = [(Allow, 'somebody', 'view'), (Allow, 'somebody', 'write')]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits('view', 'somebodyelse')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, None)
+
+ def test_permits_deny_oneacl_explcit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Deny
+ ace = (Deny, 'somebody', 'view')
+ acl = [ace]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits('view', 'somebody')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, ace)
+
+ def test_permits_deny_oneacl_multiperm_explcit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ acl = []
+ from repoze.bfg.security import Deny
+ from repoze.bfg.security import Allow
+ deny = (Deny, 'somebody', ('view', 'read'))
+ allow = (Allow, 'somebody', 'view')
+ acl = [deny, allow]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits('view', 'somebody')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, deny)
+
+ def test_permits_deny_twoacl_explicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ acl = []
+ from repoze.bfg.security import Deny
+ from repoze.bfg.security import Allow
+ allow = (Allow, 'somebody', 'read')
+ deny = (Deny, 'somebody', 'view')
+ acl = [allow, deny]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits('view', 'somebody')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, deny)
+
+ def test_permits_allow_twoacl_explicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Deny
+ from repoze.bfg.security import Allow
+ allow = (Allow, 'somebody', 'read')
+ deny = (Deny, 'somebody', 'view')
+ acl = [allow, deny]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ result = authorizer.permits('read', 'somebody')
+ self.assertEqual(result, True)
+ self.assertEqual(result.ace, allow)
+
+ def test_permits_nested_principals_list_allow(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ acl = []
+ from repoze.bfg.security import Allow
+ ace = (Allow, 'larry', 'read')
+ acl = [ace]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ principals = (['fred', ['jim', ['bob', 'larry']]])
+ result = authorizer.permits('read', *principals)
+ self.assertEqual(result, True)
+ self.assertEqual(result.ace, ace)
+
+ def test_permits_nested_principals_list_deny_explicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Deny
+ ace = (Deny, 'larry', 'read')
+ acl = [ace]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ principals = (['fred', ['jim', ['bob', 'larry']]])
+ result = authorizer.permits('read', *principals)
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, ace)
+
+ def test_permits_nested_principals_list_deny_implicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Allow
+ ace = (Allow, 'somebodyelse', 'read')
+ acl = [ace]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ principals = (['fred', ['jim', ['bob', 'larry']]])
+ result = authorizer.permits('read', *principals)
+ self.assertEqual(result, False)
+
+ def test_logging_deny_implicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Allow
+ ace = (Allow, 'somebodyelse', 'read')
+ acl = [ace]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ principals = ['fred']
+ result = authorizer.permits('read', *principals)
+ self.assertEqual(len(logger.messages), 1)
+
+ def test_logging_deny_explicit(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Deny
+ ace = (Deny, 'somebodyelse', 'read')
+ acl = [ace]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ principals = ['somebodyelse']
+ result = authorizer.permits('read', *principals)
+ self.assertEqual(len(logger.messages), 1)
+
+ def test_logging_allow(self):
+ context = DummyContext()
+ logger = DummyLogger()
+ from repoze.bfg.security import Allow
+ ace = (Allow, 'somebodyelse', 'read')
+ acl = [ace]
+ context.__acl__ = acl
+ authorizer = self._makeOne(context, logger)
+ principals = ['somebodyelse']
+ result = authorizer.permits('read', *principals)
+ self.assertEqual(len(logger.messages), 1)
+
+class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
+ def _getTargetClass(self):
+ from repoze.bfg.security import RemoteUserACLSecurityPolicy
+ return RemoteUserACLSecurityPolicy
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+
+ def tearDown(self):
+ PlacelessSetup.tearDown(self)
+
+ def test_permits_no_remote_user_no_acl_info_on_context(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ authorizer_factory = make_authorizer_factory(None)
+ policy.authorizer_factory = authorizer_factory
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, False)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(authorizer_factory.principals, (Everyone,))
+ self.assertEqual(authorizer_factory.permission, 'view')
+ self.assertEqual(authorizer_factory.context, context)
+
+ def test_permits_no_remote_user_acl_info_on_context(self):
+ context = DummyContext()
+ context.__acl__ = []
+ request = DummyRequest({})
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ authorizer_factory = make_authorizer_factory(None)
+ policy.authorizer_factory = authorizer_factory
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, False)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(authorizer_factory.principals, (Everyone,))
+ self.assertEqual(authorizer_factory.permission, 'view')
+ self.assertEqual(authorizer_factory.context, context)
+
+ def test_permits_no_remote_user_withparents_root_has_acl_info(self):
+ context = DummyContext()
+ context.__name__ = None
+ context.__parent__ = None
+ context2 = DummyContext()
+ context2.__name__ = 'context2'
+ context2.__parent__ = context
+ context.__acl__ = []
+ request = DummyRequest({})
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ authorizer_factory = make_authorizer_factory(None)
+ policy.authorizer_factory = authorizer_factory
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, False)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(authorizer_factory.principals, (Everyone,))
+ self.assertEqual(authorizer_factory.permission, 'view')
+ self.assertEqual(authorizer_factory.context, context)
+
+ def test_permits_no_remote_user_withparents_root_allows_everyone(self):
+ context = DummyContext()
+ context.__name__ = None
+ context.__parent__ = None
+ context2 = DummyContext()
+ context2.__name__ = 'context2'
+ context2.__parent__ = context
+ request = DummyRequest({})
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ authorizer_factory = make_authorizer_factory(context)
+ policy.authorizer_factory = authorizer_factory
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, True)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(authorizer_factory.principals, (Everyone,))
+ self.assertEqual(authorizer_factory.permission, 'view')
+ self.assertEqual(authorizer_factory.context, context)
+
+
+class TestViewPermission(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.security import ViewPermission
+ return ViewPermission
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_call(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ secpol = DummySecurityPolicy(True)
+ permission = self._makeOne(context, request, 'repoze.view')
+ result = permission(secpol)
+ self.assertEqual(result, True)
+ self.assertEqual(secpol.checked, (context, request, 'repoze.view'))
+
+class TestViewPermissionFactory(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.security import ViewPermissionFactory
+ return ViewPermissionFactory
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_call(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ factory = self._makeOne('repoze.view')
+ result = factory(context, request)
+ self.assertEqual(result.permission_name, 'repoze.view')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.request, request)
+
+class DummyContext:
+ pass
+
+class DummyRequest:
+ def __init__(self, environ):
+ self.environ = environ
+
+class DummySecurityPolicy:
+ def __init__(self, result):
+ self.result = result
+
+ def permits(self, *args):
+ self.checked = args
+ return self.result
+
+class DummyLogger:
+ def __init__(self):
+ self.messages = []
+ def debug(self, msg):
+ self.messages.append(msg)
+
+class make_authorizer_factory:
+ def __init__(self, expected_context, intermediates_raise=False):
+ self.expected_context = expected_context
+ self.intermediates_raise = intermediates_raise
+
+ def __call__(self, context, logger):
+ authorizer = self
+ class Authorizer:
+ def permits(self, permission, *principals):
+ authorizer.permission = permission
+ authorizer.principals = principals
+ authorizer.context = context
+ result = authorizer.expected_context == context
+ if not result and authorizer.intermediates_raise:
+ from repoze.bfg.interfaces import NoAuthorizationInformation
+ raise NoAuthorizationInformation()
+ return result
+ return Authorizer()
+
+
+
+
+
+
+
+
diff --git a/repoze/bfg/tests/test_wsgiadapter.py b/repoze/bfg/tests/test_wsgiadapter.py
index 500c7b138..217d43427 100644
--- a/repoze/bfg/tests/test_wsgiadapter.py
+++ b/repoze/bfg/tests/test_wsgiadapter.py
@@ -91,58 +91,6 @@ class NaiveWSGIAdapterTests(unittest.TestCase, PlacelessSetup):
start_response = DummyStartResponse()
self.assertRaises(ValueError, adapter, environ, start_response)
- def test_view_fails_security_policy(self):
- import zope.component
- gsm = zope.component.getGlobalSiteManager()
- from repoze.bfg.wsgiadapter import IViewSecurityPolicy
- def failed(context, request):
- def view():
- response = DummyResponse()
- response.app_iter = ['failed']
- response.status = '401 Unauthorized'
- response.headerlist = ()
- return response
- return view
- gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy)
- request = DummyRequest()
- response = DummyResponse()
- response.app_iter = ['Hello world']
- def view(request):
- response.request = request
- return response
- context = DummyContext()
- adapter = self._makeOne(context, request, view)
- environ = {}
- start_response = DummyStartResponse()
- result = adapter(environ, start_response)
- self.assertEqual(result, ['failed'])
- self.assertEqual(start_response.headers, ())
- self.assertEqual(start_response.status, '401 Unauthorized')
-
- def test_view_passes_security_policy(self):
- import zope.component
- gsm = zope.component.getGlobalSiteManager()
- from repoze.bfg.wsgiadapter import IViewSecurityPolicy
- def failed(context, request):
- def view():
- return None
- return view
- gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy)
- request = DummyRequest()
- response = DummyResponse()
- response.app_iter = ['Hello world']
- def view(request):
- response.request = request
- return response
- context = DummyContext()
- adapter = self._makeOne(context, request, view)
- environ = {}
- start_response = DummyStartResponse()
- result = adapter(environ, start_response)
- self.assertEqual(result, ['Hello world'])
- self.assertEqual(start_response.headers, ())
- self.assertEqual(start_response.status, '200 OK')
- self.assertEqual(response.request, request)
class DummyContext:
pass
diff --git a/repoze/bfg/tests/test_zcml.py b/repoze/bfg/tests/test_zcml.py
index db826b687..a2602b9c1 100644
--- a/repoze/bfg/tests/test_zcml.py
+++ b/repoze/bfg/tests/test_zcml.py
@@ -36,10 +36,12 @@ class TestViewDirective(unittest.TestCase, PlacelessSetup):
from repoze.bfg.interfaces import IView
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IViewFactory
+ from repoze.bfg.interfaces import IViewPermission
+ from repoze.bfg.security import ViewPermissionFactory
from zope.component.zcml import handler
from zope.component.interface import provideInterface
- self.assertEqual(len(actions), 3)
+ self.assertEqual(len(actions), 4)
regutil_discriminator = ('utility', IView, context.path('minimal.pt'))
regutil = actions[0]
@@ -57,7 +59,20 @@ class TestViewDirective(unittest.TestCase, PlacelessSetup):
self.assertEqual(provide['args'][0], '')
self.assertEqual(provide['args'][1], IFoo)
- regadapt = actions[2]
+ permission = actions[2]
+ permission_discriminator = ('permission', IFoo, '', IRequest,
+ IViewPermission)
+ self.assertEqual(permission['discriminator'], permission_discriminator)
+ self.assertEqual(permission['callable'], handler)
+ self.assertEqual(permission['args'][0], 'registerAdapter')
+ self.failUnless(isinstance(permission['args'][1],ViewPermissionFactory))
+ self.assertEqual(permission['args'][1].permission_name, 'repoze.view')
+ self.assertEqual(permission['args'][2], (IFoo, IRequest))
+ self.assertEqual(permission['args'][3], IViewPermission)
+ self.assertEqual(permission['args'][4], '')
+ self.assertEqual(permission['args'][5], None)
+
+ regadapt = actions[3]
regadapt_discriminator = ('view', IFoo, '', IRequest, IViewFactory)
self.assertEqual(regadapt['discriminator'], regadapt_discriminator)
self.assertEqual(regadapt['callable'], handler)
@@ -78,18 +93,33 @@ class TestViewDirective(unittest.TestCase, PlacelessSetup):
actions = context.actions
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IViewFactory
+ from repoze.bfg.interfaces import IViewPermission
+ from repoze.bfg.security import ViewPermissionFactory
from zope.component.zcml import handler
from zope.component.interface import provideInterface
- self.assertEqual(len(actions), 2)
+ self.assertEqual(len(actions), 3)
provide = actions[0]
self.assertEqual(provide['discriminator'], None)
self.assertEqual(provide['callable'], provideInterface)
self.assertEqual(provide['args'][0], '')
self.assertEqual(provide['args'][1], IFoo)
+
+ permission = actions[1]
+ permission_discriminator = ('permission', IFoo, '', IRequest,
+ IViewPermission)
+ self.assertEqual(permission['discriminator'], permission_discriminator)
+ self.assertEqual(permission['callable'], handler)
+ self.assertEqual(permission['args'][0], 'registerAdapter')
+ self.failUnless(isinstance(permission['args'][1],ViewPermissionFactory))
+ self.assertEqual(permission['args'][1].permission_name, 'repoze.view')
+ self.assertEqual(permission['args'][2], (IFoo, IRequest))
+ self.assertEqual(permission['args'][3], IViewPermission)
+ self.assertEqual(permission['args'][4], '')
+ self.assertEqual(permission['args'][5], None)
- regadapt = actions[1]
+ regadapt = actions[2]
regadapt_discriminator = ('view', IFoo, '', IRequest, IViewFactory)
self.assertEqual(regadapt['discriminator'], regadapt_discriminator)
self.assertEqual(regadapt['callable'], handler)
diff --git a/repoze/bfg/wsgiadapter.py b/repoze/bfg/wsgiadapter.py
index c2d51a799..d114dd67e 100644
--- a/repoze/bfg/wsgiadapter.py
+++ b/repoze/bfg/wsgiadapter.py
@@ -1,20 +1,14 @@
from zope.component import queryMultiAdapter
+from zope.component import queryUtility
from zope.interface import classProvides
from zope.interface import implements
from zope.interface import Interface
from repoze.bfg.interfaces import IWSGIApplicationFactory
from repoze.bfg.interfaces import IWSGIApplication
+from repoze.bfg.interfaces import ISecurityPolicy
from repoze.bfg.mapply import mapply
-class IViewSecurityPolicy(Interface):
- """ Marker interface for a view security policy; a view security
- policy. """
- def __call__():
- """ Return None if the security check succeeded,
- otherwise it should return a WSGI application representing an
- unauthorized view"""
-
def isResponse(ob):
if ( hasattr(ob, 'app_iter') and hasattr(ob, 'headerlist') and
hasattr(ob, 'status') ):
@@ -36,12 +30,6 @@ class NaiveWSGIViewAdapter:
context = self.context
request = self.request
view = self.view
- security_policy = queryMultiAdapter((context, request),
- IViewSecurityPolicy)
- if security_policy:
- failed_view = security_policy()
- if failed_view:
- view = failed_view
catch_response = []
def replace_start_response(status, headers):
diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py
index 1133b3d0b..309d9b5c5 100644
--- a/repoze/bfg/zcml.py
+++ b/repoze/bfg/zcml.py
@@ -11,15 +11,17 @@ from zope.interface import implements
from zope.interface import classProvides
from zope.schema import TextLine
-from zope.security.zcml import Permission
from repoze.bfg.interfaces import IRequest
from repoze.bfg.interfaces import IViewFactory
+from repoze.bfg.interfaces import IViewPermission
from repoze.bfg.interfaces import IView
from repoze.bfg.template import Z3CPTTemplateFactory
from repoze.bfg.template import render_template
+from repoze.bfg.security import ViewPermissionFactory
+
class TemplateOnlyView(object):
implements(IView)
classProvides(IViewFactory)
@@ -58,7 +60,7 @@ class TemplateOnlyViewFactory(object):
return factory
def view(_context,
- permission,
+ permission=None,
for_=None,
factory=None,
name="",
@@ -95,6 +97,16 @@ def view(_context,
args = ('', for_)
)
+ if permission:
+ pfactory = ViewPermissionFactory(permission)
+ _context.action(
+ discriminator = ('permission', for_,name, IRequest,IViewPermission),
+ callable = handler,
+ args = ('registerAdapter',
+ pfactory, (for_, IRequest), IViewPermission, name,
+ _context.info),
+ )
+
_context.action(
discriminator = ('view', for_, name, IRequest, IViewFactory),
callable = handler,
@@ -104,23 +116,15 @@ def view(_context,
)
class IViewDirective(Interface):
- """
- The page directive is used to create views that provide a single
- url or page.
-
- The page directive creates a new view class from a given template
- and/or class and registers it.
- """
-
for_ = GlobalObject(
title=u"The interface or class this view is for.",
required=False
)
- permission = Permission(
+ permission = TextLine(
title=u"Permission",
description=u"The permission needed to use the view.",
- required=True
+ required=False
)
factory = GlobalObject(