summaryrefslogtreecommitdiff
path: root/repoze
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2009-11-23 03:41:51 +0000
committerChris McDonough <chrism@agendaless.com>2009-11-23 03:41:51 +0000
commit41723e16c5274afbdda44c7b19fe663a8e923eaf (patch)
tree63895cf303bf5acc48af6fc8d2ba2c42b03942af /repoze
parent8f8fc8bfe3e5fd11a20f32d47791c248f6721e29 (diff)
downloadpyramid-41723e16c5274afbdda44c7b19fe663a8e923eaf.tar.gz
pyramid-41723e16c5274afbdda44c7b19fe663a8e923eaf.tar.bz2
pyramid-41723e16c5274afbdda44c7b19fe663a8e923eaf.zip
``repoze.bfg.security.has_permission``
``repoze.bfg.security.authenticated_userid`` ``repoze.bfg.security.effective_principals`` ``repoze.bfg.security.view_execution_permitted`` ``repoze.bfg.security.remember`` ``repoze.bfg.security.forget`` Each of these functions now expects to be called with a request object that has a ``registry`` attribute which represents the current ZCA registry. Previously these functions used the ZCA threadlocal API to get the current registry.
Diffstat (limited to 'repoze')
-rw-r--r--repoze/bfg/security.py46
-rw-r--r--repoze/bfg/tests/test_security.py112
2 files changed, 115 insertions, 43 deletions
diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py
index e0873a0ad..a7a5d5f35 100644
--- a/repoze/bfg/security.py
+++ b/repoze/bfg/security.py
@@ -1,6 +1,4 @@
-from zope.component import getSiteManager
from zope.component import providedBy
-from zope.component import queryUtility
from zope.deprecation import deprecated
@@ -11,6 +9,8 @@ from repoze.bfg.interfaces import ISecuredView
# b/c import
from repoze.bfg.exceptions import Forbidden as Unauthorized
+from repoze.bfg.threadlocal import get_current_registry
+
deprecated('Unauthorized',
"('from repoze.bfg.security import Unauthorized' was "
"deprecated as of repoze.bfg 1.1; instead use 'from "
@@ -43,11 +43,15 @@ def has_permission(permission, context, request):
function delegates to the current authentication and authorization
policies. Return ``Allowed`` unconditionally if no authentication
policy has been configured in this application."""
- authn_policy = queryUtility(IAuthenticationPolicy)
+ try:
+ reg = request.registry
+ except AttributeError:
+ reg = get_current_registry() # b/c
+ authn_policy = reg.queryUtility(IAuthenticationPolicy)
if authn_policy is None:
return Allowed('No authentication policy in use.')
- authz_policy = queryUtility(IAuthorizationPolicy)
+ authz_policy = reg.queryUtility(IAuthorizationPolicy)
if authz_policy is None:
raise ValueError('Authentication policy registered without '
'authorization policy') # should never happen
@@ -58,8 +62,12 @@ def authenticated_userid(request):
""" Return the userid of the currently authenticated user or
``None`` if there is no authentication policy in effect or there
is no currently authenticated user. """
+ try:
+ reg = request.registry
+ except AttributeError:
+ reg = get_current_registry() # b/c
- policy = queryUtility(IAuthenticationPolicy)
+ policy = reg.queryUtility(IAuthenticationPolicy)
if policy is None:
return None
return policy.authenticated_userid(request)
@@ -70,8 +78,12 @@ def effective_principals(request):
authenticated user if a user is currently authenticated. If no
authentication policy is in effect, this will return an empty
sequence."""
+ try:
+ reg = request.registry
+ except AttributeError:
+ reg = get_current_registry() # b/c
- policy = queryUtility(IAuthenticationPolicy)
+ policy = reg.queryUtility(IAuthenticationPolicy)
if policy is None:
return []
return policy.effective_principals(request)
@@ -90,7 +102,8 @@ def principals_allowed_by_permission(context, permission):
``NotImplementedError`` exception to be raised when this
function is invoked.
"""
- policy = queryUtility(IAuthorizationPolicy)
+ reg = get_current_registry()
+ policy = reg.queryUtility(IAuthorizationPolicy)
if policy is None:
return [Everyone]
return policy.principals_allowed_by_permission(context, permission)
@@ -102,9 +115,12 @@ def view_execution_permitted(context, request, name=''):
``request``. Return a boolean result. If no authentication
policy is in effect, or if the view is not protected by a
permission, return True."""
- sm = getSiteManager()
+ try:
+ reg = request.registry
+ except AttributeError:
+ reg = get_current_registry() # b/c
provides = map(providedBy, (context, request))
- view = sm.adapters.lookup(provides, ISecuredView, name=name)
+ view = reg.adapters.lookup(provides, ISecuredView, name=name)
if view is None:
return Allowed(
'Allowed: view name %r in context %r (no permission defined)' %
@@ -129,7 +145,11 @@ def remember(request, principal, **kw):
return an empty sequence. If used, the composition and meaning of
``**kw`` must be agreed upon by the calling code and the effective
authentication policy."""
- policy = queryUtility(IAuthenticationPolicy)
+ try:
+ reg = request.registry
+ except AttributeError:
+ reg = get_current_registry() # b/c
+ policy = reg.queryUtility(IAuthenticationPolicy)
if policy is None:
return []
else:
@@ -150,7 +170,11 @@ def forget(request):
If no authentication policy is in use, this function will always
return an empty sequence."""
- policy = queryUtility(IAuthenticationPolicy)
+ try:
+ reg = request.registry
+ except AttributeError:
+ reg = get_current_registry() # b/c
+ policy = reg.queryUtility(IAuthenticationPolicy)
if policy is None:
return []
else:
diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py
index d3d89923d..6df21d33c 100644
--- a/repoze/bfg/tests/test_security.py
+++ b/repoze/bfg/tests/test_security.py
@@ -158,23 +158,6 @@ class TestViewExecutionPermitted(unittest.TestCase):
result = self._callFUT(context, request, '')
self.failUnless(result is True)
-def _registerAuthenticationPolicy(result):
- from repoze.bfg.interfaces import IAuthenticationPolicy
- policy = DummyAuthenticationPolicy(result)
- import zope.component
- sm = zope.component.getSiteManager()
- sm.registerUtility(policy, IAuthenticationPolicy)
- return policy
-
-def _registerAuthorizationPolicy(result):
- from repoze.bfg.interfaces import IAuthorizationPolicy
- policy = DummyAuthorizationPolicy(result)
- import zope.component
- sm = zope.component.getSiteManager()
- sm.registerUtility(policy, IAuthorizationPolicy)
- return policy
-
-
class TestHasPermission(unittest.TestCase):
def setUp(self):
cleanUp()
@@ -187,18 +170,29 @@ class TestHasPermission(unittest.TestCase):
return has_permission(*arg)
def test_no_authentication_policy(self):
- result = self._callFUT('view', None, None)
+ request = _makeRequest()
+ result = self._callFUT('view', None, request)
self.assertEqual(result, True)
self.assertEqual(result.msg, 'No authentication policy in use.')
def test_authentication_policy_no_authorization_policy(self):
- _registerAuthenticationPolicy(None)
- self.assertRaises(ValueError, self._callFUT, 'view', None, None)
+ request = _makeRequest()
+ _registerAuthenticationPolicy(request.registry, None)
+ self.assertRaises(ValueError, self._callFUT, 'view', None, request)
def test_authn_and_authz_policies_registered(self):
- _registerAuthenticationPolicy(None)
- pol = _registerAuthorizationPolicy('yo')
- self.assertEqual(self._callFUT('view', None, None), 'yo')
+ request = _makeRequest()
+ _registerAuthenticationPolicy(request.registry, None)
+ _registerAuthorizationPolicy(request.registry, 'yo')
+ self.assertEqual(self._callFUT('view', None, request), 'yo')
+
+ def test_no_registry_on_request(self):
+ from repoze.bfg.threadlocal import get_current_registry
+ request = DummyRequest({})
+ registry = get_current_registry()
+ _registerAuthenticationPolicy(registry, None)
+ _registerAuthorizationPolicy(registry, 'yo')
+ self.assertEqual(self._callFUT('view', None, request), 'yo')
class TestAuthenticatedUserId(unittest.TestCase):
def setUp(self):
@@ -212,13 +206,21 @@ class TestAuthenticatedUserId(unittest.TestCase):
return authenticated_userid(request)
def test_no_authentication_policy(self):
- request = DummyRequest({})
+ request = _makeRequest()
result = self._callFUT(request)
self.assertEqual(result, None)
def test_with_authentication_policy(self):
- _registerAuthenticationPolicy('yo')
+ request = _makeRequest()
+ _registerAuthenticationPolicy(request.registry, 'yo')
+ result = self._callFUT(request)
+ self.assertEqual(result, 'yo')
+
+ def test_with_authentication_policy_no_reg_on_request(self):
+ from repoze.bfg.threadlocal import get_current_registry
request = DummyRequest({})
+ registry = get_current_registry()
+ _registerAuthenticationPolicy(registry, 'yo')
result = self._callFUT(request)
self.assertEqual(result, 'yo')
@@ -234,13 +236,21 @@ class TestEffectivePrincipals(unittest.TestCase):
return effective_principals(request)
def test_no_authentication_policy(self):
- request = DummyRequest({})
+ request = _makeRequest()
result = self._callFUT(request)
self.assertEqual(result, [])
def test_with_authentication_policy(self):
- _registerAuthenticationPolicy('yo')
+ request = _makeRequest()
+ _registerAuthenticationPolicy(request.registry, 'yo')
+ result = self._callFUT(request)
+ self.assertEqual(result, 'yo')
+
+ def test_with_authentication_policy_no_reg_on_request(self):
+ from repoze.bfg.threadlocal import get_current_registry
+ registry = get_current_registry()
request = DummyRequest({})
+ _registerAuthenticationPolicy(registry, 'yo')
result = self._callFUT(request)
self.assertEqual(result, 'yo')
@@ -262,7 +272,9 @@ class TestPrincipalsAllowedByPermission(unittest.TestCase):
self.assertEqual(result, [Everyone])
def test_with_authorization_policy(self):
- _registerAuthorizationPolicy('yo')
+ from repoze.bfg.threadlocal import get_current_registry
+ registry = get_current_registry()
+ _registerAuthorizationPolicy(registry, 'yo')
context = DummyContext()
result = self._callFUT(context, 'view')
self.assertEqual(result, 'yo')
@@ -280,13 +292,22 @@ class TestRemember(unittest.TestCase):
def test_no_authentication_policy(self):
context = DummyContext()
- request = DummyRequest({})
+ request = _makeRequest()
result = self._callFUT(request, 'me')
self.assertEqual(result, [])
def test_with_authentication_policy(self):
- _registerAuthenticationPolicy('yo')
+ request = _makeRequest()
+ registry = request.registry
+ _registerAuthenticationPolicy(registry, 'yo')
+ result = self._callFUT(request, 'me')
+ self.assertEqual(result, 'yo')
+
+ def test_with_authentication_policy_no_reg_on_request(self):
+ from repoze.bfg.threadlocal import get_current_registry
+ registry = get_current_registry()
request = DummyRequest({})
+ _registerAuthenticationPolicy(registry, 'yo')
result = self._callFUT(request, 'me')
self.assertEqual(result, 'yo')
@@ -302,13 +323,21 @@ class TestForget(unittest.TestCase):
return forget(*arg)
def test_no_authentication_policy(self):
- request = DummyRequest({})
+ request = _makeRequest()
result = self._callFUT(request)
self.assertEqual(result, [])
def test_with_authentication_policy(self):
- _registerAuthenticationPolicy('yo')
+ request = _makeRequest()
+ _registerAuthenticationPolicy(request.registry, 'yo')
+ result = self._callFUT(request)
+ self.assertEqual(result, 'yo')
+
+ def test_with_authentication_policy_no_reg_on_request(self):
+ from repoze.bfg.threadlocal import get_current_registry
+ registry = get_current_registry()
request = DummyRequest({})
+ _registerAuthenticationPolicy(registry, 'yo')
result = self._callFUT(request)
self.assertEqual(result, 'yo')
@@ -346,3 +375,22 @@ class DummyAuthorizationPolicy:
def principals_allowed_by_permission(self, context, permission):
return self.result
+def _registerAuthenticationPolicy(reg, result):
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ policy = DummyAuthenticationPolicy(result)
+ reg.registerUtility(policy, IAuthenticationPolicy)
+ return policy
+
+def _registerAuthorizationPolicy(reg, result):
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ policy = DummyAuthorizationPolicy(result)
+ reg.registerUtility(policy, IAuthorizationPolicy)
+ return policy
+
+def _makeRequest():
+ from repoze.bfg.registry import Registry
+ request = DummyRequest({})
+ request.registry = Registry()
+ return request
+
+