From a6234e4e19efab838b202d0935de0de92c2ee00f Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sun, 17 Feb 2019 12:44:02 -0800 Subject: Implement setting ISecurityPolicy in the configurator. --- tests/test_config/test_init.py | 9 +++++++++ tests/test_config/test_security.py | 9 +++++++++ 2 files changed, 18 insertions(+) (limited to 'tests') diff --git a/tests/test_config/test_init.py b/tests/test_config/test_init.py index ce2b042ec..661654ef0 100644 --- a/tests/test_config/test_init.py +++ b/tests/test_config/test_init.py @@ -205,6 +205,15 @@ class ConfiguratorTests(unittest.TestCase): result = config.registry.getUtility(IDebugLogger) self.assertEqual(logger, result) + def test_ctor_security_policy(self): + from pyramid.interfaces import ISecurityPolicy + + policy = object() + config = self._makeOne(security_policy=policy) + config.commit() + result = config.registry.getUtility(ISecurityPolicy) + self.assertEqual(policy, result) + def test_ctor_authentication_policy(self): from pyramid.interfaces import IAuthenticationPolicy diff --git a/tests/test_config/test_security.py b/tests/test_config/test_security.py index 5ebd78f8d..3062ea154 100644 --- a/tests/test_config/test_security.py +++ b/tests/test_config/test_security.py @@ -11,6 +11,15 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase): config = Configurator(*arg, **kw) return config + def test_set_security_policy(self): + from pyramid.interfaces import ISecurityPolicy + + config = self._makeOne() + policy = object() + config.set_security_policy(policy) + config.commit() + self.assertEqual(config.registry.getUtility(ISecurityPolicy), policy) + def test_set_authentication_policy_no_authz_policy(self): config = self._makeOne() policy = object() -- cgit v1.2.3 From 4c3c826ca9a6069f47fee439576966cf625df528 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sun, 17 Feb 2019 16:38:53 -0800 Subject: Implement legacy security policy. --- tests/test_security.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index 8b8028f61..ee4340ced 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -473,6 +473,58 @@ class TestHasPermission(unittest.TestCase): self.assertRaises(AttributeError, request.has_permission, 'view') +class TestLegacySecurityPolicy(unittest.TestCase): + def setUp(self): + testing.setUp() + + def tearDown(self): + testing.tearDown() + + def test_identity(self): + from pyramid.security import LegacySecurityPolicy + + request = _makeRequest() + policy = LegacySecurityPolicy() + _registerAuthenticationPolicy(request.registry, 'userid') + + self.assertEqual(policy.identify(request), 'userid') + + def test_remember(self): + from pyramid.security import LegacySecurityPolicy + + request = _makeRequest() + policy = LegacySecurityPolicy() + _registerAuthenticationPolicy(request.registry, None) + + self.assertEqual( + policy.remember(request, 'userid'), [('X-Pyramid-Test', 'userid')] + ) + + def test_forget(self): + from pyramid.security import LegacySecurityPolicy + + request = _makeRequest() + policy = LegacySecurityPolicy() + _registerAuthenticationPolicy(request.registry, None) + + self.assertEqual( + policy.forget(request), [('X-Pyramid-Test', 'logout')] + ) + + def test_permits(self): + from pyramid.security import LegacySecurityPolicy + + request = _makeRequest() + policy = LegacySecurityPolicy() + _registerAuthenticationPolicy(request.registry, ['p1', 'p2']) + _registerAuthorizationPolicy(request.registry, True) + + self.assertIs( + policy.permits(request, request.context, 'userid', 'permission'), + True, + ) + + _TEST_HEADER = 'X-Pyramid-Test' -- cgit v1.2.3 From 839bfb427093b94a5e75b22a11a5df20ad94cb1e Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sun, 17 Feb 2019 19:02:33 -0800 Subject: Set legacy policy when using authn/authz policies. --- tests/test_config/test_security.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'tests') diff --git a/tests/test_config/test_security.py b/tests/test_config/test_security.py index 3062ea154..f2b4ba8e5 100644 --- a/tests/test_config/test_security.py +++ b/tests/test_config/test_security.py @@ -20,6 +20,19 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase): config.commit() self.assertEqual(config.registry.getUtility(ISecurityPolicy), policy) + def test_set_authentication_policy_with_security_policy(self): + from pyramid.interfaces import IAuthorizationPolicy + from pyramid.interfaces import ISecurityPolicy + + config = self._makeOne() + security_policy = object() + authn_policy = object() + authz_policy = object() + config.registry.registerUtility(security_policy, ISecurityPolicy) + config.registry.registerUtility(authz_policy, IAuthorizationPolicy) + config.set_authentication_policy(authn_policy) + self.assertRaises(ConfigurationError, config.commit) + def test_set_authentication_policy_no_authz_policy(self): config = self._makeOne() policy = object() @@ -36,6 +49,8 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase): def test_set_authentication_policy_with_authz_policy(self): from pyramid.interfaces import IAuthenticationPolicy from pyramid.interfaces import IAuthorizationPolicy + from pyramid.interfaces import ISecurityPolicy + from pyramid.security import LegacySecurityPolicy config = self._makeOne() authn_policy = object() @@ -46,10 +61,15 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase): self.assertEqual( config.registry.getUtility(IAuthenticationPolicy), authn_policy ) + self.assertIsInstance( + config.registry.getUtility(ISecurityPolicy), LegacySecurityPolicy + ) def test_set_authentication_policy_with_authz_policy_autocommit(self): from pyramid.interfaces import IAuthenticationPolicy from pyramid.interfaces import IAuthorizationPolicy + from pyramid.interfaces import ISecurityPolicy + from pyramid.security import LegacySecurityPolicy config = self._makeOne(autocommit=True) authn_policy = object() @@ -60,6 +80,9 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase): self.assertEqual( config.registry.getUtility(IAuthenticationPolicy), authn_policy ) + self.assertIsInstance( + config.registry.getUtility(ISecurityPolicy), LegacySecurityPolicy + ) def test_set_authorization_policy_no_authn_policy(self): config = self._makeOne() -- cgit v1.2.3 From aae1c513c0a940c1c31c97b4d79fec3d09cbd01e Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Tue, 26 Feb 2019 17:49:36 -0800 Subject: Add `request.identity`. --- tests/test_security.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index ee4340ced..514175a92 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -338,6 +338,23 @@ class TestViewExecutionPermitted(unittest.TestCase): self.assertTrue(result) +class TestIdentity(unittest.TestCase): + def setUp(self): + testing.setUp() + + def tearDown(self): + testing.tearDown() + + def test_identity_no_security_policy(self): + request = _makeRequest() + self.assertEquals(request.identity, None) + + def test_identity(self): + request = _makeRequest() + _registerSecurityPolicy(request.registry, 'yo') + self.assertEqual(request.identity, 'yo') + + class TestAuthenticatedUserId(unittest.TestCase): def setUp(self): testing.setUp() @@ -533,6 +550,27 @@ class DummyContext: self.__dict__.update(kw) +class DummySecurityPolicy: + def __init__(self, result): + self.result = result + + def identify(self, request): + return self.result + + def permits(self, request, context, identity, permission): + return self.result + + def remember(self, request, userid, **kw): + headers = [(_TEST_HEADER, userid)] + self._header_remembered = headers[0] + return headers + + def forget(self, request): + headers = [(_TEST_HEADER, 'logout')] + self._header_forgotten = headers[0] + return headers + + class DummyAuthenticationPolicy: def __init__(self, result): self.result = result @@ -568,6 +606,14 @@ class DummyAuthorizationPolicy: return self.result +def _registerSecurityPolicy(reg, result): + from pyramid.interfaces import ISecurityPolicy + + policy = DummySecurityPolicy(result) + reg.registerUtility(policy, ISecurityPolicy) + return policy + + def _registerAuthenticationPolicy(reg, result): from pyramid.interfaces import IAuthenticationPolicy -- cgit v1.2.3 From 140fdbb54c467159313ede564dd3ad4077e30f20 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 2 Mar 2019 11:26:37 -0800 Subject: Implement bw-compat authenticated_userid and unauthenticated_userid --- tests/test_security.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index 514175a92..dd2c225d3 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -369,6 +369,12 @@ class TestAuthenticatedUserId(unittest.TestCase): def test_with_authentication_policy(self): request = _makeRequest() _registerAuthenticationPolicy(request.registry, 'yo') + _registerSecurityPolicy(request.registry, 'wat') + self.assertEqual(request.authenticated_userid, 'yo') + + def test_with_security_policy(self): + request = _makeRequest() + _registerSecurityPolicy(request.registry, 'yo') self.assertEqual(request.authenticated_userid, 'yo') def test_with_authentication_policy_no_reg_on_request(self): @@ -395,6 +401,12 @@ class TestUnAuthenticatedUserId(unittest.TestCase): def test_with_authentication_policy(self): request = _makeRequest() _registerAuthenticationPolicy(request.registry, 'yo') + _registerSecurityPolicy(request.registry, 'wat') + self.assertEqual(request.unauthenticated_userid, 'yo') + + def test_with_security_policy(self): + request = _makeRequest() + _registerSecurityPolicy(request.registry, 'yo') self.assertEqual(request.unauthenticated_userid, 'yo') def test_with_authentication_policy_no_reg_on_request(self): -- cgit v1.2.3 From 5abdd1d7636a8f7c5cda4c8fcf2669c3937c1186 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sun, 3 Mar 2019 08:49:58 -0800 Subject: Implement new request.has_permission. Deleted AuthorizationAPIMixin --- tests/test_config/test_testing.py | 4 ++-- tests/test_request.py | 4 ++-- tests/test_security.py | 30 ++++++------------------------ 3 files changed, 10 insertions(+), 28 deletions(-) (limited to 'tests') diff --git a/tests/test_config/test_testing.py b/tests/test_config/test_testing.py index 0fb73d268..822eeac8f 100644 --- a/tests/test_config/test_testing.py +++ b/tests/test_config/test_testing.py @@ -1,7 +1,7 @@ import unittest from zope.interface import implementer -from pyramid.security import AuthenticationAPIMixin, AuthorizationAPIMixin +from pyramid.security import SecurityAPIMixin, AuthenticationAPIMixin from pyramid.util import text_ from . import IDummy @@ -232,7 +232,7 @@ class DummyEvent: pass -class DummyRequest(AuthenticationAPIMixin, AuthorizationAPIMixin): +class DummyRequest(SecurityAPIMixin, AuthenticationAPIMixin): def __init__(self, environ=None): if environ is None: environ = {} diff --git a/tests/test_request.py b/tests/test_request.py index 484d86e01..1a10a8509 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -1,7 +1,7 @@ import unittest from pyramid import testing -from pyramid.security import AuthenticationAPIMixin, AuthorizationAPIMixin +from pyramid.security import SecurityAPIMixin, AuthenticationAPIMixin from pyramid.util import text_, bytes_ @@ -54,7 +54,7 @@ class TestRequest(unittest.TestCase): self.assertEqual(cls.ResponseClass, Response) def test_implements_security_apis(self): - apis = (AuthenticationAPIMixin, AuthorizationAPIMixin) + apis = (SecurityAPIMixin, AuthenticationAPIMixin) r = self._makeOne() self.assertTrue(isinstance(r, apis)) diff --git a/tests/test_security.py b/tests/test_security.py index dd2c225d3..40b5cd061 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -455,43 +455,25 @@ class TestHasPermission(unittest.TestCase): testing.tearDown() def _makeOne(self): - from pyramid.security import AuthorizationAPIMixin + from pyramid.security import SecurityAPIMixin from pyramid.registry import Registry - mixin = AuthorizationAPIMixin() + mixin = SecurityAPIMixin() mixin.registry = Registry() mixin.context = object() return mixin - def test_no_authentication_policy(self): + def test_no_security_policy(self): request = self._makeOne() result = request.has_permission('view') self.assertTrue(result) - self.assertEqual(result.msg, 'No authentication policy in use.') + self.assertEqual(result.msg, 'No security policy in use.') - def test_with_no_authorization_policy(self): + def test_with_security_registered(self): request = self._makeOne() - _registerAuthenticationPolicy(request.registry, None) - self.assertRaises( - ValueError, request.has_permission, 'view', context=None - ) - - def test_with_authn_and_authz_policies_registered(self): - request = self._makeOne() - _registerAuthenticationPolicy(request.registry, None) - _registerAuthorizationPolicy(request.registry, 'yo') + _registerSecurityPolicy(request.registry, 'yo') self.assertEqual(request.has_permission('view', context=None), 'yo') - def test_with_no_reg_on_request(self): - from pyramid.threadlocal import get_current_registry - - registry = get_current_registry() - request = self._makeOne() - del request.registry - _registerAuthenticationPolicy(registry, None) - _registerAuthorizationPolicy(registry, 'yo') - self.assertEqual(request.has_permission('view'), 'yo') - def test_with_no_context_passed(self): request = self._makeOne() self.assertTrue(request.has_permission('view')) -- cgit v1.2.3 From edf7ef0c379361f3a056014b068a01657decfb76 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 9 Mar 2019 12:02:24 -0800 Subject: Implement secured view deriver. Some tests still need fixing. --- tests/test_viewderivers.py | 134 ++++++++------------------------------------- 1 file changed, 22 insertions(+), 112 deletions(-) (limited to 'tests') diff --git a/tests/test_viewderivers.py b/tests/test_viewderivers.py index f01cb490e..9a61ea9f1 100644 --- a/tests/test_viewderivers.py +++ b/tests/test_viewderivers.py @@ -28,12 +28,11 @@ class TestDeriveView(unittest.TestCase): return logger def _registerSecurityPolicy(self, permissive): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy + from pyramid.interfaces import ISecurityPolicy policy = DummySecurityPolicy(permissive) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) + self.config.registry.registerUtility(policy, ISecurityPolicy) + return policy def test_function_returns_non_adaptable(self): def view(request): @@ -421,7 +420,7 @@ class TestDeriveView(unittest.TestCase): self.assertFalse(hasattr(result, '__call_permissive__')) self.assertEqual(result(None, None), response) - def test_with_debug_authorization_no_authpol(self): + def test_with_debug_authorization_no_security_policy(self): response = DummyResponse() view = lambda *arg: response self.config.registry.settings = dict( @@ -442,59 +441,7 @@ class TestDeriveView(unittest.TestCase): logger.messages[0], "debug_authorization of url url (view name " "'view_name' against context None): Allowed " - "(no authorization policy in use)", - ) - - def test_with_debug_authorization_authn_policy_no_authz_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict(debug_authorization=True) - from pyramid.interfaces import IAuthenticationPolicy - - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - logger = self._registerLogger() - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual( - logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)", - ) - - def test_with_debug_authorization_authz_policy_no_authn_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = dict(debug_authorization=True) - from pyramid.interfaces import IAuthorizationPolicy - - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) - logger = self._registerLogger() - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - self.assertEqual(len(logger.messages), 1) - self.assertEqual( - logger.messages[0], - "debug_authorization of url url (view name " - "'view_name' against context None): Allowed " - "(no authorization policy in use)", + "(no security policy in use)", ) def test_with_debug_authorization_no_permission(self): @@ -665,32 +612,11 @@ class TestDeriveView(unittest.TestCase): "'view_name' against context Exception()): True", ) - def test_secured_view_authn_policy_no_authz_policy(self): + def test_secured_view_authn_policy_no_security_policy(self): response = DummyResponse() view = lambda *arg: response self.config.registry.settings = {} - from pyramid.interfaces import IAuthenticationPolicy - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - result = self.config._derive_view(view, permission='view') - self.assertEqual(view.__module__, result.__module__) - self.assertEqual(view.__doc__, result.__doc__) - self.assertEqual(view.__name__, result.__name__) - self.assertFalse(hasattr(result, '__call_permissive__')) - request = self._makeRequest() - request.view_name = 'view_name' - request.url = 'url' - self.assertEqual(result(None, request), response) - - def test_secured_view_authz_policy_no_authn_policy(self): - response = DummyResponse() - view = lambda *arg: response - self.config.registry.settings = {} - from pyramid.interfaces import IAuthorizationPolicy - - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) result = self.config._derive_view(view, permission='view') self.assertEqual(view.__module__, result.__module__) self.assertEqual(view.__doc__, result.__doc__) @@ -702,53 +628,41 @@ class TestDeriveView(unittest.TestCase): self.assertEqual(result(None, request), response) def test_secured_view_raises_forbidden_no_name(self): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy from pyramid.httpexceptions import HTTPForbidden response = DummyResponse() view = lambda *arg: response self.config.registry.settings = {} - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) + self._registerSecurityPolicy(False) result = self.config._derive_view(view, permission='view') request = self._makeRequest() request.view_name = 'view_name' request.url = 'url' - try: + with self.assertRaises(HTTPForbidden) as cm: result(None, request) - except HTTPForbidden as e: - self.assertEqual( - e.message, 'Unauthorized: failed permission check' - ) - else: # pragma: no cover - raise AssertionError + self.assertEqual( + cm.exception.message, + 'Unauthorized: failed permission check', + ) def test_secured_view_raises_forbidden_with_name(self): - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy from pyramid.httpexceptions import HTTPForbidden def myview(request): # pragma: no cover pass self.config.registry.settings = {} - policy = DummySecurityPolicy(False) - self.config.registry.registerUtility(policy, IAuthenticationPolicy) - self.config.registry.registerUtility(policy, IAuthorizationPolicy) + self._registerSecurityPolicy(False) result = self.config._derive_view(myview, permission='view') request = self._makeRequest() request.view_name = 'view_name' request.url = 'url' - try: + with self.assertRaises(HTTPForbidden) as cm: result(None, request) - except HTTPForbidden as e: - self.assertEqual( - e.message, 'Unauthorized: myview failed permission check' - ) - else: # pragma: no cover - raise AssertionError + self.assertEqual( + cm.exception.message, + 'Unauthorized: myview failed permission check', + ) def test_secured_view_skipped_by_default_on_exception_view(self): from pyramid.request import Request @@ -794,12 +708,8 @@ class TestDeriveView(unittest.TestCase): app = self.config.make_wsgi_app() request = Request.blank('/foo', base_url='http://example.com') request.method = 'POST' - try: + with self.assertRaises(HTTPForbidden): request.get_response(app) - except HTTPForbidden: - pass - else: # pragma: no cover - raise AssertionError def test_secured_view_passed_on_explicit_exception_view(self): from pyramid.request import Request @@ -2130,10 +2040,10 @@ class DummySecurityPolicy: def __init__(self, permitted=True): self.permitted = permitted - def effective_principals(self, request): - return [] + def identify(self, request): + return 123 - def permits(self, context, principals, permission): + def permits(self, request, context, identity, permission): return self.permitted -- cgit v1.2.3 From 027d3cbd461be0555e7f4e44b508428228a4b56f Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 9 Mar 2019 12:39:41 -0800 Subject: Revamp tests for EffectivePrincipalsPredicate. --- tests/test_predicates.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) (limited to 'tests') diff --git a/tests/test_predicates.py b/tests/test_predicates.py index a99651a8f..60e36047e 100644 --- a/tests/test_predicates.py +++ b/tests/test_predicates.py @@ -502,6 +502,22 @@ class Test_EffectivePrincipalsPredicate(unittest.TestCase): return EffectivePrincipalsPredicate(val, config) + def _testing_authn_policy(self, userid, groupids=tuple()): + from pyramid.interfaces import IAuthenticationPolicy + from pyramid.security import Everyone, Authenticated + + class DummyPolicy: + def effective_principals(self, request): + p = [Everyone] + if userid: + p.append(Authenticated) + p.append(userid) + p.extend(groupids) + return p + + registry = self.config.registry + registry.registerUtility(DummyPolicy(), IAuthenticationPolicy) + def test_text(self): inst = self._makeOne(('verna', 'fred'), None) self.assertEqual( @@ -526,7 +542,7 @@ class Test_EffectivePrincipalsPredicate(unittest.TestCase): def test_it_call_authentication_policy_provides_superset(self): request = testing.DummyRequest() - self.config.testing_securitypolicy('fred', groupids=('verna', 'bambi')) + self._testing_authn_policy('fred', groupids=('verna', 'bambi')) inst = self._makeOne(('verna', 'fred'), None) context = Dummy() self.assertTrue(inst(context, request)) @@ -535,14 +551,14 @@ class Test_EffectivePrincipalsPredicate(unittest.TestCase): from pyramid.security import Authenticated request = testing.DummyRequest() - self.config.testing_securitypolicy('fred', groupids=('verna', 'bambi')) + self._testing_authn_policy('fred', groupids=('verna', 'bambi')) inst = self._makeOne(Authenticated, None) context = Dummy() self.assertTrue(inst(context, request)) def test_it_call_authentication_policy_doesnt_provide_superset(self): request = testing.DummyRequest() - self.config.testing_securitypolicy('fred') + self._testing_authn_policy('fred') inst = self._makeOne(('verna', 'fred'), None) context = Dummy() self.assertFalse(inst(context, request)) -- cgit v1.2.3 From 7cdaabd655b3ba1deba6c22ef7c956529e51cb79 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sun, 3 Mar 2019 09:01:12 -0800 Subject: Reimplement remember and forget. This breaks some more tests. --- tests/test_security.py | 34 +++++++--------------------------- 1 file changed, 7 insertions(+), 27 deletions(-) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index 40b5cd061..fae9db76f 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -187,32 +187,22 @@ class TestRemember(unittest.TestCase): return remember(*arg, **kwarg) - def test_no_authentication_policy(self): + def test_no_security_policy(self): request = _makeRequest() result = self._callFUT(request, 'me') self.assertEqual(result, []) - def test_with_authentication_policy(self): + def test_with_security_policy(self): request = _makeRequest() registry = request.registry - _registerAuthenticationPolicy(registry, 'yo') - result = self._callFUT(request, 'me') - self.assertEqual(result, [('X-Pyramid-Test', 'me')]) - - def test_with_authentication_policy_no_reg_on_request(self): - from pyramid.threadlocal import get_current_registry - - registry = get_current_registry() - request = _makeRequest() - del request.registry - _registerAuthenticationPolicy(registry, 'yo') + _registerSecurityPolicy(registry, 'yo') result = self._callFUT(request, 'me') self.assertEqual(result, [('X-Pyramid-Test', 'me')]) def test_with_missing_arg(self): request = _makeRequest() registry = request.registry - _registerAuthenticationPolicy(registry, 'yo') + _registerSecurityPolicy(registry, 'yo') self.assertRaises(TypeError, lambda: self._callFUT(request)) @@ -228,24 +218,14 @@ class TestForget(unittest.TestCase): return forget(*arg) - def test_no_authentication_policy(self): + def test_no_security_policy(self): request = _makeRequest() result = self._callFUT(request) self.assertEqual(result, []) - def test_with_authentication_policy(self): - request = _makeRequest() - _registerAuthenticationPolicy(request.registry, 'yo') - result = self._callFUT(request) - self.assertEqual(result, [('X-Pyramid-Test', 'logout')]) - - def test_with_authentication_policy_no_reg_on_request(self): - from pyramid.threadlocal import get_current_registry - - registry = get_current_registry() + def test_with_security_policy(self): request = _makeRequest() - del request.registry - _registerAuthenticationPolicy(registry, 'yo') + _registerSecurityPolicy(request.registry, 'yo') result = self._callFUT(request) self.assertEqual(result, [('X-Pyramid-Test', 'logout')]) -- cgit v1.2.3 From 94a16a7ddd7e9d313e1b447bf478bc51c053e58a Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 9 Mar 2019 12:29:23 -0800 Subject: Implement new dummy security policy. --- tests/test_config/test_testing.py | 24 +++++++-------------- tests/test_config/test_views.py | 29 +++++++++++-------------- tests/test_testing.py | 45 +++++---------------------------------- 3 files changed, 25 insertions(+), 73 deletions(-) (limited to 'tests') diff --git a/tests/test_config/test_testing.py b/tests/test_config/test_testing.py index 822eeac8f..500aedeae 100644 --- a/tests/test_config/test_testing.py +++ b/tests/test_config/test_testing.py @@ -17,28 +17,20 @@ class TestingConfiguratorMixinTests(unittest.TestCase): from pyramid.testing import DummySecurityPolicy config = self._makeOne(autocommit=True) - config.testing_securitypolicy( - 'user', ('group1', 'group2'), permissive=False - ) - from pyramid.interfaces import IAuthenticationPolicy - from pyramid.interfaces import IAuthorizationPolicy + config.testing_securitypolicy('user', permissive=False) + from pyramid.interfaces import ISecurityPolicy - ut = config.registry.getUtility(IAuthenticationPolicy) - self.assertTrue(isinstance(ut, DummySecurityPolicy)) - ut = config.registry.getUtility(IAuthorizationPolicy) - self.assertEqual(ut.userid, 'user') - self.assertEqual(ut.groupids, ('group1', 'group2')) - self.assertEqual(ut.permissive, False) + policy = config.registry.getUtility(ISecurityPolicy) + self.assertTrue(isinstance(policy, DummySecurityPolicy)) + self.assertEqual(policy.identity, 'user') + self.assertEqual(policy.permissive, False) def test_testing_securitypolicy_remember_result(self): from pyramid.security import remember config = self._makeOne(autocommit=True) pol = config.testing_securitypolicy( - 'user', - ('group1', 'group2'), - permissive=False, - remember_result=True, + 'user', permissive=False, remember_result=True ) request = DummyRequest() request.registry = config.registry @@ -51,7 +43,7 @@ class TestingConfiguratorMixinTests(unittest.TestCase): config = self._makeOne(autocommit=True) pol = config.testing_securitypolicy( - 'user', ('group1', 'group2'), permissive=False, forget_result=True + 'user', permissive=False, forget_result=True ) request = DummyRequest() request.registry = config.registry diff --git a/tests/test_config/test_views.py b/tests/test_config/test_views.py index 685b81a0f..28b7a9fb1 100644 --- a/tests/test_config/test_views.py +++ b/tests/test_config/test_views.py @@ -2059,22 +2059,19 @@ class TestViewsConfigurationMixin(unittest.TestCase): outerself = self class DummyPolicy(object): - def effective_principals(self, r): + def identify(self, r): outerself.assertEqual(r, request) - return ['abc'] + return 123 - def permits(self, context, principals, permission): + def permits(self, r, context, identity, permission): + outerself.assertEqual(r, request) outerself.assertEqual(context, None) - outerself.assertEqual(principals, ['abc']) + outerself.assertEqual(identity, 123) outerself.assertEqual(permission, 'view') return True policy = DummyPolicy() - config = self._makeOne( - authorization_policy=policy, - authentication_policy=policy, - autocommit=True, - ) + config = self._makeOne(security_policy=policy, autocommit=True) config.add_view(view=view1, permission='view', renderer=null_renderer) view = self._getViewCallable(config) request = self._makeRequest(config) @@ -2087,22 +2084,20 @@ class TestViewsConfigurationMixin(unittest.TestCase): outerself = self class DummyPolicy(object): - def effective_principals(self, r): + def identify(self, r): outerself.assertEqual(r, request) - return ['abc'] + return 123 - def permits(self, context, principals, permission): + def permits(self, r, context, identity, permission): + outerself.assertEqual(r, request) outerself.assertEqual(context, None) - outerself.assertEqual(principals, ['abc']) + outerself.assertEqual(identity, 123) outerself.assertEqual(permission, 'view') return True policy = DummyPolicy() config = self._makeOne( - authorization_policy=policy, - authentication_policy=policy, - default_permission='view', - autocommit=True, + security_policy=policy, default_permission='view', autocommit=True ) config.add_view(view=view1, renderer=null_renderer) view = self._getViewCallable(config) diff --git a/tests/test_testing.py b/tests/test_testing.py index 5b3ad0f22..874d9f11b 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -23,52 +23,17 @@ class TestDummySecurityPolicy(unittest.TestCase): return DummySecurityPolicy - def _makeOne(self, userid=None, groupids=(), permissive=True): + def _makeOne(self, identity=None, permissive=True): klass = self._getTargetClass() - return klass(userid, groupids, permissive) + return klass(identity, permissive) - def test_authenticated_userid(self): + def test_identify(self): policy = self._makeOne('user') - self.assertEqual(policy.authenticated_userid(None), 'user') - - def test_unauthenticated_userid(self): - policy = self._makeOne('user') - self.assertEqual(policy.unauthenticated_userid(None), 'user') - - def test_effective_principals_userid(self): - policy = self._makeOne('user', ('group1',)) - from pyramid.security import Everyone - from pyramid.security import Authenticated - - self.assertEqual( - policy.effective_principals(None), - [Everyone, Authenticated, 'user', 'group1'], - ) - - def test_effective_principals_nouserid(self): - policy = self._makeOne() - from pyramid.security import Everyone - - self.assertEqual(policy.effective_principals(None), [Everyone]) + self.assertEqual(policy.identify(None), 'user') def test_permits(self): policy = self._makeOne() - self.assertEqual(policy.permits(None, None, None), True) - - def test_principals_allowed_by_permission(self): - policy = self._makeOne('user', ('group1',)) - from pyramid.security import Everyone - from pyramid.security import Authenticated - - result = policy.principals_allowed_by_permission(None, None) - self.assertEqual(result, [Everyone, Authenticated, 'user', 'group1']) - - def test_principals_allowed_by_permission_not_permissive(self): - policy = self._makeOne('user', ('group1',)) - policy.permissive = False - - result = policy.principals_allowed_by_permission(None, None) - self.assertEqual(result, []) + self.assertEqual(policy.permits(None, None, None, None), True) def test_forget(self): policy = self._makeOne() -- cgit v1.2.3 From 8a1f8d4acc77cd9fe57405384b5c20e9fa3fb078 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 9 Mar 2019 13:27:13 -0800 Subject: Get integration tests working again. --- tests/pkgs/defpermbugapp/__init__.py | 4 ++-- tests/pkgs/forbiddenapp/__init__.py | 4 ++-- tests/pkgs/staticpermapp/__init__.py | 4 ++-- tests/test_integration.py | 8 +++++--- 4 files changed, 11 insertions(+), 9 deletions(-) (limited to 'tests') diff --git a/tests/pkgs/defpermbugapp/__init__.py b/tests/pkgs/defpermbugapp/__init__.py index 81897e86a..af78404ae 100644 --- a/tests/pkgs/defpermbugapp/__init__.py +++ b/tests/pkgs/defpermbugapp/__init__.py @@ -25,6 +25,6 @@ def includeme(config): authn_policy = AuthTktAuthenticationPolicy('seekt1t', hashalg='sha512') authz_policy = ACLAuthorizationPolicy() config.scan('tests.pkgs.defpermbugapp') - config._set_authentication_policy(authn_policy) - config._set_authorization_policy(authz_policy) + config.set_authentication_policy(authn_policy) + config.set_authorization_policy(authz_policy) config.set_default_permission('private') diff --git a/tests/pkgs/forbiddenapp/__init__.py b/tests/pkgs/forbiddenapp/__init__.py index 31ea4dd52..79670dd32 100644 --- a/tests/pkgs/forbiddenapp/__init__.py +++ b/tests/pkgs/forbiddenapp/__init__.py @@ -22,7 +22,7 @@ def includeme(config): authn_policy = AuthTktAuthenticationPolicy('seekr1t', hashalg='sha512') authz_policy = ACLAuthorizationPolicy() - config._set_authentication_policy(authn_policy) - config._set_authorization_policy(authz_policy) + config.set_authentication_policy(authn_policy) + config.set_authorization_policy(authz_policy) config.add_view(x_view, name='x', permission='private') config.add_view(forbidden_view, context=HTTPForbidden) diff --git a/tests/pkgs/staticpermapp/__init__.py b/tests/pkgs/staticpermapp/__init__.py index ffc87d39a..a12eac2d3 100644 --- a/tests/pkgs/staticpermapp/__init__.py +++ b/tests/pkgs/staticpermapp/__init__.py @@ -18,8 +18,8 @@ def includeme(config): authn_policy = RemoteUserAuthenticationPolicy() authz_policy = ACLAuthorizationPolicy() - config._set_authentication_policy(authn_policy) - config._set_authorization_policy(authz_policy) + config.set_authentication_policy(authn_policy) + config.set_authorization_policy(authz_policy) config.add_static_view('allowed', 'tests:fixtures/static/') config.add_static_view( 'protected', 'tests:fixtures/static/', permission='view' diff --git a/tests/test_integration.py b/tests/test_integration.py index e6dccbb5b..72465dc93 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -581,10 +581,12 @@ class TestConflictApp(unittest.TestCase): def test_overridden_authorization_policy(self): config = self._makeConfig() config.include(self.package) - from pyramid.testing import DummySecurityPolicy - config.set_authorization_policy(DummySecurityPolicy('fred')) - config.set_authentication_policy(DummySecurityPolicy(permissive=True)) + class DummySecurityPolicy: + def permits(self, context, principals, permission): + return True + + config.set_authorization_policy(DummySecurityPolicy()) app = config.make_wsgi_app() self.testapp = TestApp(app) res = self.testapp.get('/protected', status=200) -- cgit v1.2.3 From 31998bcdd0396316c1a0fdeb50bee59e4b9e14ed Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 30 Mar 2019 10:08:31 -0700 Subject: Implement pyramid.security.ACLHelper Mostly a lift-and-shift of the code in ACLAuthorizationPolicy. --- tests/test_security.py | 275 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index fae9db76f..b91aa7682 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -611,3 +611,278 @@ def _makeRequest(): request.registry = Registry() request.context = object() return request + + +class TestACLHelper(unittest.TestCase): + def test_no_acl(self): + from pyramid.security import ACLHelper + + context = DummyContext() + helper = ACLHelper() + result = helper.permits(context, ['foo'], 'permission') + self.assertEqual(result, False) + self.assertEqual(result.ace, '') + self.assertEqual( + result.acl, '' + ) + self.assertEqual(result.permission, 'permission') + self.assertEqual(result.principals, ['foo']) + self.assertEqual(result.context, context) + + def test_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Deny + from pyramid.security import Allow + from pyramid.security import Everyone + from pyramid.security import Authenticated + from pyramid.security import ALL_PERMISSIONS + from pyramid.security import DENY_ALL + + helper = ACLHelper() + root = DummyContext() + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [(Allow, Authenticated, VIEW)] + community.__acl__ = [ + (Allow, 'fred', ALL_PERMISSIONS), + (Allow, 'wilma', VIEW), + DENY_ALL, + ] + blog.__acl__ = [ + (Allow, 'barney', MEMBER_PERMS), + (Allow, 'wilma', VIEW), + ] + + result = helper.permits( + blog, [Everyone, Authenticated, 'wilma'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'wilma', VIEW)) + self.assertEqual(result.acl, blog.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'wilma'], 'delete' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'fred'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + result = helper.permits( + blog, [Everyone, Authenticated, 'fred'], 'doesntevenexistyet' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'barney'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS)) + result = helper.permits( + blog, [Everyone, Authenticated, 'barney'], 'administer' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + root, [Everyone, Authenticated, 'someguy'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, (Allow, Authenticated, VIEW)) + result = helper.permits( + blog, [Everyone, Authenticated, 'someguy'], 'view' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits(root, [Everyone], 'view') + self.assertEqual(result, False) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, '') + self.assertEqual(result.acl, root.__acl__) + + context = DummyContext() + result = helper.permits(context, [Everyone], 'view') + self.assertEqual(result, False) + self.assertEqual(result.ace, '') + self.assertEqual( + result.acl, '' + ) + + def test_string_permissions_in_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + root = DummyContext() + root.__acl__ = [(Allow, 'wilma', 'view_stuff')] + + result = helper.permits(root, ['wilma'], 'view') + # would be True if matching against 'view_stuff' instead of against + # ['view_stuff'] + self.assertEqual(result, False) + + def test_callable_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + context = DummyContext() + fn = lambda self: [(Allow, 'bob', 'read')] + context.__acl__ = fn.__get__(context, context.__class__) + result = helper.permits(context, ['bob'], 'read') + self.assertTrue(result) + + def test_principals_allowed_by_permission_direct(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + from pyramid.security import DENY_ALL + + helper = ACLHelper() + context = DummyContext() + acl = [ + (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read'), + ] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission_callable_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + from pyramid.security import DENY_ALL + + helper = ACLHelper() + context = DummyContext() + acl = lambda: [ + (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read'), + ] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission_string_permission(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + context = DummyContext() + acl = [(Allow, 'chrism', 'read_it')] + context.__acl__ = acl + result = helper.principals_allowed_by_permission(context, 'read') + # would be ['chrism'] if 'read' were compared against 'read_it' instead + # of against ['read_it'] + self.assertEqual(list(result), []) + + def test_principals_allowed_by_permission(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + from pyramid.security import Deny + from pyramid.security import DENY_ALL + from pyramid.security import ALL_PERMISSIONS + + helper = ACLHelper() + root = DummyContext(__name__='', __parent__=None) + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [ + (Allow, 'chrism', ('read', 'write')), + (Allow, 'other', ('read',)), + (Allow, 'jim', ALL_PERMISSIONS), + ] + community.__acl__ = [ + (Deny, 'flooz', 'read'), + (Allow, 'flooz', 'read'), + (Allow, 'mork', 'read'), + (Deny, 'jim', 'read'), + (Allow, 'someguy', 'manage'), + ] + blog.__acl__ = [(Allow, 'fred', 'read'), DENY_ALL] + + result = sorted(helper.principals_allowed_by_permission(blog, 'read')) + self.assertEqual(result, ['fred']) + result = sorted( + helper.principals_allowed_by_permission(community, 'read') + ) + self.assertEqual(result, ['chrism', 'mork', 'other']) + result = sorted( + helper.principals_allowed_by_permission(community, 'read') + ) + result = sorted(helper.principals_allowed_by_permission(root, 'read')) + self.assertEqual(result, ['chrism', 'jim', 'other']) + + def test_principals_allowed_by_permission_no_acls(self): + from pyramid.security import ACLHelper + + helper = ACLHelper() + context = DummyContext() + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + def test_principals_allowed_by_permission_deny_not_permission_in_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Deny + from pyramid.security import Everyone + + helper = ACLHelper() + context = DummyContext() + acl = [(Deny, Everyone, 'write')] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + def test_principals_allowed_by_permission_deny_permission_in_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Deny + from pyramid.security import Everyone + + helper = ACLHelper() + context = DummyContext() + acl = [(Deny, Everyone, 'read')] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + +VIEW = 'view' +EDIT = 'edit' +CREATE = 'create' +DELETE = 'delete' +MODERATE = 'moderate' +ADMINISTER = 'administer' +COMMENT = 'comment' + +GUEST_PERMS = (VIEW, COMMENT) +MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) +MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) +ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) -- cgit v1.2.3 From 6aba89d19cc384021864d3b83d53082f56c3f419 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 30 Mar 2019 10:31:17 -0700 Subject: Add SessionAuthenticationHelper. --- tests/test_security.py | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index b91aa7682..73d8ba6fc 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -886,3 +886,52 @@ GUEST_PERMS = (VIEW, COMMENT) MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) + + +class TestSessionAuthenticationHelper(unittest.TestCase): + def _makeRequest(self, session=None): + from types import SimpleNamespace + if session is None: + session = dict() + return SimpleNamespace(session=session) + + def _makeOne(self, prefix=''): + from pyramid.security import SessionAuthenticationHelper + + return SessionAuthenticationHelper(prefix=prefix) + + def test_identify(self): + request = self._makeRequest({'userid': 'fred'}) + helper = self._makeOne() + self.assertEqual(helper.identify(request), 'fred') + + def test_identify_with_prefix(self): + request = self._makeRequest({'foo.userid': 'fred'}) + helper = self._makeOne(prefix='foo.') + self.assertEqual(helper.identify(request), 'fred') + + def test_identify_none(self): + request = self._makeRequest() + helper = self._makeOne() + self.assertEqual(helper.identify(request), None) + + def test_remember(self): + request = self._makeRequest() + helper = self._makeOne() + result = helper.remember(request, 'fred') + self.assertEqual(request.session.get('userid'), 'fred') + self.assertEqual(result, []) + + def test_forget(self): + request = self._makeRequest({'userid': 'fred'}) + helper = self._makeOne() + result = helper.forget(request) + self.assertEqual(request.session.get('userid'), None) + self.assertEqual(result, []) + + def test_forget_no_identity(self): + request = self._makeRequest() + helper = self._makeOne() + result = helper.forget(request) + self.assertEqual(request.session.get('userid'), None) + self.assertEqual(result, []) -- cgit v1.2.3 From 3d9c5c534c2200aeebad278466a961895901e617 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 30 Mar 2019 11:04:22 -0700 Subject: Fix formatting. --- tests/test_security.py | 1 + 1 file changed, 1 insertion(+) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index 73d8ba6fc..dd5be54d7 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -891,6 +891,7 @@ ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) class TestSessionAuthenticationHelper(unittest.TestCase): def _makeRequest(self, session=None): from types import SimpleNamespace + if session is None: session = dict() return SimpleNamespace(session=session) -- cgit v1.2.3 From 9f267dd842c5e93336f0392f2809da75a716039a Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 30 Mar 2019 11:09:44 -0700 Subject: Migrate AuthTktCookieHelper to pyramid.security. --- tests/test_authentication.py | 1000 +---------------------------------------- tests/test_security.py | 1008 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1009 insertions(+), 999 deletions(-) (limited to 'tests') diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 8671eba05..89cf9866d 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -1,8 +1,7 @@ -from http.cookies import SimpleCookie import unittest import warnings from pyramid import testing -from pyramid.util import text_, bytes_ +from pyramid.util import bytes_ class TestCallbackAuthenticationPolicyDebugging(unittest.TestCase): @@ -664,926 +663,6 @@ class TestAuthTktAuthenticationPolicy(unittest.TestCase): verifyObject(IAuthenticationPolicy, self._makeOne(None, None)) -class TestAuthTktCookieHelper(unittest.TestCase): - def _getTargetClass(self): - from pyramid.authentication import AuthTktCookieHelper - - return AuthTktCookieHelper - - def _makeOne(self, *arg, **kw): - helper = self._getTargetClass()(*arg, **kw) - # laziness after moving auth_tkt classes and funcs into - # authentication module - auth_tkt = DummyAuthTktModule() - helper.auth_tkt = auth_tkt - helper.AuthTicket = auth_tkt.AuthTicket - helper.parse_ticket = auth_tkt.parse_ticket - helper.BadTicket = auth_tkt.BadTicket - return helper - - def _makeRequest(self, cookie=None, ipv6=False): - environ = {'wsgi.version': (1, 0)} - - if ipv6 is False: - environ['REMOTE_ADDR'] = '1.1.1.1' - else: - environ['REMOTE_ADDR'] = '::1' - environ['SERVER_NAME'] = 'localhost' - return DummyRequest(environ, cookie=cookie) - - def _cookieValue(self, cookie): - items = cookie.value.split('/') - D = {} - for item in items: - k, v = item.split('=', 1) - D[k] = v - return D - - def _parseHeaders(self, headers): - return [self._parseHeader(header) for header in headers] - - def _parseHeader(self, header): - cookie = self._parseCookie(header[1]) - return cookie - - def _parseCookie(self, cookie): - cookies = SimpleCookie() - cookies.load(cookie) - return cookies.get('auth_tkt') - - def test_init_cookie_str_reissue_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', reissue_time='invalid value' - ) - - def test_init_cookie_str_timeout_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', timeout='invalid value' - ) - - def test_init_cookie_str_max_age_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', max_age='invalid value' - ) - - def test_identify_nocookie(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_cookie_value_is_None(self): - helper = self._makeOne('secret') - request = self._makeRequest(None) - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_good_cookie_include_ip(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_include_ipv6(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest('ticket', ipv6=True) - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '::1') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_dont_include_ip(self): - helper = self._makeOne('secret', include_ip=False) - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_int_useridtype(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = '1' - helper.auth_tkt.user_data = 'userid_type:int' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 1) - self.assertEqual(result['userdata'], 'userid_type:int') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_nonuseridtype_user_data(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = '1' - helper.auth_tkt.user_data = 'bogus:int' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], '1') - self.assertEqual(result['userdata'], 'bogus:int') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_unknown_useridtype(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = 'abc' - helper.auth_tkt.user_data = 'userid_type:unknown' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'abc') - self.assertEqual(result['userdata'], 'userid_type:unknown') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_b64str_useridtype(self): - from base64 import b64encode - - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = b64encode(b'encoded').strip() - helper.auth_tkt.user_data = 'userid_type:b64str' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], b'encoded') - self.assertEqual(result['userdata'], 'userid_type:b64str') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_b64unicode_useridtype(self): - from base64 import b64encode - - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip() - helper.auth_tkt.user_data = 'userid_type:b64unicode' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8')) - self.assertEqual(result['userdata'], 'userid_type:b64unicode') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_bad_cookie(self): - helper = self._makeOne('secret', include_ip=True) - helper.auth_tkt.parse_raise = True - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_cookie_timeout(self): - helper = self._makeOne('secret', timeout=1) - self.assertEqual(helper.timeout, 1) - - def test_identify_cookie_str_timeout(self): - helper = self._makeOne('secret', timeout='1') - self.assertEqual(helper.timeout, 1) - - def test_identify_cookie_timeout_aged(self): - import time - - helper = self._makeOne('secret', timeout=10) - now = time.time() - helper.auth_tkt.timestamp = now - 1 - helper.now = now + 10 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertFalse(result) - - def test_identify_cookie_reissue(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - - def test_identify_cookie_str_reissue(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time='0') - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - - def test_identify_cookie_reissue_already_reissued_this_request(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - request._authtkt_reissued = True - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 0) - - def test_identify_cookie_reissue_notyet(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=10) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 0) - - def test_identify_cookie_reissue_revoked_by_forget(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - result = helper.forget(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 0) - - def test_identify_cookie_reissue_revoked_by_remember(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - result = helper.remember(request, 'bob') - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 0) - - def test_identify_cookie_reissue_with_tokens_default(self): - # see https://github.com/Pylons/pyramid/issues#issue/108 - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - auth_tkt = DummyAuthTktModule(tokens=['']) - helper.auth_tkt = auth_tkt - helper.AuthTicket = auth_tkt.AuthTicket - helper.parse_ticket = auth_tkt.parse_ticket - helper.BadTicket = auth_tkt.BadTicket - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](None, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - self.assertTrue("/tokens=/" in response.headerlist[0][1]) - - def test_remember(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_nondefault_samesite(self): - helper = self._makeOne('secret', samesite='Strict') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith( - '; Domain=localhost; Path=/; SameSite=Strict' - ) - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith( - '; Domain=.localhost; Path=/; SameSite=Strict' - ) - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_None_samesite(self): - helper = self._makeOne('secret', samesite=None) - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/')) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/')) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_include_ip(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_path(self): - helper = self._makeOne( - 'secret', include_ip=True, path="/cgi-bin/app.cgi/" - ) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax') - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith( - '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' - ) - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith( - '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' - ) - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_http_only(self): - helper = self._makeOne('secret', include_ip=True, http_only=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue('; HttpOnly' in result[1][1]) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue('; HttpOnly' in result[2][1]) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_secure(self): - helper = self._makeOne('secret', include_ip=True, secure=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue('; secure' in result[0][1]) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue('; secure' in result[1][1]) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue('; secure' in result[2][1]) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_wild_domain_disabled(self): - helper = self._makeOne('secret', wild_domain=False) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 2) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - def test_remember_parent_domain(self): - helper = self._makeOne('secret', parent_domain=True) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith( - '; Domain=.example.com; Path=/; SameSite=Lax' - ) - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - def test_remember_parent_domain_supercedes_wild_domain(self): - helper = self._makeOne('secret', parent_domain=True, wild_domain=True) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - self.assertTrue( - result[0][1].endswith( - '; Domain=.example.com; Path=/; SameSite=Lax' - ) - ) - - def test_remember_explicit_domain(self): - helper = self._makeOne('secret', domain='pyramid.bazinga') - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith( - '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' - ) - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - def test_remember_domain_supercedes_parent_and_wild_domain(self): - helper = self._makeOne( - 'secret', - domain='pyramid.bazinga', - parent_domain=True, - wild_domain=True, - ) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - self.assertTrue( - result[0][1].endswith( - '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' - ) - ) - - def test_remember_binary_userid(self): - import base64 - - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, b'userid') - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual( - val['userid'], text_(base64.b64encode(b'userid').strip()) - ) - self.assertEqual(val['user_data'], 'userid_type:b64str') - - def test_remember_int_userid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 1) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual(val['userid'], '1') - self.assertEqual(val['user_data'], 'userid_type:int') - - def test_remember_unicode_userid(self): - import base64 - - helper = self._makeOne('secret') - request = self._makeRequest() - userid = text_(b'\xc2\xa9', 'utf-8') - result = helper.remember(request, userid) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual( - val['userid'], text_(base64.b64encode(userid.encode('utf-8'))) - ) - self.assertEqual(val['user_data'], 'userid_type:b64unicode') - - def test_remember_insane_userid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - userid = object() - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always', RuntimeWarning) - result = helper.remember(request, userid) - self.assertTrue(str(w[-1].message).startswith('userid is of type')) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - value = values[0] - self.assertTrue('userid' in value.value) - - def test_remember_max_age(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid', max_age=500) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - - self.assertEqual(values[0]['max-age'], '500') - self.assertTrue(values[0]['expires']) - - def test_remember_str_max_age(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid', max_age='500') - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - - self.assertEqual(values[0]['max-age'], '500') - self.assertTrue(values[0]['expires']) - - def test_remember_str_max_age_invalid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - self.assertRaises( - ValueError, - helper.remember, - request, - 'userid', - max_age='invalid value', - ) - - def test_remember_tokens(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'other', tokens=('foo', 'bar')) - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[0][1]) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[1][1]) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[2][1]) - - def test_remember_samesite_nondefault(self): - helper = self._makeOne('secret', samesite='Strict') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - cookieval = result[0][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[1][0], 'Set-Cookie') - cookieval = result[1][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[2][0], 'Set-Cookie') - cookieval = result[2][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - def test_remember_samesite_default(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - cookieval = result[0][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[1][0], 'Set-Cookie') - cookieval = result[1][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[2][0], 'Set-Cookie') - cookieval = result[2][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - def test_remember_unicode_but_ascii_token(self): - helper = self._makeOne('secret') - request = self._makeRequest() - la = text_(b'foo', 'utf-8') - result = helper.remember(request, 'other', tokens=(la,)) - # tokens must be str type on both Python 2 and 3 - self.assertTrue("/tokens=foo/" in result[0][1]) - - def test_remember_nonascii_token(self): - helper = self._makeOne('secret') - request = self._makeRequest() - la = text_(b'La Pe\xc3\xb1a', 'utf-8') - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=(la,) - ) - - def test_remember_invalid_token_format(self): - helper = self._makeOne('secret') - request = self._makeRequest() - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=('foo bar',) - ) - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=('1bar',) - ) - - def test_forget(self): - helper = self._makeOne('secret') - request = self._makeRequest() - headers = helper.forget(request) - self.assertEqual(len(headers), 3) - name, value = headers[0] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - name, value = headers[1] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - name, value = headers[2] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - - -class TestAuthTicket(unittest.TestCase): - def _makeOne(self, *arg, **kw): - from pyramid.authentication import AuthTicket - - return AuthTicket(*arg, **kw) - - def test_ctor_with_tokens(self): - ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b')) - self.assertEqual(ticket.tokens, 'a,b') - - def test_ctor_with_time(self): - ticket = self._makeOne('secret', 'userid', 'ip', time='time') - self.assertEqual(ticket.time, 'time') - - def test_digest(self): - ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10) - result = ticket.digest() - self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') - - def test_digest_sha512(self): - ticket = self._makeOne( - 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512' - ) - result = ticket.digest() - self.assertEqual( - result, - '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49' - '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9' - 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278', - ) - - def test_cookie_value(self): - ticket = self._makeOne( - 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b') - ) - result = ticket.cookie_value() - self.assertEqual( - result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' - ) - - def test_ipv4(self): - ticket = self._makeOne( - 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256' - ) - result = ticket.cookie_value() - self.assertEqual( - result, - 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b' - '798400ecdade8d76c530000000auserid!', - ) - - def test_ipv6(self): - ticket = self._makeOne( - 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256' - ) - result = ticket.cookie_value() - self.assertEqual( - result, - 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8' - '5becf8760cd7a2fa4910000000auserid!', - ) - - -class TestBadTicket(unittest.TestCase): - def _makeOne(self, msg, expected=None): - from pyramid.authentication import BadTicket - - return BadTicket(msg, expected) - - def test_it(self): - exc = self._makeOne('msg', expected=True) - self.assertEqual(exc.expected, True) - self.assertTrue(isinstance(exc, Exception)) - - -class Test_parse_ticket(unittest.TestCase): - def _callFUT(self, secret, ticket, ip, hashalg='md5'): - from pyramid.authentication import parse_ticket - - return parse_ticket(secret, ticket, ip, hashalg) - - def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'): - from pyramid.authentication import BadTicket - - self.assertRaises( - BadTicket, self._callFUT, secret, ticket, ip, hashalg - ) - - def test_bad_timestamp(self): - ticket = 'x' * 64 - self._assertRaisesBadTicket('secret', ticket, 'ip') - - def test_bad_userid_or_data(self): - ticket = 'x' * 32 + '11111111' + 'x' * 10 - self._assertRaisesBadTicket('secret', ticket, 'ip') - - def test_digest_sig_incorrect(self): - ticket = 'x' * 32 + '11111111' + 'a!b!c' - self._assertRaisesBadTicket('secret', ticket, '0.0.0.0') - - def test_correct_with_user_data(self): - ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!') - result = self._callFUT('secret', ticket, '0.0.0.0') - self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) - - def test_correct_with_user_data_sha512(self): - ticket = text_( - '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1' - '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767' - 'ba8a26d02aaeae56599a0000000auserid!a,b!' - ) - result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512') - self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) - - def test_ipv4(self): - ticket = text_( - 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd' - 'ade8d76c530000000auserid!' - ) - result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256') - self.assertEqual(result, (10, 'userid', [''], '')) - - def test_ipv6(self): - ticket = text_( - 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760' - 'cd7a2fa4910000000auserid!' - ) - result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256') - self.assertEqual(result, (10, 'userid', [''], '')) - - class TestSessionAuthenticationPolicy(unittest.TestCase): def _getTargetClass(self): from pyramid.authentication import SessionAuthenticationPolicy @@ -1911,14 +990,6 @@ class DummyContext: pass -class DummyCookies(object): - def __init__(self, cookie): - self.cookie = cookie - - def get(self, name): - return self.cookie - - class DummyRequest: domain = 'localhost' @@ -1927,10 +998,6 @@ class DummyRequest: self.session = session or {} self.registry = registry self.callbacks = [] - self.cookies = DummyCookies(cookie) - - def add_response_callback(self, callback): - self.callbacks.append(callback) class DummyWhoPlugin: @@ -1954,68 +1021,3 @@ class DummyCookieHelper: def forget(self, *arg): return [] - - -class DummyAuthTktModule(object): - def __init__( - self, - timestamp=0, - userid='userid', - tokens=(), - user_data='', - parse_raise=False, - hashalg="md5", - ): - self.timestamp = timestamp - self.userid = userid - self.tokens = tokens - self.user_data = user_data - self.parse_raise = parse_raise - self.hashalg = hashalg - - def parse_ticket(secret, value, remote_addr, hashalg): - self.secret = secret - self.value = value - self.remote_addr = remote_addr - if self.parse_raise: - raise self.BadTicket() - return self.timestamp, self.userid, self.tokens, self.user_data - - self.parse_ticket = parse_ticket - - class AuthTicket(object): - def __init__(self, secret, userid, remote_addr, **kw): - self.secret = secret - self.userid = userid - self.remote_addr = remote_addr - self.kw = kw - - def cookie_value(self): - result = { - 'secret': self.secret, - 'userid': self.userid, - 'remote_addr': self.remote_addr, - } - result.update(self.kw) - tokens = result.pop('tokens', None) - if tokens is not None: - tokens = '|'.join(tokens) - result['tokens'] = tokens - items = sorted(result.items()) - new_items = [] - for k, v in items: - if isinstance(v, bytes): - v = text_(v) - new_items.append((k, v)) - result = '/'.join(['%s=%s' % (k, v) for k, v in new_items]) - return result - - self.AuthTicket = AuthTicket - - class BadTicket(Exception): - pass - - -class DummyResponse: - def __init__(self): - self.headerlist = [] diff --git a/tests/test_security.py b/tests/test_security.py index dd5be54d7..b66632baa 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -1,6 +1,9 @@ import unittest +import warnings +from http.cookies import SimpleCookie from pyramid import testing +from pyramid.util import text_ class TestAllPermissionsList(unittest.TestCase): @@ -936,3 +939,1008 @@ class TestSessionAuthenticationHelper(unittest.TestCase): result = helper.forget(request) self.assertEqual(request.session.get('userid'), None) self.assertEqual(result, []) + + +class TestAuthTicket(unittest.TestCase): + def _makeOne(self, *arg, **kw): + from pyramid.security import AuthTicket + + return AuthTicket(*arg, **kw) + + def test_ctor_with_tokens(self): + ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b')) + self.assertEqual(ticket.tokens, 'a,b') + + def test_ctor_with_time(self): + ticket = self._makeOne('secret', 'userid', 'ip', time='time') + self.assertEqual(ticket.time, 'time') + + def test_digest(self): + ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10) + result = ticket.digest() + self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') + + def test_digest_sha512(self): + ticket = self._makeOne( + 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512' + ) + result = ticket.digest() + self.assertEqual( + result, + '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49' + '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9' + 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278', + ) + + def test_cookie_value(self): + ticket = self._makeOne( + 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b') + ) + result = ticket.cookie_value() + self.assertEqual( + result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' + ) + + def test_ipv4(self): + ticket = self._makeOne( + 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256' + ) + result = ticket.cookie_value() + self.assertEqual( + result, + 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b' + '798400ecdade8d76c530000000auserid!', + ) + + def test_ipv6(self): + ticket = self._makeOne( + 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256' + ) + result = ticket.cookie_value() + self.assertEqual( + result, + 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8' + '5becf8760cd7a2fa4910000000auserid!', + ) + + +class TestBadTicket(unittest.TestCase): + def _makeOne(self, msg, expected=None): + from pyramid.security import BadTicket + + return BadTicket(msg, expected) + + def test_it(self): + exc = self._makeOne('msg', expected=True) + self.assertEqual(exc.expected, True) + self.assertTrue(isinstance(exc, Exception)) + + +class Test_parse_ticket(unittest.TestCase): + def _callFUT(self, secret, ticket, ip, hashalg='md5'): + from pyramid.security import parse_ticket + + return parse_ticket(secret, ticket, ip, hashalg) + + def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'): + from pyramid.security import BadTicket + + self.assertRaises( + BadTicket, self._callFUT, secret, ticket, ip, hashalg + ) + + def test_bad_timestamp(self): + ticket = 'x' * 64 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_bad_userid_or_data(self): + ticket = 'x' * 32 + '11111111' + 'x' * 10 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_digest_sig_incorrect(self): + ticket = 'x' * 32 + '11111111' + 'a!b!c' + self._assertRaisesBadTicket('secret', ticket, '0.0.0.0') + + def test_correct_with_user_data(self): + ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!') + result = self._callFUT('secret', ticket, '0.0.0.0') + self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) + + def test_correct_with_user_data_sha512(self): + ticket = text_( + '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1' + '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767' + 'ba8a26d02aaeae56599a0000000auserid!a,b!' + ) + result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512') + self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) + + def test_ipv4(self): + ticket = text_( + 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd' + 'ade8d76c530000000auserid!' + ) + result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256') + self.assertEqual(result, (10, 'userid', [''], '')) + + def test_ipv6(self): + ticket = text_( + 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760' + 'cd7a2fa4910000000auserid!' + ) + result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256') + self.assertEqual(result, (10, 'userid', [''], '')) + + +class TestAuthTktCookieHelper(unittest.TestCase): + def _getTargetClass(self): + from pyramid.security import AuthTktCookieHelper + + return AuthTktCookieHelper + + def _makeOne(self, *arg, **kw): + helper = self._getTargetClass()(*arg, **kw) + auth_tkt = DummyAuthTktModule() + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket + return helper + + def _makeRequest(self, cookie=None, ipv6=False): + environ = {'wsgi.version': (1, 0)} + + if ipv6 is False: + environ['REMOTE_ADDR'] = '1.1.1.1' + else: + environ['REMOTE_ADDR'] = '::1' + environ['SERVER_NAME'] = 'localhost' + return DummyRequest(environ, cookie=cookie) + + def _cookieValue(self, cookie): + items = cookie.value.split('/') + D = {} + for item in items: + k, v = item.split('=', 1) + D[k] = v + return D + + def _parseHeaders(self, headers): + return [self._parseHeader(header) for header in headers] + + def _parseHeader(self, header): + cookie = self._parseCookie(header[1]) + return cookie + + def _parseCookie(self, cookie): + cookies = SimpleCookie() + cookies.load(cookie) + return cookies.get('auth_tkt') + + def test_init_cookie_str_reissue_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', reissue_time='invalid value' + ) + + def test_init_cookie_str_timeout_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', timeout='invalid value' + ) + + def test_init_cookie_str_max_age_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', max_age='invalid value' + ) + + def test_identify_nocookie(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_cookie_value_is_None(self): + helper = self._makeOne('secret') + request = self._makeRequest(None) + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_good_cookie_include_ip(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_include_ipv6(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest('ticket', ipv6=True) + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '::1') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_dont_include_ip(self): + helper = self._makeOne('secret', include_ip=False) + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_int_useridtype(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'userid_type:int' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 1) + self.assertEqual(result['userdata'], 'userid_type:int') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_nonuseridtype_user_data(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'bogus:int' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], '1') + self.assertEqual(result['userdata'], 'bogus:int') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_unknown_useridtype(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = 'abc' + helper.auth_tkt.user_data = 'userid_type:unknown' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'abc') + self.assertEqual(result['userdata'], 'userid_type:unknown') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_b64str_useridtype(self): + from base64 import b64encode + + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = b64encode(b'encoded').strip() + helper.auth_tkt.user_data = 'userid_type:b64str' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], b'encoded') + self.assertEqual(result['userdata'], 'userid_type:b64str') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_b64unicode_useridtype(self): + from base64 import b64encode + + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip() + helper.auth_tkt.user_data = 'userid_type:b64unicode' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8')) + self.assertEqual(result['userdata'], 'userid_type:b64unicode') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_bad_cookie(self): + helper = self._makeOne('secret', include_ip=True) + helper.auth_tkt.parse_raise = True + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_cookie_timeout(self): + helper = self._makeOne('secret', timeout=1) + self.assertEqual(helper.timeout, 1) + + def test_identify_cookie_str_timeout(self): + helper = self._makeOne('secret', timeout='1') + self.assertEqual(helper.timeout, 1) + + def test_identify_cookie_timeout_aged(self): + import time + + helper = self._makeOne('secret', timeout=10) + now = time.time() + helper.auth_tkt.timestamp = now - 1 + helper.now = now + 10 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertFalse(result) + + def test_identify_cookie_reissue(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + + def test_identify_cookie_str_reissue(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time='0') + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + + def test_identify_cookie_reissue_already_reissued_this_request(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + request._authtkt_reissued = True + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 0) + + def test_identify_cookie_reissue_notyet(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=10) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 0) + + def test_identify_cookie_reissue_revoked_by_forget(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + result = helper.forget(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 0) + + def test_identify_cookie_reissue_revoked_by_remember(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + result = helper.remember(request, 'bob') + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 0) + + def test_identify_cookie_reissue_with_tokens_default(self): + # see https://github.com/Pylons/pyramid/issues#issue/108 + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + auth_tkt = DummyAuthTktModule(tokens=['']) + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](None, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + self.assertTrue("/tokens=/" in response.headerlist[0][1]) + + def test_remember(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_nondefault_samesite(self): + helper = self._makeOne('secret', samesite='Strict') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith( + '; Domain=localhost; Path=/; SameSite=Strict' + ) + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith( + '; Domain=.localhost; Path=/; SameSite=Strict' + ) + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_None_samesite(self): + helper = self._makeOne('secret', samesite=None) + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/')) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/')) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_include_ip(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_path(self): + helper = self._makeOne( + 'secret', include_ip=True, path="/cgi-bin/app.cgi/" + ) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax') + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith( + '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' + ) + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith( + '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' + ) + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_http_only(self): + helper = self._makeOne('secret', include_ip=True, http_only=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue('; HttpOnly' in result[1][1]) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue('; HttpOnly' in result[2][1]) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_secure(self): + helper = self._makeOne('secret', include_ip=True, secure=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue('; secure' in result[0][1]) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue('; secure' in result[1][1]) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue('; secure' in result[2][1]) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_wild_domain_disabled(self): + helper = self._makeOne('secret', wild_domain=False) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 2) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + def test_remember_parent_domain(self): + helper = self._makeOne('secret', parent_domain=True) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith( + '; Domain=.example.com; Path=/; SameSite=Lax' + ) + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + def test_remember_parent_domain_supercedes_wild_domain(self): + helper = self._makeOne('secret', parent_domain=True, wild_domain=True) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + self.assertTrue( + result[0][1].endswith( + '; Domain=.example.com; Path=/; SameSite=Lax' + ) + ) + + def test_remember_explicit_domain(self): + helper = self._makeOne('secret', domain='pyramid.bazinga') + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith( + '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' + ) + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + def test_remember_domain_supercedes_parent_and_wild_domain(self): + helper = self._makeOne( + 'secret', + domain='pyramid.bazinga', + parent_domain=True, + wild_domain=True, + ) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + self.assertTrue( + result[0][1].endswith( + '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' + ) + ) + + def test_remember_binary_userid(self): + import base64 + + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, b'userid') + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual( + val['userid'], text_(base64.b64encode(b'userid').strip()) + ) + self.assertEqual(val['user_data'], 'userid_type:b64str') + + def test_remember_int_userid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 1) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual(val['userid'], '1') + self.assertEqual(val['user_data'], 'userid_type:int') + + def test_remember_unicode_userid(self): + import base64 + + helper = self._makeOne('secret') + request = self._makeRequest() + userid = text_(b'\xc2\xa9', 'utf-8') + result = helper.remember(request, userid) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual( + val['userid'], text_(base64.b64encode(userid.encode('utf-8'))) + ) + self.assertEqual(val['user_data'], 'userid_type:b64unicode') + + def test_remember_insane_userid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + userid = object() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always', RuntimeWarning) + result = helper.remember(request, userid) + self.assertTrue(str(w[-1].message).startswith('userid is of type')) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + value = values[0] + self.assertTrue('userid' in value.value) + + def test_remember_max_age(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid', max_age=500) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + + self.assertEqual(values[0]['max-age'], '500') + self.assertTrue(values[0]['expires']) + + def test_remember_str_max_age(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid', max_age='500') + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + + self.assertEqual(values[0]['max-age'], '500') + self.assertTrue(values[0]['expires']) + + def test_remember_str_max_age_invalid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + self.assertRaises( + ValueError, + helper.remember, + request, + 'userid', + max_age='invalid value', + ) + + def test_remember_tokens(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'other', tokens=('foo', 'bar')) + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[0][1]) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[1][1]) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[2][1]) + + def test_remember_samesite_nondefault(self): + helper = self._makeOne('secret', samesite='Strict') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + cookieval = result[0][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[1][0], 'Set-Cookie') + cookieval = result[1][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[2][0], 'Set-Cookie') + cookieval = result[2][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + def test_remember_samesite_default(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + cookieval = result[0][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[1][0], 'Set-Cookie') + cookieval = result[1][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[2][0], 'Set-Cookie') + cookieval = result[2][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + def test_remember_unicode_but_ascii_token(self): + helper = self._makeOne('secret') + request = self._makeRequest() + la = text_(b'foo', 'utf-8') + result = helper.remember(request, 'other', tokens=(la,)) + # tokens must be str type on both Python 2 and 3 + self.assertTrue("/tokens=foo/" in result[0][1]) + + def test_remember_nonascii_token(self): + helper = self._makeOne('secret') + request = self._makeRequest() + la = text_(b'La Pe\xc3\xb1a', 'utf-8') + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=(la,) + ) + + def test_remember_invalid_token_format(self): + helper = self._makeOne('secret') + request = self._makeRequest() + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=('foo bar',) + ) + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=('1bar',) + ) + + def test_forget(self): + helper = self._makeOne('secret') + request = self._makeRequest() + headers = helper.forget(request) + self.assertEqual(len(headers), 3) + name, value = headers[0] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + name, value = headers[1] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + name, value = headers[2] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + + +class DummyAuthTktModule(object): + def __init__( + self, + timestamp=0, + userid='userid', + tokens=(), + user_data='', + parse_raise=False, + hashalg="md5", + ): + self.timestamp = timestamp + self.userid = userid + self.tokens = tokens + self.user_data = user_data + self.parse_raise = parse_raise + self.hashalg = hashalg + + def parse_ticket(secret, value, remote_addr, hashalg): + self.secret = secret + self.value = value + self.remote_addr = remote_addr + if self.parse_raise: + raise self.BadTicket() + return self.timestamp, self.userid, self.tokens, self.user_data + + self.parse_ticket = parse_ticket + + class AuthTicket(object): + def __init__(self, secret, userid, remote_addr, **kw): + self.secret = secret + self.userid = userid + self.remote_addr = remote_addr + self.kw = kw + + def cookie_value(self): + result = { + 'secret': self.secret, + 'userid': self.userid, + 'remote_addr': self.remote_addr, + } + result.update(self.kw) + tokens = result.pop('tokens', None) + if tokens is not None: + tokens = '|'.join(tokens) + result['tokens'] = tokens + items = sorted(result.items()) + new_items = [] + for k, v in items: + if isinstance(v, bytes): + v = text_(v) + new_items.append((k, v)) + result = '/'.join(['%s=%s' % (k, v) for k, v in new_items]) + return result + + self.AuthTicket = AuthTicket + + class BadTicket(Exception): + pass + + +class DummyCookies(object): + def __init__(self, cookie): + self.cookie = cookie + + def get(self, name): + return self.cookie + + +class DummyRequest: + domain = 'localhost' + + def __init__(self, environ=None, session=None, registry=None, cookie=None): + self.environ = environ or {} + self.session = session or {} + self.registry = registry + self.callbacks = [] + self.cookies = DummyCookies(cookie) + + def add_response_callback(self, callback): + self.callbacks.append(callback) + + +class DummyResponse: + def __init__(self): + self.headerlist = [] -- cgit v1.2.3 From 47f8935bb5423718ec293bba4709307be7d2f51c Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Mon, 15 Apr 2019 19:04:35 -0700 Subject: Stringify identity in legacy authenticated_userid. --- tests/test_security.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index b66632baa..97aec42c6 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -357,8 +357,9 @@ class TestAuthenticatedUserId(unittest.TestCase): def test_with_security_policy(self): request = _makeRequest() - _registerSecurityPolicy(request.registry, 'yo') - self.assertEqual(request.authenticated_userid, 'yo') + # Ensure the identity is stringified. + _registerSecurityPolicy(request.registry, 123) + self.assertEqual(request.authenticated_userid, '123') def test_with_authentication_policy_no_reg_on_request(self): from pyramid.threadlocal import get_current_registry -- cgit v1.2.3 From f4c6c993ded900b4e32d1dd49207ca1e18b11336 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Mon, 15 Apr 2019 19:05:47 -0700 Subject: Revert "Migrate AuthTktCookieHelper to pyramid.security." This reverts commit 9f267dd842c5e93336f0392f2809da75a716039a. --- tests/test_authentication.py | 1000 ++++++++++++++++++++++++++++++++++++++++- tests/test_security.py | 1008 ------------------------------------------ 2 files changed, 999 insertions(+), 1009 deletions(-) (limited to 'tests') diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 89cf9866d..8671eba05 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -1,7 +1,8 @@ +from http.cookies import SimpleCookie import unittest import warnings from pyramid import testing -from pyramid.util import bytes_ +from pyramid.util import text_, bytes_ class TestCallbackAuthenticationPolicyDebugging(unittest.TestCase): @@ -663,6 +664,926 @@ class TestAuthTktAuthenticationPolicy(unittest.TestCase): verifyObject(IAuthenticationPolicy, self._makeOne(None, None)) +class TestAuthTktCookieHelper(unittest.TestCase): + def _getTargetClass(self): + from pyramid.authentication import AuthTktCookieHelper + + return AuthTktCookieHelper + + def _makeOne(self, *arg, **kw): + helper = self._getTargetClass()(*arg, **kw) + # laziness after moving auth_tkt classes and funcs into + # authentication module + auth_tkt = DummyAuthTktModule() + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket + return helper + + def _makeRequest(self, cookie=None, ipv6=False): + environ = {'wsgi.version': (1, 0)} + + if ipv6 is False: + environ['REMOTE_ADDR'] = '1.1.1.1' + else: + environ['REMOTE_ADDR'] = '::1' + environ['SERVER_NAME'] = 'localhost' + return DummyRequest(environ, cookie=cookie) + + def _cookieValue(self, cookie): + items = cookie.value.split('/') + D = {} + for item in items: + k, v = item.split('=', 1) + D[k] = v + return D + + def _parseHeaders(self, headers): + return [self._parseHeader(header) for header in headers] + + def _parseHeader(self, header): + cookie = self._parseCookie(header[1]) + return cookie + + def _parseCookie(self, cookie): + cookies = SimpleCookie() + cookies.load(cookie) + return cookies.get('auth_tkt') + + def test_init_cookie_str_reissue_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', reissue_time='invalid value' + ) + + def test_init_cookie_str_timeout_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', timeout='invalid value' + ) + + def test_init_cookie_str_max_age_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', max_age='invalid value' + ) + + def test_identify_nocookie(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_cookie_value_is_None(self): + helper = self._makeOne('secret') + request = self._makeRequest(None) + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_good_cookie_include_ip(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_include_ipv6(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest('ticket', ipv6=True) + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '::1') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_dont_include_ip(self): + helper = self._makeOne('secret', include_ip=False) + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_int_useridtype(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'userid_type:int' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 1) + self.assertEqual(result['userdata'], 'userid_type:int') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_nonuseridtype_user_data(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'bogus:int' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], '1') + self.assertEqual(result['userdata'], 'bogus:int') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_unknown_useridtype(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = 'abc' + helper.auth_tkt.user_data = 'userid_type:unknown' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'abc') + self.assertEqual(result['userdata'], 'userid_type:unknown') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_b64str_useridtype(self): + from base64 import b64encode + + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = b64encode(b'encoded').strip() + helper.auth_tkt.user_data = 'userid_type:b64str' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], b'encoded') + self.assertEqual(result['userdata'], 'userid_type:b64str') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_b64unicode_useridtype(self): + from base64 import b64encode + + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip() + helper.auth_tkt.user_data = 'userid_type:b64unicode' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8')) + self.assertEqual(result['userdata'], 'userid_type:b64unicode') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_bad_cookie(self): + helper = self._makeOne('secret', include_ip=True) + helper.auth_tkt.parse_raise = True + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_cookie_timeout(self): + helper = self._makeOne('secret', timeout=1) + self.assertEqual(helper.timeout, 1) + + def test_identify_cookie_str_timeout(self): + helper = self._makeOne('secret', timeout='1') + self.assertEqual(helper.timeout, 1) + + def test_identify_cookie_timeout_aged(self): + import time + + helper = self._makeOne('secret', timeout=10) + now = time.time() + helper.auth_tkt.timestamp = now - 1 + helper.now = now + 10 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertFalse(result) + + def test_identify_cookie_reissue(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + + def test_identify_cookie_str_reissue(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time='0') + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + + def test_identify_cookie_reissue_already_reissued_this_request(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + request._authtkt_reissued = True + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 0) + + def test_identify_cookie_reissue_notyet(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=10) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 0) + + def test_identify_cookie_reissue_revoked_by_forget(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + result = helper.forget(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 0) + + def test_identify_cookie_reissue_revoked_by_remember(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + result = helper.remember(request, 'bob') + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 0) + + def test_identify_cookie_reissue_with_tokens_default(self): + # see https://github.com/Pylons/pyramid/issues#issue/108 + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + auth_tkt = DummyAuthTktModule(tokens=['']) + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](None, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + self.assertTrue("/tokens=/" in response.headerlist[0][1]) + + def test_remember(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_nondefault_samesite(self): + helper = self._makeOne('secret', samesite='Strict') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith( + '; Domain=localhost; Path=/; SameSite=Strict' + ) + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith( + '; Domain=.localhost; Path=/; SameSite=Strict' + ) + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_None_samesite(self): + helper = self._makeOne('secret', samesite=None) + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/')) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/')) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_include_ip(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_path(self): + helper = self._makeOne( + 'secret', include_ip=True, path="/cgi-bin/app.cgi/" + ) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax') + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith( + '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' + ) + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith( + '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' + ) + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_http_only(self): + helper = self._makeOne('secret', include_ip=True, http_only=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue('; HttpOnly' in result[1][1]) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue('; HttpOnly' in result[2][1]) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_secure(self): + helper = self._makeOne('secret', include_ip=True, secure=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue('; secure' in result[0][1]) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue('; secure' in result[1][1]) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue('; secure' in result[2][1]) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_wild_domain_disabled(self): + helper = self._makeOne('secret', wild_domain=False) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 2) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + def test_remember_parent_domain(self): + helper = self._makeOne('secret', parent_domain=True) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith( + '; Domain=.example.com; Path=/; SameSite=Lax' + ) + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + def test_remember_parent_domain_supercedes_wild_domain(self): + helper = self._makeOne('secret', parent_domain=True, wild_domain=True) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + self.assertTrue( + result[0][1].endswith( + '; Domain=.example.com; Path=/; SameSite=Lax' + ) + ) + + def test_remember_explicit_domain(self): + helper = self._makeOne('secret', domain='pyramid.bazinga') + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith( + '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' + ) + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + def test_remember_domain_supercedes_parent_and_wild_domain(self): + helper = self._makeOne( + 'secret', + domain='pyramid.bazinga', + parent_domain=True, + wild_domain=True, + ) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + self.assertTrue( + result[0][1].endswith( + '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' + ) + ) + + def test_remember_binary_userid(self): + import base64 + + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, b'userid') + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual( + val['userid'], text_(base64.b64encode(b'userid').strip()) + ) + self.assertEqual(val['user_data'], 'userid_type:b64str') + + def test_remember_int_userid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 1) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual(val['userid'], '1') + self.assertEqual(val['user_data'], 'userid_type:int') + + def test_remember_unicode_userid(self): + import base64 + + helper = self._makeOne('secret') + request = self._makeRequest() + userid = text_(b'\xc2\xa9', 'utf-8') + result = helper.remember(request, userid) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual( + val['userid'], text_(base64.b64encode(userid.encode('utf-8'))) + ) + self.assertEqual(val['user_data'], 'userid_type:b64unicode') + + def test_remember_insane_userid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + userid = object() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always', RuntimeWarning) + result = helper.remember(request, userid) + self.assertTrue(str(w[-1].message).startswith('userid is of type')) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + value = values[0] + self.assertTrue('userid' in value.value) + + def test_remember_max_age(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid', max_age=500) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + + self.assertEqual(values[0]['max-age'], '500') + self.assertTrue(values[0]['expires']) + + def test_remember_str_max_age(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid', max_age='500') + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + + self.assertEqual(values[0]['max-age'], '500') + self.assertTrue(values[0]['expires']) + + def test_remember_str_max_age_invalid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + self.assertRaises( + ValueError, + helper.remember, + request, + 'userid', + max_age='invalid value', + ) + + def test_remember_tokens(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'other', tokens=('foo', 'bar')) + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[0][1]) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[1][1]) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[2][1]) + + def test_remember_samesite_nondefault(self): + helper = self._makeOne('secret', samesite='Strict') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + cookieval = result[0][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[1][0], 'Set-Cookie') + cookieval = result[1][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[2][0], 'Set-Cookie') + cookieval = result[2][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + def test_remember_samesite_default(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + cookieval = result[0][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[1][0], 'Set-Cookie') + cookieval = result[1][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[2][0], 'Set-Cookie') + cookieval = result[2][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + def test_remember_unicode_but_ascii_token(self): + helper = self._makeOne('secret') + request = self._makeRequest() + la = text_(b'foo', 'utf-8') + result = helper.remember(request, 'other', tokens=(la,)) + # tokens must be str type on both Python 2 and 3 + self.assertTrue("/tokens=foo/" in result[0][1]) + + def test_remember_nonascii_token(self): + helper = self._makeOne('secret') + request = self._makeRequest() + la = text_(b'La Pe\xc3\xb1a', 'utf-8') + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=(la,) + ) + + def test_remember_invalid_token_format(self): + helper = self._makeOne('secret') + request = self._makeRequest() + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=('foo bar',) + ) + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=('1bar',) + ) + + def test_forget(self): + helper = self._makeOne('secret') + request = self._makeRequest() + headers = helper.forget(request) + self.assertEqual(len(headers), 3) + name, value = headers[0] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + name, value = headers[1] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + name, value = headers[2] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + + +class TestAuthTicket(unittest.TestCase): + def _makeOne(self, *arg, **kw): + from pyramid.authentication import AuthTicket + + return AuthTicket(*arg, **kw) + + def test_ctor_with_tokens(self): + ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b')) + self.assertEqual(ticket.tokens, 'a,b') + + def test_ctor_with_time(self): + ticket = self._makeOne('secret', 'userid', 'ip', time='time') + self.assertEqual(ticket.time, 'time') + + def test_digest(self): + ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10) + result = ticket.digest() + self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') + + def test_digest_sha512(self): + ticket = self._makeOne( + 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512' + ) + result = ticket.digest() + self.assertEqual( + result, + '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49' + '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9' + 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278', + ) + + def test_cookie_value(self): + ticket = self._makeOne( + 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b') + ) + result = ticket.cookie_value() + self.assertEqual( + result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' + ) + + def test_ipv4(self): + ticket = self._makeOne( + 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256' + ) + result = ticket.cookie_value() + self.assertEqual( + result, + 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b' + '798400ecdade8d76c530000000auserid!', + ) + + def test_ipv6(self): + ticket = self._makeOne( + 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256' + ) + result = ticket.cookie_value() + self.assertEqual( + result, + 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8' + '5becf8760cd7a2fa4910000000auserid!', + ) + + +class TestBadTicket(unittest.TestCase): + def _makeOne(self, msg, expected=None): + from pyramid.authentication import BadTicket + + return BadTicket(msg, expected) + + def test_it(self): + exc = self._makeOne('msg', expected=True) + self.assertEqual(exc.expected, True) + self.assertTrue(isinstance(exc, Exception)) + + +class Test_parse_ticket(unittest.TestCase): + def _callFUT(self, secret, ticket, ip, hashalg='md5'): + from pyramid.authentication import parse_ticket + + return parse_ticket(secret, ticket, ip, hashalg) + + def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'): + from pyramid.authentication import BadTicket + + self.assertRaises( + BadTicket, self._callFUT, secret, ticket, ip, hashalg + ) + + def test_bad_timestamp(self): + ticket = 'x' * 64 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_bad_userid_or_data(self): + ticket = 'x' * 32 + '11111111' + 'x' * 10 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_digest_sig_incorrect(self): + ticket = 'x' * 32 + '11111111' + 'a!b!c' + self._assertRaisesBadTicket('secret', ticket, '0.0.0.0') + + def test_correct_with_user_data(self): + ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!') + result = self._callFUT('secret', ticket, '0.0.0.0') + self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) + + def test_correct_with_user_data_sha512(self): + ticket = text_( + '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1' + '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767' + 'ba8a26d02aaeae56599a0000000auserid!a,b!' + ) + result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512') + self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) + + def test_ipv4(self): + ticket = text_( + 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd' + 'ade8d76c530000000auserid!' + ) + result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256') + self.assertEqual(result, (10, 'userid', [''], '')) + + def test_ipv6(self): + ticket = text_( + 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760' + 'cd7a2fa4910000000auserid!' + ) + result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256') + self.assertEqual(result, (10, 'userid', [''], '')) + + class TestSessionAuthenticationPolicy(unittest.TestCase): def _getTargetClass(self): from pyramid.authentication import SessionAuthenticationPolicy @@ -990,6 +1911,14 @@ class DummyContext: pass +class DummyCookies(object): + def __init__(self, cookie): + self.cookie = cookie + + def get(self, name): + return self.cookie + + class DummyRequest: domain = 'localhost' @@ -998,6 +1927,10 @@ class DummyRequest: self.session = session or {} self.registry = registry self.callbacks = [] + self.cookies = DummyCookies(cookie) + + def add_response_callback(self, callback): + self.callbacks.append(callback) class DummyWhoPlugin: @@ -1021,3 +1954,68 @@ class DummyCookieHelper: def forget(self, *arg): return [] + + +class DummyAuthTktModule(object): + def __init__( + self, + timestamp=0, + userid='userid', + tokens=(), + user_data='', + parse_raise=False, + hashalg="md5", + ): + self.timestamp = timestamp + self.userid = userid + self.tokens = tokens + self.user_data = user_data + self.parse_raise = parse_raise + self.hashalg = hashalg + + def parse_ticket(secret, value, remote_addr, hashalg): + self.secret = secret + self.value = value + self.remote_addr = remote_addr + if self.parse_raise: + raise self.BadTicket() + return self.timestamp, self.userid, self.tokens, self.user_data + + self.parse_ticket = parse_ticket + + class AuthTicket(object): + def __init__(self, secret, userid, remote_addr, **kw): + self.secret = secret + self.userid = userid + self.remote_addr = remote_addr + self.kw = kw + + def cookie_value(self): + result = { + 'secret': self.secret, + 'userid': self.userid, + 'remote_addr': self.remote_addr, + } + result.update(self.kw) + tokens = result.pop('tokens', None) + if tokens is not None: + tokens = '|'.join(tokens) + result['tokens'] = tokens + items = sorted(result.items()) + new_items = [] + for k, v in items: + if isinstance(v, bytes): + v = text_(v) + new_items.append((k, v)) + result = '/'.join(['%s=%s' % (k, v) for k, v in new_items]) + return result + + self.AuthTicket = AuthTicket + + class BadTicket(Exception): + pass + + +class DummyResponse: + def __init__(self): + self.headerlist = [] diff --git a/tests/test_security.py b/tests/test_security.py index 97aec42c6..f14159156 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -1,9 +1,6 @@ import unittest -import warnings -from http.cookies import SimpleCookie from pyramid import testing -from pyramid.util import text_ class TestAllPermissionsList(unittest.TestCase): @@ -940,1008 +937,3 @@ class TestSessionAuthenticationHelper(unittest.TestCase): result = helper.forget(request) self.assertEqual(request.session.get('userid'), None) self.assertEqual(result, []) - - -class TestAuthTicket(unittest.TestCase): - def _makeOne(self, *arg, **kw): - from pyramid.security import AuthTicket - - return AuthTicket(*arg, **kw) - - def test_ctor_with_tokens(self): - ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b')) - self.assertEqual(ticket.tokens, 'a,b') - - def test_ctor_with_time(self): - ticket = self._makeOne('secret', 'userid', 'ip', time='time') - self.assertEqual(ticket.time, 'time') - - def test_digest(self): - ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10) - result = ticket.digest() - self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') - - def test_digest_sha512(self): - ticket = self._makeOne( - 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512' - ) - result = ticket.digest() - self.assertEqual( - result, - '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49' - '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9' - 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278', - ) - - def test_cookie_value(self): - ticket = self._makeOne( - 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b') - ) - result = ticket.cookie_value() - self.assertEqual( - result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' - ) - - def test_ipv4(self): - ticket = self._makeOne( - 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256' - ) - result = ticket.cookie_value() - self.assertEqual( - result, - 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b' - '798400ecdade8d76c530000000auserid!', - ) - - def test_ipv6(self): - ticket = self._makeOne( - 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256' - ) - result = ticket.cookie_value() - self.assertEqual( - result, - 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8' - '5becf8760cd7a2fa4910000000auserid!', - ) - - -class TestBadTicket(unittest.TestCase): - def _makeOne(self, msg, expected=None): - from pyramid.security import BadTicket - - return BadTicket(msg, expected) - - def test_it(self): - exc = self._makeOne('msg', expected=True) - self.assertEqual(exc.expected, True) - self.assertTrue(isinstance(exc, Exception)) - - -class Test_parse_ticket(unittest.TestCase): - def _callFUT(self, secret, ticket, ip, hashalg='md5'): - from pyramid.security import parse_ticket - - return parse_ticket(secret, ticket, ip, hashalg) - - def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'): - from pyramid.security import BadTicket - - self.assertRaises( - BadTicket, self._callFUT, secret, ticket, ip, hashalg - ) - - def test_bad_timestamp(self): - ticket = 'x' * 64 - self._assertRaisesBadTicket('secret', ticket, 'ip') - - def test_bad_userid_or_data(self): - ticket = 'x' * 32 + '11111111' + 'x' * 10 - self._assertRaisesBadTicket('secret', ticket, 'ip') - - def test_digest_sig_incorrect(self): - ticket = 'x' * 32 + '11111111' + 'a!b!c' - self._assertRaisesBadTicket('secret', ticket, '0.0.0.0') - - def test_correct_with_user_data(self): - ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!') - result = self._callFUT('secret', ticket, '0.0.0.0') - self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) - - def test_correct_with_user_data_sha512(self): - ticket = text_( - '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1' - '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767' - 'ba8a26d02aaeae56599a0000000auserid!a,b!' - ) - result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512') - self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) - - def test_ipv4(self): - ticket = text_( - 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd' - 'ade8d76c530000000auserid!' - ) - result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256') - self.assertEqual(result, (10, 'userid', [''], '')) - - def test_ipv6(self): - ticket = text_( - 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760' - 'cd7a2fa4910000000auserid!' - ) - result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256') - self.assertEqual(result, (10, 'userid', [''], '')) - - -class TestAuthTktCookieHelper(unittest.TestCase): - def _getTargetClass(self): - from pyramid.security import AuthTktCookieHelper - - return AuthTktCookieHelper - - def _makeOne(self, *arg, **kw): - helper = self._getTargetClass()(*arg, **kw) - auth_tkt = DummyAuthTktModule() - helper.auth_tkt = auth_tkt - helper.AuthTicket = auth_tkt.AuthTicket - helper.parse_ticket = auth_tkt.parse_ticket - helper.BadTicket = auth_tkt.BadTicket - return helper - - def _makeRequest(self, cookie=None, ipv6=False): - environ = {'wsgi.version': (1, 0)} - - if ipv6 is False: - environ['REMOTE_ADDR'] = '1.1.1.1' - else: - environ['REMOTE_ADDR'] = '::1' - environ['SERVER_NAME'] = 'localhost' - return DummyRequest(environ, cookie=cookie) - - def _cookieValue(self, cookie): - items = cookie.value.split('/') - D = {} - for item in items: - k, v = item.split('=', 1) - D[k] = v - return D - - def _parseHeaders(self, headers): - return [self._parseHeader(header) for header in headers] - - def _parseHeader(self, header): - cookie = self._parseCookie(header[1]) - return cookie - - def _parseCookie(self, cookie): - cookies = SimpleCookie() - cookies.load(cookie) - return cookies.get('auth_tkt') - - def test_init_cookie_str_reissue_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', reissue_time='invalid value' - ) - - def test_init_cookie_str_timeout_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', timeout='invalid value' - ) - - def test_init_cookie_str_max_age_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', max_age='invalid value' - ) - - def test_identify_nocookie(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_cookie_value_is_None(self): - helper = self._makeOne('secret') - request = self._makeRequest(None) - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_good_cookie_include_ip(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_include_ipv6(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest('ticket', ipv6=True) - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '::1') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_dont_include_ip(self): - helper = self._makeOne('secret', include_ip=False) - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_int_useridtype(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = '1' - helper.auth_tkt.user_data = 'userid_type:int' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 1) - self.assertEqual(result['userdata'], 'userid_type:int') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_nonuseridtype_user_data(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = '1' - helper.auth_tkt.user_data = 'bogus:int' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], '1') - self.assertEqual(result['userdata'], 'bogus:int') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_unknown_useridtype(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = 'abc' - helper.auth_tkt.user_data = 'userid_type:unknown' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'abc') - self.assertEqual(result['userdata'], 'userid_type:unknown') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_b64str_useridtype(self): - from base64 import b64encode - - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = b64encode(b'encoded').strip() - helper.auth_tkt.user_data = 'userid_type:b64str' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], b'encoded') - self.assertEqual(result['userdata'], 'userid_type:b64str') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_b64unicode_useridtype(self): - from base64 import b64encode - - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip() - helper.auth_tkt.user_data = 'userid_type:b64unicode' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8')) - self.assertEqual(result['userdata'], 'userid_type:b64unicode') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_bad_cookie(self): - helper = self._makeOne('secret', include_ip=True) - helper.auth_tkt.parse_raise = True - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_cookie_timeout(self): - helper = self._makeOne('secret', timeout=1) - self.assertEqual(helper.timeout, 1) - - def test_identify_cookie_str_timeout(self): - helper = self._makeOne('secret', timeout='1') - self.assertEqual(helper.timeout, 1) - - def test_identify_cookie_timeout_aged(self): - import time - - helper = self._makeOne('secret', timeout=10) - now = time.time() - helper.auth_tkt.timestamp = now - 1 - helper.now = now + 10 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertFalse(result) - - def test_identify_cookie_reissue(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - - def test_identify_cookie_str_reissue(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time='0') - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - - def test_identify_cookie_reissue_already_reissued_this_request(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - request._authtkt_reissued = True - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 0) - - def test_identify_cookie_reissue_notyet(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=10) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 0) - - def test_identify_cookie_reissue_revoked_by_forget(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - result = helper.forget(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 0) - - def test_identify_cookie_reissue_revoked_by_remember(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - result = helper.remember(request, 'bob') - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 0) - - def test_identify_cookie_reissue_with_tokens_default(self): - # see https://github.com/Pylons/pyramid/issues#issue/108 - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - auth_tkt = DummyAuthTktModule(tokens=['']) - helper.auth_tkt = auth_tkt - helper.AuthTicket = auth_tkt.AuthTicket - helper.parse_ticket = auth_tkt.parse_ticket - helper.BadTicket = auth_tkt.BadTicket - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](None, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - self.assertTrue("/tokens=/" in response.headerlist[0][1]) - - def test_remember(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_nondefault_samesite(self): - helper = self._makeOne('secret', samesite='Strict') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith( - '; Domain=localhost; Path=/; SameSite=Strict' - ) - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith( - '; Domain=.localhost; Path=/; SameSite=Strict' - ) - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_None_samesite(self): - helper = self._makeOne('secret', samesite=None) - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/')) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/')) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_include_ip(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_path(self): - helper = self._makeOne( - 'secret', include_ip=True, path="/cgi-bin/app.cgi/" - ) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax') - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith( - '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' - ) - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith( - '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' - ) - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_http_only(self): - helper = self._makeOne('secret', include_ip=True, http_only=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue('; HttpOnly' in result[1][1]) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue('; HttpOnly' in result[2][1]) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_secure(self): - helper = self._makeOne('secret', include_ip=True, secure=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue('; secure' in result[0][1]) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue('; secure' in result[1][1]) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue('; secure' in result[2][1]) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_wild_domain_disabled(self): - helper = self._makeOne('secret', wild_domain=False) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 2) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - def test_remember_parent_domain(self): - helper = self._makeOne('secret', parent_domain=True) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith( - '; Domain=.example.com; Path=/; SameSite=Lax' - ) - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - def test_remember_parent_domain_supercedes_wild_domain(self): - helper = self._makeOne('secret', parent_domain=True, wild_domain=True) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - self.assertTrue( - result[0][1].endswith( - '; Domain=.example.com; Path=/; SameSite=Lax' - ) - ) - - def test_remember_explicit_domain(self): - helper = self._makeOne('secret', domain='pyramid.bazinga') - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith( - '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' - ) - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - def test_remember_domain_supercedes_parent_and_wild_domain(self): - helper = self._makeOne( - 'secret', - domain='pyramid.bazinga', - parent_domain=True, - wild_domain=True, - ) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - self.assertTrue( - result[0][1].endswith( - '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' - ) - ) - - def test_remember_binary_userid(self): - import base64 - - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, b'userid') - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual( - val['userid'], text_(base64.b64encode(b'userid').strip()) - ) - self.assertEqual(val['user_data'], 'userid_type:b64str') - - def test_remember_int_userid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 1) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual(val['userid'], '1') - self.assertEqual(val['user_data'], 'userid_type:int') - - def test_remember_unicode_userid(self): - import base64 - - helper = self._makeOne('secret') - request = self._makeRequest() - userid = text_(b'\xc2\xa9', 'utf-8') - result = helper.remember(request, userid) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual( - val['userid'], text_(base64.b64encode(userid.encode('utf-8'))) - ) - self.assertEqual(val['user_data'], 'userid_type:b64unicode') - - def test_remember_insane_userid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - userid = object() - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always', RuntimeWarning) - result = helper.remember(request, userid) - self.assertTrue(str(w[-1].message).startswith('userid is of type')) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - value = values[0] - self.assertTrue('userid' in value.value) - - def test_remember_max_age(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid', max_age=500) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - - self.assertEqual(values[0]['max-age'], '500') - self.assertTrue(values[0]['expires']) - - def test_remember_str_max_age(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid', max_age='500') - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - - self.assertEqual(values[0]['max-age'], '500') - self.assertTrue(values[0]['expires']) - - def test_remember_str_max_age_invalid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - self.assertRaises( - ValueError, - helper.remember, - request, - 'userid', - max_age='invalid value', - ) - - def test_remember_tokens(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'other', tokens=('foo', 'bar')) - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[0][1]) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[1][1]) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[2][1]) - - def test_remember_samesite_nondefault(self): - helper = self._makeOne('secret', samesite='Strict') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - cookieval = result[0][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[1][0], 'Set-Cookie') - cookieval = result[1][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[2][0], 'Set-Cookie') - cookieval = result[2][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - def test_remember_samesite_default(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - cookieval = result[0][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[1][0], 'Set-Cookie') - cookieval = result[1][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[2][0], 'Set-Cookie') - cookieval = result[2][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - def test_remember_unicode_but_ascii_token(self): - helper = self._makeOne('secret') - request = self._makeRequest() - la = text_(b'foo', 'utf-8') - result = helper.remember(request, 'other', tokens=(la,)) - # tokens must be str type on both Python 2 and 3 - self.assertTrue("/tokens=foo/" in result[0][1]) - - def test_remember_nonascii_token(self): - helper = self._makeOne('secret') - request = self._makeRequest() - la = text_(b'La Pe\xc3\xb1a', 'utf-8') - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=(la,) - ) - - def test_remember_invalid_token_format(self): - helper = self._makeOne('secret') - request = self._makeRequest() - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=('foo bar',) - ) - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=('1bar',) - ) - - def test_forget(self): - helper = self._makeOne('secret') - request = self._makeRequest() - headers = helper.forget(request) - self.assertEqual(len(headers), 3) - name, value = headers[0] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - name, value = headers[1] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - name, value = headers[2] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - - -class DummyAuthTktModule(object): - def __init__( - self, - timestamp=0, - userid='userid', - tokens=(), - user_data='', - parse_raise=False, - hashalg="md5", - ): - self.timestamp = timestamp - self.userid = userid - self.tokens = tokens - self.user_data = user_data - self.parse_raise = parse_raise - self.hashalg = hashalg - - def parse_ticket(secret, value, remote_addr, hashalg): - self.secret = secret - self.value = value - self.remote_addr = remote_addr - if self.parse_raise: - raise self.BadTicket() - return self.timestamp, self.userid, self.tokens, self.user_data - - self.parse_ticket = parse_ticket - - class AuthTicket(object): - def __init__(self, secret, userid, remote_addr, **kw): - self.secret = secret - self.userid = userid - self.remote_addr = remote_addr - self.kw = kw - - def cookie_value(self): - result = { - 'secret': self.secret, - 'userid': self.userid, - 'remote_addr': self.remote_addr, - } - result.update(self.kw) - tokens = result.pop('tokens', None) - if tokens is not None: - tokens = '|'.join(tokens) - result['tokens'] = tokens - items = sorted(result.items()) - new_items = [] - for k, v in items: - if isinstance(v, bytes): - v = text_(v) - new_items.append((k, v)) - result = '/'.join(['%s=%s' % (k, v) for k, v in new_items]) - return result - - self.AuthTicket = AuthTicket - - class BadTicket(Exception): - pass - - -class DummyCookies(object): - def __init__(self, cookie): - self.cookie = cookie - - def get(self, name): - return self.cookie - - -class DummyRequest: - domain = 'localhost' - - def __init__(self, environ=None, session=None, registry=None, cookie=None): - self.environ = environ or {} - self.session = session or {} - self.registry = registry - self.callbacks = [] - self.cookies = DummyCookies(cookie) - - def add_response_callback(self, callback): - self.callbacks.append(callback) - - -class DummyResponse: - def __init__(self): - self.headerlist = [] -- cgit v1.2.3 From d6e543bc01d2f1aa3bb29f005171911f6f09da02 Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Mon, 15 Apr 2019 19:14:42 -0700 Subject: Move SessionAuthenticationHelper to pyramid.authentication. --- tests/test_authentication.py | 50 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_security.py | 50 -------------------------------------------- 2 files changed, 50 insertions(+), 50 deletions(-) (limited to 'tests') diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 8671eba05..710e87423 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -1693,6 +1693,56 @@ class TestSessionAuthenticationPolicy(unittest.TestCase): self.assertEqual(result, []) +class TestSessionAuthenticationHelper(unittest.TestCase): + def _makeRequest(self, session=None): + from types import SimpleNamespace + + if session is None: + session = dict() + return SimpleNamespace(session=session) + + def _makeOne(self, prefix=''): + from pyramid.authentication import SessionAuthenticationHelper + + return SessionAuthenticationHelper(prefix=prefix) + + def test_identify(self): + request = self._makeRequest({'userid': 'fred'}) + helper = self._makeOne() + self.assertEqual(helper.identify(request), 'fred') + + def test_identify_with_prefix(self): + request = self._makeRequest({'foo.userid': 'fred'}) + helper = self._makeOne(prefix='foo.') + self.assertEqual(helper.identify(request), 'fred') + + def test_identify_none(self): + request = self._makeRequest() + helper = self._makeOne() + self.assertEqual(helper.identify(request), None) + + def test_remember(self): + request = self._makeRequest() + helper = self._makeOne() + result = helper.remember(request, 'fred') + self.assertEqual(request.session.get('userid'), 'fred') + self.assertEqual(result, []) + + def test_forget(self): + request = self._makeRequest({'userid': 'fred'}) + helper = self._makeOne() + result = helper.forget(request) + self.assertEqual(request.session.get('userid'), None) + self.assertEqual(result, []) + + def test_forget_no_identity(self): + request = self._makeRequest() + helper = self._makeOne() + result = helper.forget(request) + self.assertEqual(request.session.get('userid'), None) + self.assertEqual(result, []) + + class TestBasicAuthAuthenticationPolicy(unittest.TestCase): def _getTargetClass(self): from pyramid.authentication import BasicAuthAuthenticationPolicy as cls diff --git a/tests/test_security.py b/tests/test_security.py index f14159156..ecd6a088b 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -887,53 +887,3 @@ GUEST_PERMS = (VIEW, COMMENT) MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) - - -class TestSessionAuthenticationHelper(unittest.TestCase): - def _makeRequest(self, session=None): - from types import SimpleNamespace - - if session is None: - session = dict() - return SimpleNamespace(session=session) - - def _makeOne(self, prefix=''): - from pyramid.security import SessionAuthenticationHelper - - return SessionAuthenticationHelper(prefix=prefix) - - def test_identify(self): - request = self._makeRequest({'userid': 'fred'}) - helper = self._makeOne() - self.assertEqual(helper.identify(request), 'fred') - - def test_identify_with_prefix(self): - request = self._makeRequest({'foo.userid': 'fred'}) - helper = self._makeOne(prefix='foo.') - self.assertEqual(helper.identify(request), 'fred') - - def test_identify_none(self): - request = self._makeRequest() - helper = self._makeOne() - self.assertEqual(helper.identify(request), None) - - def test_remember(self): - request = self._makeRequest() - helper = self._makeOne() - result = helper.remember(request, 'fred') - self.assertEqual(request.session.get('userid'), 'fred') - self.assertEqual(result, []) - - def test_forget(self): - request = self._makeRequest({'userid': 'fred'}) - helper = self._makeOne() - result = helper.forget(request) - self.assertEqual(request.session.get('userid'), None) - self.assertEqual(result, []) - - def test_forget_no_identity(self): - request = self._makeRequest() - helper = self._makeOne() - result = helper.forget(request) - self.assertEqual(request.session.get('userid'), None) - self.assertEqual(result, []) -- cgit v1.2.3 From 5497c0f7166308031b3cc3ce2510d22eb214b2ef Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Mon, 15 Apr 2019 19:32:11 -0700 Subject: Move ACLHelper to pyramid.authorizations. --- tests/test_authorization.py | 261 +++++++++++++++++++++++++++++++++++++++++ tests/test_security.py | 275 -------------------------------------------- 2 files changed, 261 insertions(+), 275 deletions(-) (limited to 'tests') diff --git a/tests/test_authorization.py b/tests/test_authorization.py index efb84b203..399b3da60 100644 --- a/tests/test_authorization.py +++ b/tests/test_authorization.py @@ -272,6 +272,267 @@ class TestACLAuthorizationPolicy(unittest.TestCase): self.assertTrue(result) +class TestACLHelper(unittest.TestCase): + def test_no_acl(self): + from pyramid.authorization import ACLHelper + + context = DummyContext() + helper = ACLHelper() + result = helper.permits(context, ['foo'], 'permission') + self.assertEqual(result, False) + self.assertEqual(result.ace, '') + self.assertEqual( + result.acl, '' + ) + self.assertEqual(result.permission, 'permission') + self.assertEqual(result.principals, ['foo']) + self.assertEqual(result.context, context) + + def test_acl(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Deny + from pyramid.security import Allow + from pyramid.security import Everyone + from pyramid.security import Authenticated + from pyramid.security import ALL_PERMISSIONS + from pyramid.security import DENY_ALL + + helper = ACLHelper() + root = DummyContext() + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [(Allow, Authenticated, VIEW)] + community.__acl__ = [ + (Allow, 'fred', ALL_PERMISSIONS), + (Allow, 'wilma', VIEW), + DENY_ALL, + ] + blog.__acl__ = [ + (Allow, 'barney', MEMBER_PERMS), + (Allow, 'wilma', VIEW), + ] + + result = helper.permits( + blog, [Everyone, Authenticated, 'wilma'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'wilma', VIEW)) + self.assertEqual(result.acl, blog.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'wilma'], 'delete' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'fred'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + result = helper.permits( + blog, [Everyone, Authenticated, 'fred'], 'doesntevenexistyet' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'barney'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS)) + result = helper.permits( + blog, [Everyone, Authenticated, 'barney'], 'administer' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + root, [Everyone, Authenticated, 'someguy'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, (Allow, Authenticated, VIEW)) + result = helper.permits( + blog, [Everyone, Authenticated, 'someguy'], 'view' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits(root, [Everyone], 'view') + self.assertEqual(result, False) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, '') + self.assertEqual(result.acl, root.__acl__) + + context = DummyContext() + result = helper.permits(context, [Everyone], 'view') + self.assertEqual(result, False) + self.assertEqual(result.ace, '') + self.assertEqual( + result.acl, '' + ) + + def test_string_permissions_in_acl(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + root = DummyContext() + root.__acl__ = [(Allow, 'wilma', 'view_stuff')] + + result = helper.permits(root, ['wilma'], 'view') + # would be True if matching against 'view_stuff' instead of against + # ['view_stuff'] + self.assertEqual(result, False) + + def test_callable_acl(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + context = DummyContext() + fn = lambda self: [(Allow, 'bob', 'read')] + context.__acl__ = fn.__get__(context, context.__class__) + result = helper.permits(context, ['bob'], 'read') + self.assertTrue(result) + + def test_principals_allowed_by_permission_direct(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Allow + from pyramid.security import DENY_ALL + + helper = ACLHelper() + context = DummyContext() + acl = [ + (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read'), + ] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission_callable_acl(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Allow + from pyramid.security import DENY_ALL + + helper = ACLHelper() + context = DummyContext() + acl = lambda: [ + (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read'), + ] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission_string_permission(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + context = DummyContext() + acl = [(Allow, 'chrism', 'read_it')] + context.__acl__ = acl + result = helper.principals_allowed_by_permission(context, 'read') + # would be ['chrism'] if 'read' were compared against 'read_it' instead + # of against ['read_it'] + self.assertEqual(list(result), []) + + def test_principals_allowed_by_permission(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Allow + from pyramid.security import Deny + from pyramid.security import DENY_ALL + from pyramid.security import ALL_PERMISSIONS + + helper = ACLHelper() + root = DummyContext(__name__='', __parent__=None) + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [ + (Allow, 'chrism', ('read', 'write')), + (Allow, 'other', ('read',)), + (Allow, 'jim', ALL_PERMISSIONS), + ] + community.__acl__ = [ + (Deny, 'flooz', 'read'), + (Allow, 'flooz', 'read'), + (Allow, 'mork', 'read'), + (Deny, 'jim', 'read'), + (Allow, 'someguy', 'manage'), + ] + blog.__acl__ = [(Allow, 'fred', 'read'), DENY_ALL] + + result = sorted(helper.principals_allowed_by_permission(blog, 'read')) + self.assertEqual(result, ['fred']) + result = sorted( + helper.principals_allowed_by_permission(community, 'read') + ) + self.assertEqual(result, ['chrism', 'mork', 'other']) + result = sorted( + helper.principals_allowed_by_permission(community, 'read') + ) + result = sorted(helper.principals_allowed_by_permission(root, 'read')) + self.assertEqual(result, ['chrism', 'jim', 'other']) + + def test_principals_allowed_by_permission_no_acls(self): + from pyramid.authorization import ACLHelper + + helper = ACLHelper() + context = DummyContext() + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + def test_principals_allowed_by_permission_deny_not_permission_in_acl(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Deny + from pyramid.security import Everyone + + helper = ACLHelper() + context = DummyContext() + acl = [(Deny, Everyone, 'write')] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + def test_principals_allowed_by_permission_deny_permission_in_acl(self): + from pyramid.authorization import ACLHelper + from pyramid.security import Deny + from pyramid.security import Everyone + + helper = ACLHelper() + context = DummyContext() + acl = [(Deny, Everyone, 'read')] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + class DummyContext: def __init__(self, *arg, **kw): self.__dict__.update(kw) diff --git a/tests/test_security.py b/tests/test_security.py index ecd6a088b..5a0307c66 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -612,278 +612,3 @@ def _makeRequest(): request.registry = Registry() request.context = object() return request - - -class TestACLHelper(unittest.TestCase): - def test_no_acl(self): - from pyramid.security import ACLHelper - - context = DummyContext() - helper = ACLHelper() - result = helper.permits(context, ['foo'], 'permission') - self.assertEqual(result, False) - self.assertEqual(result.ace, '') - self.assertEqual( - result.acl, '' - ) - self.assertEqual(result.permission, 'permission') - self.assertEqual(result.principals, ['foo']) - self.assertEqual(result.context, context) - - def test_acl(self): - from pyramid.security import ACLHelper - from pyramid.security import Deny - from pyramid.security import Allow - from pyramid.security import Everyone - from pyramid.security import Authenticated - from pyramid.security import ALL_PERMISSIONS - from pyramid.security import DENY_ALL - - helper = ACLHelper() - root = DummyContext() - community = DummyContext(__name__='community', __parent__=root) - blog = DummyContext(__name__='blog', __parent__=community) - root.__acl__ = [(Allow, Authenticated, VIEW)] - community.__acl__ = [ - (Allow, 'fred', ALL_PERMISSIONS), - (Allow, 'wilma', VIEW), - DENY_ALL, - ] - blog.__acl__ = [ - (Allow, 'barney', MEMBER_PERMS), - (Allow, 'wilma', VIEW), - ] - - result = helper.permits( - blog, [Everyone, Authenticated, 'wilma'], 'view' - ) - self.assertEqual(result, True) - self.assertEqual(result.context, blog) - self.assertEqual(result.ace, (Allow, 'wilma', VIEW)) - self.assertEqual(result.acl, blog.__acl__) - - result = helper.permits( - blog, [Everyone, Authenticated, 'wilma'], 'delete' - ) - self.assertEqual(result, False) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) - self.assertEqual(result.acl, community.__acl__) - - result = helper.permits( - blog, [Everyone, Authenticated, 'fred'], 'view' - ) - self.assertEqual(result, True) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) - result = helper.permits( - blog, [Everyone, Authenticated, 'fred'], 'doesntevenexistyet' - ) - self.assertEqual(result, True) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) - self.assertEqual(result.acl, community.__acl__) - - result = helper.permits( - blog, [Everyone, Authenticated, 'barney'], 'view' - ) - self.assertEqual(result, True) - self.assertEqual(result.context, blog) - self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS)) - result = helper.permits( - blog, [Everyone, Authenticated, 'barney'], 'administer' - ) - self.assertEqual(result, False) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) - self.assertEqual(result.acl, community.__acl__) - - result = helper.permits( - root, [Everyone, Authenticated, 'someguy'], 'view' - ) - self.assertEqual(result, True) - self.assertEqual(result.context, root) - self.assertEqual(result.ace, (Allow, Authenticated, VIEW)) - result = helper.permits( - blog, [Everyone, Authenticated, 'someguy'], 'view' - ) - self.assertEqual(result, False) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) - self.assertEqual(result.acl, community.__acl__) - - result = helper.permits(root, [Everyone], 'view') - self.assertEqual(result, False) - self.assertEqual(result.context, root) - self.assertEqual(result.ace, '') - self.assertEqual(result.acl, root.__acl__) - - context = DummyContext() - result = helper.permits(context, [Everyone], 'view') - self.assertEqual(result, False) - self.assertEqual(result.ace, '') - self.assertEqual( - result.acl, '' - ) - - def test_string_permissions_in_acl(self): - from pyramid.security import ACLHelper - from pyramid.security import Allow - - helper = ACLHelper() - root = DummyContext() - root.__acl__ = [(Allow, 'wilma', 'view_stuff')] - - result = helper.permits(root, ['wilma'], 'view') - # would be True if matching against 'view_stuff' instead of against - # ['view_stuff'] - self.assertEqual(result, False) - - def test_callable_acl(self): - from pyramid.security import ACLHelper - from pyramid.security import Allow - - helper = ACLHelper() - context = DummyContext() - fn = lambda self: [(Allow, 'bob', 'read')] - context.__acl__ = fn.__get__(context, context.__class__) - result = helper.permits(context, ['bob'], 'read') - self.assertTrue(result) - - def test_principals_allowed_by_permission_direct(self): - from pyramid.security import ACLHelper - from pyramid.security import Allow - from pyramid.security import DENY_ALL - - helper = ACLHelper() - context = DummyContext() - acl = [ - (Allow, 'chrism', ('read', 'write')), - DENY_ALL, - (Allow, 'other', 'read'), - ] - context.__acl__ = acl - result = sorted( - helper.principals_allowed_by_permission(context, 'read') - ) - self.assertEqual(result, ['chrism']) - - def test_principals_allowed_by_permission_callable_acl(self): - from pyramid.security import ACLHelper - from pyramid.security import Allow - from pyramid.security import DENY_ALL - - helper = ACLHelper() - context = DummyContext() - acl = lambda: [ - (Allow, 'chrism', ('read', 'write')), - DENY_ALL, - (Allow, 'other', 'read'), - ] - context.__acl__ = acl - result = sorted( - helper.principals_allowed_by_permission(context, 'read') - ) - self.assertEqual(result, ['chrism']) - - def test_principals_allowed_by_permission_string_permission(self): - from pyramid.security import ACLHelper - from pyramid.security import Allow - - helper = ACLHelper() - context = DummyContext() - acl = [(Allow, 'chrism', 'read_it')] - context.__acl__ = acl - result = helper.principals_allowed_by_permission(context, 'read') - # would be ['chrism'] if 'read' were compared against 'read_it' instead - # of against ['read_it'] - self.assertEqual(list(result), []) - - def test_principals_allowed_by_permission(self): - from pyramid.security import ACLHelper - from pyramid.security import Allow - from pyramid.security import Deny - from pyramid.security import DENY_ALL - from pyramid.security import ALL_PERMISSIONS - - helper = ACLHelper() - root = DummyContext(__name__='', __parent__=None) - community = DummyContext(__name__='community', __parent__=root) - blog = DummyContext(__name__='blog', __parent__=community) - root.__acl__ = [ - (Allow, 'chrism', ('read', 'write')), - (Allow, 'other', ('read',)), - (Allow, 'jim', ALL_PERMISSIONS), - ] - community.__acl__ = [ - (Deny, 'flooz', 'read'), - (Allow, 'flooz', 'read'), - (Allow, 'mork', 'read'), - (Deny, 'jim', 'read'), - (Allow, 'someguy', 'manage'), - ] - blog.__acl__ = [(Allow, 'fred', 'read'), DENY_ALL] - - result = sorted(helper.principals_allowed_by_permission(blog, 'read')) - self.assertEqual(result, ['fred']) - result = sorted( - helper.principals_allowed_by_permission(community, 'read') - ) - self.assertEqual(result, ['chrism', 'mork', 'other']) - result = sorted( - helper.principals_allowed_by_permission(community, 'read') - ) - result = sorted(helper.principals_allowed_by_permission(root, 'read')) - self.assertEqual(result, ['chrism', 'jim', 'other']) - - def test_principals_allowed_by_permission_no_acls(self): - from pyramid.security import ACLHelper - - helper = ACLHelper() - context = DummyContext() - result = sorted( - helper.principals_allowed_by_permission(context, 'read') - ) - self.assertEqual(result, []) - - def test_principals_allowed_by_permission_deny_not_permission_in_acl(self): - from pyramid.security import ACLHelper - from pyramid.security import Deny - from pyramid.security import Everyone - - helper = ACLHelper() - context = DummyContext() - acl = [(Deny, Everyone, 'write')] - context.__acl__ = acl - result = sorted( - helper.principals_allowed_by_permission(context, 'read') - ) - self.assertEqual(result, []) - - def test_principals_allowed_by_permission_deny_permission_in_acl(self): - from pyramid.security import ACLHelper - from pyramid.security import Deny - from pyramid.security import Everyone - - helper = ACLHelper() - context = DummyContext() - acl = [(Deny, Everyone, 'read')] - context.__acl__ = acl - result = sorted( - helper.principals_allowed_by_permission(context, 'read') - ) - self.assertEqual(result, []) - - -VIEW = 'view' -EDIT = 'edit' -CREATE = 'create' -DELETE = 'delete' -MODERATE = 'moderate' -ADMINISTER = 'administer' -COMMENT = 'comment' - -GUEST_PERMS = (VIEW, COMMENT) -MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) -MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) -ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) -- cgit v1.2.3 From ad611d2696701b611d2ef9dfe93567ecf6cb338d Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sat, 27 Apr 2019 14:51:57 -0700 Subject: Add simple integration tests for security. --- tests/pkgs/legacysecurityapp/__init__.py | 37 ++++++++++++++++++++++++++++ tests/pkgs/securityapp/__init__.py | 41 +++++++++++++++++++++++++++++++ tests/test_integration.py | 42 ++++++++++++++++++++++++++++++++ 3 files changed, 120 insertions(+) create mode 100644 tests/pkgs/legacysecurityapp/__init__.py create mode 100644 tests/pkgs/securityapp/__init__.py (limited to 'tests') diff --git a/tests/pkgs/legacysecurityapp/__init__.py b/tests/pkgs/legacysecurityapp/__init__.py new file mode 100644 index 000000000..12fb6104e --- /dev/null +++ b/tests/pkgs/legacysecurityapp/__init__.py @@ -0,0 +1,37 @@ +from pyramid.response import Response +from pyramid.authentication import RemoteUserAuthenticationPolicy +from pyramid.security import Allowed, Denied + + +class AuthorizationPolicy: + def permits(self, context, principals, permission): + if 'bob' in principals and permission == 'foo': + return Allowed('') + else: + return Denied('') + + def principals_allowed_by_permission(self, context, permission): + raise NotImplementedError() # pragma: no cover + + +def public(context, request): + return Response('Hello') + + +def private(context, request): + return Response('Secret') + + +def inaccessible(context, request): + raise AssertionError() # pragma: no cover + + +def includeme(config): + config.set_authentication_policy(RemoteUserAuthenticationPolicy()) + config.set_authorization_policy(AuthorizationPolicy()) + config.add_route('public', '/public') + config.add_view(public, route_name='public') + config.add_route('private', '/private') + config.add_view(private, route_name='private', permission='foo') + config.add_route('inaccessible', '/inaccessible') + config.add_view(inaccessible, route_name='inaccessible', permission='bar') diff --git a/tests/pkgs/securityapp/__init__.py b/tests/pkgs/securityapp/__init__.py new file mode 100644 index 000000000..6ddba585b --- /dev/null +++ b/tests/pkgs/securityapp/__init__.py @@ -0,0 +1,41 @@ +from pyramid.response import Response +from pyramid.security import Allowed, Denied + + +class SecurityPolicy: + def identify(self, request): + return request.environ.get('REMOTE_USER') + + def permits(self, request, context, identity, permission): + if identity and permission == 'foo': + return Allowed('') + else: + return Denied('') + + def remember(self, request, userid, **kw): + raise NotImplementedError() # pragma: no cover + + def forget(self, request): + raise NotImplementedError() # pragma: no cover + + +def public(context, request): + return Response('Hello') + + +def private(context, request): + return Response('Secret') + + +def inaccessible(context, request): + raise AssertionError() # pragma: no cover + + +def includeme(config): + config.set_security_policy(SecurityPolicy()) + config.add_route('public', '/public') + config.add_view(public, route_name='public') + config.add_route('private', '/private') + config.add_view(private, route_name='private', permission='foo') + config.add_route('inaccessible', '/inaccessible') + config.add_view(inaccessible, route_name='inaccessible', permission='bar') diff --git a/tests/test_integration.py b/tests/test_integration.py index 72465dc93..331542d7d 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -521,6 +521,48 @@ class TestExceptionViewsApp(IntegrationBase, unittest.TestCase): self.assertTrue(b'caught' in res.body) +class TestSecurityApp(IntegrationBase, unittest.TestCase): + package = 'tests.pkgs.securityapp' + + def test_public(self): + res = self.testapp.get('/public', status=200) + self.assertEqual(res.body, b'Hello') + + def test_private_denied(self): + self.testapp.get('/private', status=403) + + def test_private_allowed(self): + self.testapp.extra_environ = {'REMOTE_USER': 'bob'} + res = self.testapp.get('/private', status=200) + self.assertEqual(res.body, b'Secret') + + def test_inaccessible(self): + self.testapp.get('/inaccessible', status=403) + self.testapp.extra_environ = {'REMOTE_USER': 'bob'} + self.testapp.get('/inaccessible', status=403) + + +class TestLegacySecurityApp(IntegrationBase, unittest.TestCase): + package = 'tests.pkgs.legacysecurityapp' + + def test_public(self): + res = self.testapp.get('/public', status=200) + self.assertEqual(res.body, b'Hello') + + def test_private_denied(self): + self.testapp.get('/private', status=403) + + def test_private_allowed(self): + self.testapp.extra_environ = {'REMOTE_USER': 'bob'} + res = self.testapp.get('/private', status=200) + self.assertEqual(res.body, b'Secret') + + def test_inaccessible(self): + self.testapp.get('/inaccessible', status=403) + self.testapp.extra_environ = {'REMOTE_USER': 'bob'} + self.testapp.get('/inaccessible', status=403) + + class TestConflictApp(unittest.TestCase): package = 'tests.pkgs.conflictapp' -- cgit v1.2.3 From 08d5eddb2932a894fac03917508da95f480bfe7d Mon Sep 17 00:00:00 2001 From: Theron Luhn Date: Sun, 12 May 2019 10:55:01 -0700 Subject: Rename request.identity to request.authenticated_identity. --- tests/test_security.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'tests') diff --git a/tests/test_security.py b/tests/test_security.py index 5a0307c66..2a8847f3b 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -318,7 +318,7 @@ class TestViewExecutionPermitted(unittest.TestCase): self.assertTrue(result) -class TestIdentity(unittest.TestCase): +class TestAuthenticatedIdentity(unittest.TestCase): def setUp(self): testing.setUp() @@ -327,12 +327,12 @@ class TestIdentity(unittest.TestCase): def test_identity_no_security_policy(self): request = _makeRequest() - self.assertEquals(request.identity, None) + self.assertEquals(request.authenticated_identity, None) def test_identity(self): request = _makeRequest() _registerSecurityPolicy(request.registry, 'yo') - self.assertEqual(request.identity, 'yo') + self.assertEqual(request.authenticated_identity, 'yo') class TestAuthenticatedUserId(unittest.TestCase): -- cgit v1.2.3