summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2008-08-17 00:15:17 +0000
committerChris McDonough <chrism@agendaless.com>2008-08-17 00:15:17 +0000
commit157721dda97f5aea95f40e307d9d5dceb1014f83 (patch)
tree07272c8ff86adcb7e9700aed9daa46af6accd291
parent1a1ca311479764b3c96a3fd61571bf0cdc8cb043 (diff)
downloadpyramid-157721dda97f5aea95f40e307d9d5dceb1014f83.tar.gz
pyramid-157721dda97f5aea95f40e307d9d5dceb1014f83.tar.bz2
pyramid-157721dda97f5aea95f40e307d9d5dceb1014f83.zip
Add RepozeWhoIdentityACLSecurityPolicy; add debug logging.
-rw-r--r--repoze/bfg/security.py110
-rw-r--r--repoze/bfg/tests/test_security.py131
2 files changed, 185 insertions, 56 deletions
diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py
index 5ab6ae31d..d15d89e96 100644
--- a/repoze/bfg/security.py
+++ b/repoze/bfg/security.py
@@ -1,3 +1,7 @@
+import logging
+import os
+import sys
+
from zope.interface import implements
from zope.component import queryUtility
@@ -80,29 +84,13 @@ class ACLAuthorizer(object):
self.logger and self.logger.debug(str(result))
return result
-class RemoteUserACLSecurityPolicy(object):
- """ A security policy which:
-
- - examines the request.environ for the REMOTE_USER variable and
- uses any non-false value as a principal id for this request.
-
- - uses an ACL-based authorization model which attempts to find an
- ACL on the context, and which returns ``Allowed`` from its
- 'permits' method if the ACL found grants access to the current
- principal. It returns ``Denied`` if permission was not granted
- (either explicitly via a deny or implicitly by not finding a
- matching ACE action). An ACL is an ordered sequence of ACE
- tuples, e.g. ``[(Allow, Everyone, 'read'), (Deny, 'george',
- 'write')]``. ACLs stored on model instance objects as their
- __acl__ attribute will be used by the security machinery to
- grant or deny access.
-
- """
+class ACLSecurityPolicy(object):
implements(ISecurityPolicy)
authorizer_factory = ACLAuthorizer
- def __init__(self, logger=None):
+ def __init__(self, logger, get_principals):
self.logger = logger
+ self.get_principals = get_principals
def permits(self, context, request, permission):
""" Return ``Allowed`` if the policy permits access,
@@ -118,17 +106,93 @@ class RemoteUserACLSecurityPolicy(object):
return False
def authenticated_userid(self, request):
- return request.environ.get('REMOTE_USER', None)
+ principals = self.get_principals(request)
+ if principals:
+ return principals[0]
def effective_principals(self, request):
- userid = self.authenticated_userid(request)
effective_principals = [Everyone]
+ principal_ids = self.get_principals(request)
- if userid is not None:
+ if principal_ids:
effective_principals.append(Authenticated)
- effective_principals.append(userid)
+ effective_principals.extend(principal_ids)
+
return effective_principals
+DEBUG_LOG_KEY = 'BFG_SECURITY_DEBUG'
+
+def debug_logger(logger):
+ if logger is None:
+ do_debug_log = os.environ.get(DEBUG_LOG_KEY, '')
+ if str(do_debug_log).lower() in ('1', 'y', 'true', 't', 'on'):
+ handler = logging.StreamHandler(sys.stdout)
+ fmt = '%(asctime)s %(message)s'
+ formatter = logging.Formatter(fmt)
+ handler.setFormatter(formatter)
+ logger = logging.Logger('repoze.bfg.security')
+ logger.addHandler(handler)
+ logger.setLevel(logging.DEBUG)
+ return logger
+ return logger
+
+def RemoteUserACLSecurityPolicy(logger=None):
+ """ A security policy which:
+
+ - examines the request.environ for the REMOTE_USER variable and
+ uses any non-false value as a principal id for this request.
+
+ - uses an ACL-based authorization model which attempts to find an
+ ACL on the context, and which returns ``Allowed`` from its
+ 'permits' method if the ACL found grants access to the current
+ principal. It returns ``Denied`` if permission was not granted
+ (either explicitly via a deny or implicitly by not finding a
+ matching ACE action). An ACL is an ordered sequence of ACE
+ tuples, e.g. ``[(Allow, Everyone, 'read'), (Deny, 'george',
+ 'write')]``. ACLs stored on model instance objects as their
+ __acl__ attribute will be used by the security machinery to
+ grant or deny access.
+
+ """
+ logger = debug_logger(logger)
+ def get_principals(request):
+ user_id = request.environ.get('REMOTE_USER')
+ if user_id:
+ return [user_id]
+ return []
+ return ACLSecurityPolicy(logger, get_principals)
+
+def RepozeWhoIdentityACLSecurityPolicy(logger=None):
+ """ A security policy which:
+
+ - examines the request.environ for the ``repoze.who.identity``
+ dictionary. If one is found, the principal ids for the request
+ are composed of ``repoze.who.identity['repoze.who.userid']``
+ plus ``repoze.who.identity.get('groups', []).
+
+ - uses an ACL-based authorization model which attempts to find an
+ ACL on the context, and which returns ``Allowed`` from its
+ 'permits' method if the ACL found grants access to the current
+ principal. It returns ``Denied`` if permission was not granted
+ (either explicitly via a deny or implicitly by not finding a
+ matching ACE action). An ACL is an ordered sequence of ACE
+ tuples, e.g. ``[(Allow, Everyone, 'read'), (Deny, 'george',
+ 'write')]``. ACLs stored on model instance objects as their
+ __acl__ attribute will be used by the security machinery to
+ grant or deny access.
+
+ """
+ logger = debug_logger(logger)
+ def get_principals(request):
+ identity = request.environ.get('repoze.who.identity')
+ if not identity:
+ return []
+ principals = [identity['repoze.who.userid']]
+ principals.extend(identity.get('groups', []))
+ return principals
+
+ return ACLSecurityPolicy(logger, get_principals)
+
class PermitsResult:
def __init__(self, ace, acl, permission, principals, context):
self.acl = acl
diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py
index 6d85e2160..9797ebce5 100644
--- a/repoze/bfg/tests/test_security.py
+++ b/repoze/bfg/tests/test_security.py
@@ -210,10 +210,10 @@ class TestACLAuthorizer(unittest.TestCase):
result = authorizer.permits('read', *principals)
self.assertEqual(len(logger.messages), 1)
-class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
+class TestACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
def _getTargetClass(self):
- from repoze.bfg.security import RemoteUserACLSecurityPolicy
- return RemoteUserACLSecurityPolicy
+ from repoze.bfg.security import ACLSecurityPolicy
+ return ACLSecurityPolicy
def _makeOne(self, *arg, **kw):
klass = self._getTargetClass()
@@ -225,40 +225,22 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
def tearDown(self):
PlacelessSetup.tearDown(self)
- def test_instance_implements_ISecurityPolicy(self):
- from zope.interface.verify import verifyObject
- from repoze.bfg.interfaces import ISecurityPolicy
- logger = DummyLogger()
- verifyObject(ISecurityPolicy, self._makeOne(logger))
-
def test_class_implements_ISecurityPolicy(self):
from zope.interface.verify import verifyClass
from repoze.bfg.interfaces import ISecurityPolicy
verifyClass(ISecurityPolicy, self._getTargetClass())
- def test_authenticated_userid(self):
- context = DummyContext()
- request = DummyRequest({'REMOTE_USER':'fred'})
- logger = DummyLogger()
- policy = self._makeOne(logger)
- result = policy.authenticated_userid(request)
- self.assertEqual(result, 'fred')
-
- def test_effective_principals(self):
- context = DummyContext()
- request = DummyRequest({'REMOTE_USER':'fred'})
+ def test_instance_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import ISecurityPolicy
logger = DummyLogger()
- policy = self._makeOne(logger)
- result = policy.effective_principals(request)
- from repoze.bfg.security import Everyone
- from repoze.bfg.security import Authenticated
- self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+ verifyObject(ISecurityPolicy, self._makeOne(logger, lambda *arg: None))
- def test_permits_no_remote_user_no_acl_info_on_context(self):
+ def test_permits_no_principals_no_acl_info_on_context(self):
context = DummyContext()
request = DummyRequest({})
logger = DummyLogger()
- policy = self._makeOne(logger)
+ policy = self._makeOne(logger, lambda *arg: None)
authorizer_factory = make_authorizer_factory(None)
policy.authorizer_factory = authorizer_factory
result = policy.permits(context, request, 'view')
@@ -268,12 +250,12 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
self.assertEqual(authorizer_factory.permission, 'view')
self.assertEqual(authorizer_factory.context, context)
- def test_permits_no_remote_user_acl_info_on_context(self):
+ def test_permits_no_principals_acl_info_on_context(self):
context = DummyContext()
context.__acl__ = []
request = DummyRequest({})
logger = DummyLogger()
- policy = self._makeOne(logger)
+ policy = self._makeOne(logger, lambda *arg: None)
authorizer_factory = make_authorizer_factory(None)
policy.authorizer_factory = authorizer_factory
result = policy.permits(context, request, 'view')
@@ -283,7 +265,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
self.assertEqual(authorizer_factory.permission, 'view')
self.assertEqual(authorizer_factory.context, context)
- def test_permits_no_remote_user_withparents_root_has_acl_info(self):
+ def test_permits_no_principals_withparents_root_has_acl_info(self):
context = DummyContext()
context.__name__ = None
context.__parent__ = None
@@ -293,7 +275,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
context.__acl__ = []
request = DummyRequest({})
logger = DummyLogger()
- policy = self._makeOne(logger)
+ policy = self._makeOne(logger, lambda *arg: None)
authorizer_factory = make_authorizer_factory(None)
policy.authorizer_factory = authorizer_factory
result = policy.permits(context, request, 'view')
@@ -303,7 +285,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
self.assertEqual(authorizer_factory.permission, 'view')
self.assertEqual(authorizer_factory.context, context)
- def test_permits_no_remote_user_withparents_root_allows_everyone(self):
+ def test_permits_no_principals_withparents_root_allows_everyone(self):
context = DummyContext()
context.__name__ = None
context.__parent__ = None
@@ -312,7 +294,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
context2.__parent__ = context
request = DummyRequest({})
logger = DummyLogger()
- policy = self._makeOne(logger)
+ policy = self._makeOne(logger, lambda *arg: None)
authorizer_factory = make_authorizer_factory(context)
policy.authorizer_factory = authorizer_factory
result = policy.permits(context, request, 'view')
@@ -321,6 +303,89 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
self.assertEqual(authorizer_factory.principals, (Everyone,))
self.assertEqual(authorizer_factory.permission, 'view')
self.assertEqual(authorizer_factory.context, context)
+
+
+class TestRemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
+ def _getTargetClass(self):
+ from repoze.bfg.security import RemoteUserACLSecurityPolicy
+ return RemoteUserACLSecurityPolicy
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+
+ def tearDown(self):
+ PlacelessSetup.tearDown(self)
+
+ def test_instance_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import ISecurityPolicy
+ logger = DummyLogger()
+ verifyObject(ISecurityPolicy, self._makeOne(logger))
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, 'fred')
+
+ def test_effective_principals(self):
+ context = DummyContext()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ result = policy.effective_principals(request)
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+
+
+class TestRepozeWhoIdentityACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
+ def _getTargetClass(self):
+ from repoze.bfg.security import RepozeWhoIdentityACLSecurityPolicy
+ return RepozeWhoIdentityACLSecurityPolicy
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+
+ def tearDown(self):
+ PlacelessSetup.tearDown(self)
+
+ def test_instance_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import ISecurityPolicy
+ logger = DummyLogger()
+ verifyObject(ISecurityPolicy, self._makeOne(logger))
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
+ request = DummyRequest(identity)
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, 'fred')
+
+ def test_effective_principals(self):
+ context = DummyContext()
+ identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
+ request = DummyRequest(identity)
+ logger = DummyLogger()
+ policy = self._makeOne(logger)
+ result = policy.effective_principals(request)
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+
class TestAPIFunctions(unittest.TestCase, PlacelessSetup):
def setUp(self):