diff options
| author | Chris McDonough <chrism@agendaless.com> | 2008-08-17 00:15:17 +0000 |
|---|---|---|
| committer | Chris McDonough <chrism@agendaless.com> | 2008-08-17 00:15:17 +0000 |
| commit | 157721dda97f5aea95f40e307d9d5dceb1014f83 (patch) | |
| tree | 07272c8ff86adcb7e9700aed9daa46af6accd291 | |
| parent | 1a1ca311479764b3c96a3fd61571bf0cdc8cb043 (diff) | |
| download | pyramid-157721dda97f5aea95f40e307d9d5dceb1014f83.tar.gz pyramid-157721dda97f5aea95f40e307d9d5dceb1014f83.tar.bz2 pyramid-157721dda97f5aea95f40e307d9d5dceb1014f83.zip | |
Add RepozeWhoIdentityACLSecurityPolicy; add debug logging.
| -rw-r--r-- | repoze/bfg/security.py | 110 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_security.py | 131 |
2 files changed, 185 insertions, 56 deletions
diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py index 5ab6ae31d..d15d89e96 100644 --- a/repoze/bfg/security.py +++ b/repoze/bfg/security.py @@ -1,3 +1,7 @@ +import logging +import os +import sys + from zope.interface import implements from zope.component import queryUtility @@ -80,29 +84,13 @@ class ACLAuthorizer(object): self.logger and self.logger.debug(str(result)) return result -class RemoteUserACLSecurityPolicy(object): - """ A security policy which: - - - examines the request.environ for the REMOTE_USER variable and - uses any non-false value as a principal id for this request. - - - uses an ACL-based authorization model which attempts to find an - ACL on the context, and which returns ``Allowed`` from its - 'permits' method if the ACL found grants access to the current - principal. It returns ``Denied`` if permission was not granted - (either explicitly via a deny or implicitly by not finding a - matching ACE action). An ACL is an ordered sequence of ACE - tuples, e.g. ``[(Allow, Everyone, 'read'), (Deny, 'george', - 'write')]``. ACLs stored on model instance objects as their - __acl__ attribute will be used by the security machinery to - grant or deny access. - - """ +class ACLSecurityPolicy(object): implements(ISecurityPolicy) authorizer_factory = ACLAuthorizer - def __init__(self, logger=None): + def __init__(self, logger, get_principals): self.logger = logger + self.get_principals = get_principals def permits(self, context, request, permission): """ Return ``Allowed`` if the policy permits access, @@ -118,17 +106,93 @@ class RemoteUserACLSecurityPolicy(object): return False def authenticated_userid(self, request): - return request.environ.get('REMOTE_USER', None) + principals = self.get_principals(request) + if principals: + return principals[0] def effective_principals(self, request): - userid = self.authenticated_userid(request) effective_principals = [Everyone] + principal_ids = self.get_principals(request) - if userid is not None: + if principal_ids: effective_principals.append(Authenticated) - effective_principals.append(userid) + effective_principals.extend(principal_ids) + return effective_principals +DEBUG_LOG_KEY = 'BFG_SECURITY_DEBUG' + +def debug_logger(logger): + if logger is None: + do_debug_log = os.environ.get(DEBUG_LOG_KEY, '') + if str(do_debug_log).lower() in ('1', 'y', 'true', 't', 'on'): + handler = logging.StreamHandler(sys.stdout) + fmt = '%(asctime)s %(message)s' + formatter = logging.Formatter(fmt) + handler.setFormatter(formatter) + logger = logging.Logger('repoze.bfg.security') + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + return logger + return logger + +def RemoteUserACLSecurityPolicy(logger=None): + """ A security policy which: + + - examines the request.environ for the REMOTE_USER variable and + uses any non-false value as a principal id for this request. + + - uses an ACL-based authorization model which attempts to find an + ACL on the context, and which returns ``Allowed`` from its + 'permits' method if the ACL found grants access to the current + principal. It returns ``Denied`` if permission was not granted + (either explicitly via a deny or implicitly by not finding a + matching ACE action). An ACL is an ordered sequence of ACE + tuples, e.g. ``[(Allow, Everyone, 'read'), (Deny, 'george', + 'write')]``. ACLs stored on model instance objects as their + __acl__ attribute will be used by the security machinery to + grant or deny access. + + """ + logger = debug_logger(logger) + def get_principals(request): + user_id = request.environ.get('REMOTE_USER') + if user_id: + return [user_id] + return [] + return ACLSecurityPolicy(logger, get_principals) + +def RepozeWhoIdentityACLSecurityPolicy(logger=None): + """ A security policy which: + + - examines the request.environ for the ``repoze.who.identity`` + dictionary. If one is found, the principal ids for the request + are composed of ``repoze.who.identity['repoze.who.userid']`` + plus ``repoze.who.identity.get('groups', []). + + - uses an ACL-based authorization model which attempts to find an + ACL on the context, and which returns ``Allowed`` from its + 'permits' method if the ACL found grants access to the current + principal. It returns ``Denied`` if permission was not granted + (either explicitly via a deny or implicitly by not finding a + matching ACE action). An ACL is an ordered sequence of ACE + tuples, e.g. ``[(Allow, Everyone, 'read'), (Deny, 'george', + 'write')]``. ACLs stored on model instance objects as their + __acl__ attribute will be used by the security machinery to + grant or deny access. + + """ + logger = debug_logger(logger) + def get_principals(request): + identity = request.environ.get('repoze.who.identity') + if not identity: + return [] + principals = [identity['repoze.who.userid']] + principals.extend(identity.get('groups', [])) + return principals + + return ACLSecurityPolicy(logger, get_principals) + class PermitsResult: def __init__(self, ace, acl, permission, principals, context): self.acl = acl diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py index 6d85e2160..9797ebce5 100644 --- a/repoze/bfg/tests/test_security.py +++ b/repoze/bfg/tests/test_security.py @@ -210,10 +210,10 @@ class TestACLAuthorizer(unittest.TestCase): result = authorizer.permits('read', *principals) self.assertEqual(len(logger.messages), 1) -class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): +class TestACLSecurityPolicy(unittest.TestCase, PlacelessSetup): def _getTargetClass(self): - from repoze.bfg.security import RemoteUserACLSecurityPolicy - return RemoteUserACLSecurityPolicy + from repoze.bfg.security import ACLSecurityPolicy + return ACLSecurityPolicy def _makeOne(self, *arg, **kw): klass = self._getTargetClass() @@ -225,40 +225,22 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): def tearDown(self): PlacelessSetup.tearDown(self) - def test_instance_implements_ISecurityPolicy(self): - from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import ISecurityPolicy - logger = DummyLogger() - verifyObject(ISecurityPolicy, self._makeOne(logger)) - def test_class_implements_ISecurityPolicy(self): from zope.interface.verify import verifyClass from repoze.bfg.interfaces import ISecurityPolicy verifyClass(ISecurityPolicy, self._getTargetClass()) - def test_authenticated_userid(self): - context = DummyContext() - request = DummyRequest({'REMOTE_USER':'fred'}) - logger = DummyLogger() - policy = self._makeOne(logger) - result = policy.authenticated_userid(request) - self.assertEqual(result, 'fred') - - def test_effective_principals(self): - context = DummyContext() - request = DummyRequest({'REMOTE_USER':'fred'}) + def test_instance_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import ISecurityPolicy logger = DummyLogger() - policy = self._makeOne(logger) - result = policy.effective_principals(request) - from repoze.bfg.security import Everyone - from repoze.bfg.security import Authenticated - self.assertEqual(result, [Everyone, Authenticated, 'fred']) + verifyObject(ISecurityPolicy, self._makeOne(logger, lambda *arg: None)) - def test_permits_no_remote_user_no_acl_info_on_context(self): + def test_permits_no_principals_no_acl_info_on_context(self): context = DummyContext() request = DummyRequest({}) logger = DummyLogger() - policy = self._makeOne(logger) + policy = self._makeOne(logger, lambda *arg: None) authorizer_factory = make_authorizer_factory(None) policy.authorizer_factory = authorizer_factory result = policy.permits(context, request, 'view') @@ -268,12 +250,12 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): self.assertEqual(authorizer_factory.permission, 'view') self.assertEqual(authorizer_factory.context, context) - def test_permits_no_remote_user_acl_info_on_context(self): + def test_permits_no_principals_acl_info_on_context(self): context = DummyContext() context.__acl__ = [] request = DummyRequest({}) logger = DummyLogger() - policy = self._makeOne(logger) + policy = self._makeOne(logger, lambda *arg: None) authorizer_factory = make_authorizer_factory(None) policy.authorizer_factory = authorizer_factory result = policy.permits(context, request, 'view') @@ -283,7 +265,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): self.assertEqual(authorizer_factory.permission, 'view') self.assertEqual(authorizer_factory.context, context) - def test_permits_no_remote_user_withparents_root_has_acl_info(self): + def test_permits_no_principals_withparents_root_has_acl_info(self): context = DummyContext() context.__name__ = None context.__parent__ = None @@ -293,7 +275,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): context.__acl__ = [] request = DummyRequest({}) logger = DummyLogger() - policy = self._makeOne(logger) + policy = self._makeOne(logger, lambda *arg: None) authorizer_factory = make_authorizer_factory(None) policy.authorizer_factory = authorizer_factory result = policy.permits(context, request, 'view') @@ -303,7 +285,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): self.assertEqual(authorizer_factory.permission, 'view') self.assertEqual(authorizer_factory.context, context) - def test_permits_no_remote_user_withparents_root_allows_everyone(self): + def test_permits_no_principals_withparents_root_allows_everyone(self): context = DummyContext() context.__name__ = None context.__parent__ = None @@ -312,7 +294,7 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): context2.__parent__ = context request = DummyRequest({}) logger = DummyLogger() - policy = self._makeOne(logger) + policy = self._makeOne(logger, lambda *arg: None) authorizer_factory = make_authorizer_factory(context) policy.authorizer_factory = authorizer_factory result = policy.permits(context, request, 'view') @@ -321,6 +303,89 @@ class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): self.assertEqual(authorizer_factory.principals, (Everyone,)) self.assertEqual(authorizer_factory.permission, 'view') self.assertEqual(authorizer_factory.context, context) + + +class TestRemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup): + def _getTargetClass(self): + from repoze.bfg.security import RemoteUserACLSecurityPolicy + return RemoteUserACLSecurityPolicy + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def setUp(self): + PlacelessSetup.setUp(self) + + def tearDown(self): + PlacelessSetup.tearDown(self) + + def test_instance_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import ISecurityPolicy + logger = DummyLogger() + verifyObject(ISecurityPolicy, self._makeOne(logger)) + + def test_authenticated_userid(self): + context = DummyContext() + request = DummyRequest({'REMOTE_USER':'fred'}) + logger = DummyLogger() + policy = self._makeOne(logger) + result = policy.authenticated_userid(request) + self.assertEqual(result, 'fred') + + def test_effective_principals(self): + context = DummyContext() + request = DummyRequest({'REMOTE_USER':'fred'}) + logger = DummyLogger() + policy = self._makeOne(logger) + result = policy.effective_principals(request) + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + self.assertEqual(result, [Everyone, Authenticated, 'fred']) + + +class TestRepozeWhoIdentityACLSecurityPolicy(unittest.TestCase, PlacelessSetup): + def _getTargetClass(self): + from repoze.bfg.security import RepozeWhoIdentityACLSecurityPolicy + return RepozeWhoIdentityACLSecurityPolicy + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def setUp(self): + PlacelessSetup.setUp(self) + + def tearDown(self): + PlacelessSetup.tearDown(self) + + def test_instance_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import ISecurityPolicy + logger = DummyLogger() + verifyObject(ISecurityPolicy, self._makeOne(logger)) + + def test_authenticated_userid(self): + context = DummyContext() + identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}} + request = DummyRequest(identity) + logger = DummyLogger() + policy = self._makeOne(logger) + result = policy.authenticated_userid(request) + self.assertEqual(result, 'fred') + + def test_effective_principals(self): + context = DummyContext() + identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}} + request = DummyRequest(identity) + logger = DummyLogger() + policy = self._makeOne(logger) + result = policy.effective_principals(request) + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + self.assertEqual(result, [Everyone, Authenticated, 'fred']) + class TestAPIFunctions(unittest.TestCase, PlacelessSetup): def setUp(self): |
