summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/glossary.rst18
-rw-r--r--src/pyramid/authentication.py444
-rw-r--r--src/pyramid/authorization.py79
-rw-r--r--src/pyramid/config/__init__.py14
-rw-r--r--src/pyramid/config/security.py52
-rw-r--r--src/pyramid/config/testing.py15
-rw-r--r--src/pyramid/interfaces.py42
-rw-r--r--src/pyramid/predicates.py7
-rw-r--r--src/pyramid/request.py4
-rw-r--r--src/pyramid/security.py880
-rw-r--r--src/pyramid/testing.py42
-rw-r--r--src/pyramid/viewderivers.py36
-rw-r--r--tests/pkgs/defpermbugapp/__init__.py4
-rw-r--r--tests/pkgs/forbiddenapp/__init__.py4
-rw-r--r--tests/pkgs/staticpermapp/__init__.py4
-rw-r--r--tests/test_authentication.py1000
-rw-r--r--tests/test_config/test_init.py9
-rw-r--r--tests/test_config/test_security.py32
-rw-r--r--tests/test_config/test_testing.py28
-rw-r--r--tests/test_config/test_views.py29
-rw-r--r--tests/test_integration.py8
-rw-r--r--tests/test_predicates.py22
-rw-r--r--tests/test_request.py4
-rw-r--r--tests/test_security.py1507
-rw-r--r--tests/test_testing.py45
-rw-r--r--tests/test_viewderivers.py134
26 files changed, 2572 insertions, 1891 deletions
diff --git a/docs/glossary.rst b/docs/glossary.rst
index cd472a660..8a1d27734 100644
--- a/docs/glossary.rst
+++ b/docs/glossary.rst
@@ -306,6 +306,16 @@ Glossary
a principal, but this is not strictly necessary in custom policies that
define their principals differently.
+ identity
+ An identity is an opaque identifier of the user associated with the
+ current request.
+
+ security policy
+ A security policy in :app:`Pyramid` terms is a bit of code which has an
+ API which identifies the user associated with the current request (perhaps
+ via a cookie or ``Authorization`` header) and determines whether or not
+ that user is permitted to access the requested resource.
+
authorization policy
An authorization policy in :app:`Pyramid` terms is a bit of
code which has an API which determines whether or not the
@@ -313,11 +323,19 @@ Glossary
associated with a permission, based on the information found on the
:term:`context` resource.
+ .. deprecated:: 2.0
+ Authorization policies have been deprecated in favor of a
+ :term:`security policy`.
+
authentication policy
An authentication policy in :app:`Pyramid` terms is a bit of
code which has an API which determines the current
:term:`principal` (or principals) associated with a request.
+ .. deprecated:: 2.0
+ Authentication policies have been deprecated in favor of a
+ :term:`security policy`.
+
WSGI
`Web Server Gateway Interface <https://wsgi.readthedocs.io/en/latest/>`_.
This is a Python standard for connecting web applications to web servers,
diff --git a/src/pyramid/authentication.py b/src/pyramid/authentication.py
index 21cfc0c0e..12f31e5dd 100644
--- a/src/pyramid/authentication.py
+++ b/src/pyramid/authentication.py
@@ -1,26 +1,23 @@
import binascii
-from codecs import utf_8_decode
-from codecs import utf_8_encode
from collections import namedtuple
-import hashlib
-import base64
-import re
-import time as time_mod
-from urllib.parse import quote, unquote
-import warnings
from zope.interface import implementer
-from webob.cookies import CookieProfile
-
from pyramid.interfaces import IAuthenticationPolicy, IDebugLogger
-from pyramid.security import Authenticated, Everyone
-
-from pyramid.util import strings_differ, bytes_, ascii_, text_
-from pyramid.util import SimpleSerializer
-
-VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
+from pyramid.security import Authenticated, Everyone, AuthTktCookieHelper
+
+# bw compat after moving AuthTktHelper and friends to pyramid.security
+from pyramid.security import ( # noqa
+ VALID_TOKEN,
+ b64encode,
+ b64decode,
+ AuthTicket,
+ BadTicket,
+ parse_ticket,
+ calculate_digest,
+ encode_ip_timestamp,
+)
class CallbackAuthenticationPolicy(object):
@@ -651,421 +648,6 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
return self.cookie.forget(request)
-def b64encode(v):
- return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'')
-
-
-def b64decode(v):
- return base64.b64decode(bytes_(v))
-
-
-# this class licensed under the MIT license (stolen from Paste)
-class AuthTicket(object):
- """
- This class represents an authentication token. You must pass in
- the shared secret, the userid, and the IP address. Optionally you
- can include tokens (a list of strings, representing role names),
- 'user_data', which is arbitrary data available for your own use in
- later scripts. Lastly, you can override the cookie name and
- timestamp.
-
- Once you provide all the arguments, use .cookie_value() to
- generate the appropriate authentication ticket.
-
- Usage::
-
- token = AuthTicket('sharedsecret', 'username',
- os.environ['REMOTE_ADDR'], tokens=['admin'])
- val = token.cookie_value()
-
- """
-
- def __init__(
- self,
- secret,
- userid,
- ip,
- tokens=(),
- user_data='',
- time=None,
- cookie_name='auth_tkt',
- secure=False,
- hashalg='md5',
- ):
- self.secret = secret
- self.userid = userid
- self.ip = ip
- self.tokens = ','.join(tokens)
- self.user_data = user_data
- if time is None:
- self.time = time_mod.time()
- else:
- self.time = time
- self.cookie_name = cookie_name
- self.secure = secure
- self.hashalg = hashalg
-
- def digest(self):
- return calculate_digest(
- self.ip,
- self.time,
- self.secret,
- self.userid,
- self.tokens,
- self.user_data,
- self.hashalg,
- )
-
- def cookie_value(self):
- v = '%s%08x%s!' % (self.digest(), int(self.time), quote(self.userid))
- if self.tokens:
- v += self.tokens + '!'
- v += self.user_data
- return v
-
-
-# this class licensed under the MIT license (stolen from Paste)
-class BadTicket(Exception):
- """
- Exception raised when a ticket can't be parsed. If we get far enough to
- determine what the expected digest should have been, expected is set.
- This should not be shown by default, but can be useful for debugging.
- """
-
- def __init__(self, msg, expected=None):
- self.expected = expected
- Exception.__init__(self, msg)
-
-
-# this function licensed under the MIT license (stolen from Paste)
-def parse_ticket(secret, ticket, ip, hashalg='md5'):
- """
- Parse the ticket, returning (timestamp, userid, tokens, user_data).
-
- If the ticket cannot be parsed, a ``BadTicket`` exception will be raised
- with an explanation.
- """
- ticket = text_(ticket).strip('"')
- digest_size = hashlib.new(hashalg).digest_size * 2
- digest = ticket[:digest_size]
- try:
- timestamp = int(ticket[digest_size : digest_size + 8], 16)
- except ValueError as e:
- raise BadTicket('Timestamp is not a hex integer: %s' % e)
- try:
- userid, data = ticket[digest_size + 8 :].split('!', 1)
- except ValueError:
- raise BadTicket('userid is not followed by !')
- userid = unquote(userid)
- if '!' in data:
- tokens, user_data = data.split('!', 1)
- else: # pragma: no cover (never generated)
- # @@: Is this the right order?
- tokens = ''
- user_data = data
-
- expected = calculate_digest(
- ip, timestamp, secret, userid, tokens, user_data, hashalg
- )
-
- # Avoid timing attacks (see
- # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf)
- if strings_differ(expected, digest):
- raise BadTicket(
- 'Digest signature is not correct', expected=(expected, digest)
- )
-
- tokens = tokens.split(',')
-
- return (timestamp, userid, tokens, user_data)
-
-
-# this function licensed under the MIT license (stolen from Paste)
-def calculate_digest(
- ip, timestamp, secret, userid, tokens, user_data, hashalg='md5'
-):
- secret = bytes_(secret, 'utf-8')
- userid = bytes_(userid, 'utf-8')
- tokens = bytes_(tokens, 'utf-8')
- user_data = bytes_(user_data, 'utf-8')
- hash_obj = hashlib.new(hashalg)
-
- # Check to see if this is an IPv6 address
- if ':' in ip:
- ip_timestamp = ip + str(int(timestamp))
- ip_timestamp = bytes_(ip_timestamp)
- else:
- # encode_ip_timestamp not required, left in for backwards compatibility
- ip_timestamp = encode_ip_timestamp(ip, timestamp)
-
- hash_obj.update(
- ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data
- )
- digest = hash_obj.hexdigest()
- hash_obj2 = hashlib.new(hashalg)
- hash_obj2.update(bytes_(digest) + secret)
- return hash_obj2.hexdigest()
-
-
-# this function licensed under the MIT license (stolen from Paste)
-def encode_ip_timestamp(ip, timestamp):
- ip_chars = ''.join(map(chr, map(int, ip.split('.'))))
- t = int(timestamp)
- ts = (
- (t & 0xFF000000) >> 24,
- (t & 0xFF0000) >> 16,
- (t & 0xFF00) >> 8,
- t & 0xFF,
- )
- ts_chars = ''.join(map(chr, ts))
- return bytes_(ip_chars + ts_chars)
-
-
-class AuthTktCookieHelper(object):
- """
- A helper class for use in third-party authentication policy
- implementations. See
- :class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the
- meanings of the constructor arguments.
- """
-
- parse_ticket = staticmethod(parse_ticket) # for tests
- AuthTicket = AuthTicket # for tests
- BadTicket = BadTicket # for tests
- now = None # for tests
-
- userid_type_decoders = {
- 'int': int,
- 'unicode': lambda x: utf_8_decode(x)[0], # bw compat for old cookies
- 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
- 'b64str': lambda x: b64decode(x),
- }
-
- userid_type_encoders = {
- int: ('int', str),
- str: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
- bytes: ('b64str', lambda x: b64encode(x)),
- }
-
- def __init__(
- self,
- secret,
- cookie_name='auth_tkt',
- secure=False,
- include_ip=False,
- timeout=None,
- reissue_time=None,
- max_age=None,
- http_only=False,
- path="/",
- wild_domain=True,
- hashalg='md5',
- parent_domain=False,
- domain=None,
- samesite='Lax',
- ):
- self.cookie_profile = CookieProfile(
- cookie_name=cookie_name,
- secure=secure,
- max_age=max_age,
- httponly=http_only,
- path=path,
- serializer=SimpleSerializer(),
- samesite=samesite,
- )
-
- self.secret = secret
- self.cookie_name = cookie_name
- self.secure = secure
- self.include_ip = include_ip
- self.timeout = timeout if timeout is None else int(timeout)
- self.reissue_time = (
- reissue_time if reissue_time is None else int(reissue_time)
- )
- self.max_age = max_age if max_age is None else int(max_age)
- self.wild_domain = wild_domain
- self.parent_domain = parent_domain
- self.domain = domain
- self.hashalg = hashalg
-
- def _get_cookies(self, request, value, max_age=None):
- cur_domain = request.domain
-
- domains = []
- if self.domain:
- domains.append(self.domain)
- else:
- if self.parent_domain and cur_domain.count('.') > 1:
- domains.append('.' + cur_domain.split('.', 1)[1])
- else:
- domains.append(None)
- domains.append(cur_domain)
- if self.wild_domain:
- domains.append('.' + cur_domain)
-
- profile = self.cookie_profile(request)
-
- kw = {}
- kw['domains'] = domains
- if max_age is not None:
- kw['max_age'] = max_age
-
- headers = profile.get_headers(value, **kw)
- return headers
-
- def identify(self, request):
- """ Return a dictionary with authentication information, or ``None``
- if no valid auth_tkt is attached to ``request``"""
- environ = request.environ
- cookie = request.cookies.get(self.cookie_name)
-
- if cookie is None:
- return None
-
- if self.include_ip:
- remote_addr = environ['REMOTE_ADDR']
- else:
- remote_addr = '0.0.0.0'
-
- try:
- timestamp, userid, tokens, user_data = self.parse_ticket(
- self.secret, cookie, remote_addr, self.hashalg
- )
- except self.BadTicket:
- return None
-
- now = self.now # service tests
-
- if now is None:
- now = time_mod.time()
-
- if self.timeout and ((timestamp + self.timeout) < now):
- # the auth_tkt data has expired
- return None
-
- userid_typename = 'userid_type:'
- user_data_info = user_data.split('|')
- for datum in filter(None, user_data_info):
- if datum.startswith(userid_typename):
- userid_type = datum[len(userid_typename) :]
- decoder = self.userid_type_decoders.get(userid_type)
- if decoder:
- userid = decoder(userid)
-
- reissue = self.reissue_time is not None
-
- if reissue and not hasattr(request, '_authtkt_reissued'):
- if (now - timestamp) > self.reissue_time:
- # See https://github.com/Pylons/pyramid/issues#issue/108
- tokens = list(filter(None, tokens))
- headers = self.remember(
- request, userid, max_age=self.max_age, tokens=tokens
- )
-
- def reissue_authtkt(request, response):
- if not hasattr(request, '_authtkt_reissue_revoked'):
- for k, v in headers:
- response.headerlist.append((k, v))
-
- request.add_response_callback(reissue_authtkt)
- request._authtkt_reissued = True
-
- environ['REMOTE_USER_TOKENS'] = tokens
- environ['REMOTE_USER_DATA'] = user_data
- environ['AUTH_TYPE'] = 'cookie'
-
- identity = {}
- identity['timestamp'] = timestamp
- identity['userid'] = userid
- identity['tokens'] = tokens
- identity['userdata'] = user_data
- return identity
-
- def forget(self, request):
- """ Return a set of expires Set-Cookie headers, which will destroy
- any existing auth_tkt cookie when attached to a response"""
- request._authtkt_reissue_revoked = True
- return self._get_cookies(request, None)
-
- def remember(self, request, userid, max_age=None, tokens=()):
- """ Return a set of Set-Cookie headers; when set into a response,
- these headers will represent a valid authentication ticket.
-
- ``max_age``
- The max age of the auth_tkt cookie, in seconds. When this value is
- set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
- allowing the auth_tkt cookie to last between browser sessions. If
- this value is ``None``, the ``max_age`` value provided to the
- helper itself will be used as the ``max_age`` value. Default:
- ``None``.
-
- ``tokens``
- A sequence of strings that will be placed into the auth_tkt tokens
- field. Each string in the sequence must be of the Python ``str``
- type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
- Tokens are available in the returned identity when an auth_tkt is
- found in the request and unpacked. Default: ``()``.
- """
- max_age = self.max_age if max_age is None else int(max_age)
-
- environ = request.environ
-
- if self.include_ip:
- remote_addr = environ['REMOTE_ADDR']
- else:
- remote_addr = '0.0.0.0'
-
- user_data = ''
-
- encoding_data = self.userid_type_encoders.get(type(userid))
-
- if encoding_data:
- encoding, encoder = encoding_data
- else:
- warnings.warn(
- "userid is of type {}, and is not supported by the "
- "AuthTktAuthenticationPolicy. Explicitly converting to string "
- "and storing as base64. Subsequent requests will receive a "
- "string as the userid, it will not be decoded back to the "
- "type provided.".format(type(userid)),
- RuntimeWarning,
- )
- encoding, encoder = self.userid_type_encoders.get(str)
- userid = str(userid)
-
- userid = encoder(userid)
- user_data = 'userid_type:%s' % encoding
-
- new_tokens = []
- for token in tokens:
- if isinstance(token, str):
- try:
- token = ascii_(token)
- except UnicodeEncodeError:
- raise ValueError("Invalid token %r" % (token,))
- if not (isinstance(token, str) and VALID_TOKEN.match(token)):
- raise ValueError("Invalid token %r" % (token,))
- new_tokens.append(token)
- tokens = tuple(new_tokens)
-
- if hasattr(request, '_authtkt_reissued'):
- request._authtkt_reissue_revoked = True
-
- ticket = self.AuthTicket(
- self.secret,
- userid,
- remote_addr,
- tokens=tokens,
- user_data=user_data,
- cookie_name=self.cookie_name,
- secure=self.secure,
- hashalg=self.hashalg,
- )
-
- cookie_value = ticket.cookie_value()
- return self._get_cookies(request, cookie_value, max_age)
-
-
@implementer(IAuthenticationPolicy)
class SessionAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` authentication policy which gets its data from the
diff --git a/src/pyramid/authorization.py b/src/pyramid/authorization.py
index 6056a8d25..19b96e3d1 100644
--- a/src/pyramid/authorization.py
+++ b/src/pyramid/authorization.py
@@ -2,11 +2,7 @@ from zope.interface import implementer
from pyramid.interfaces import IAuthorizationPolicy
-from pyramid.location import lineage
-
-from pyramid.security import ACLAllowed, ACLDenied, Allow, Deny, Everyone
-
-from pyramid.util import is_nonstr_iter
+from pyramid.security import ACLHelper
@implementer(IAuthorizationPolicy)
@@ -61,80 +57,21 @@ class ACLAuthorizationPolicy(object):
:class:`pyramid.interfaces.IAuthorizationPolicy` interface.
"""
+ def __init__(self):
+ self.helper = ACLHelper()
+
def permits(self, context, principals, permission):
""" Return an instance of
:class:`pyramid.security.ACLAllowed` instance if the policy
permits access, return an instance of
:class:`pyramid.security.ACLDenied` if not."""
-
- acl = '<No ACL found on any object in resource lineage>'
-
- for location in lineage(context):
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- if acl and callable(acl):
- acl = acl()
-
- for ace in acl:
- ace_action, ace_principal, ace_permissions = ace
- if ace_principal in principals:
- if not is_nonstr_iter(ace_permissions):
- ace_permissions = [ace_permissions]
- if permission in ace_permissions:
- if ace_action == Allow:
- return ACLAllowed(
- ace, acl, permission, principals, location
- )
- else:
- return ACLDenied(
- ace, acl, permission, principals, location
- )
-
- # default deny (if no ACL in lineage at all, or if none of the
- # principals were mentioned in any ACE we found)
- return ACLDenied(
- '<default deny>', acl, permission, principals, context
- )
+ return self.helper.permits(context, principals, permission)
def principals_allowed_by_permission(self, context, permission):
""" Return the set of principals explicitly granted the
permission named ``permission`` according to the ACL directly
attached to the ``context`` as well as inherited ACLs based on
the :term:`lineage`."""
- allowed = set()
-
- for location in reversed(list(lineage(context))):
- # NB: we're walking *up* the object graph from the root
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- allowed_here = set()
- denied_here = set()
-
- if acl and callable(acl):
- acl = acl()
-
- for ace_action, ace_principal, ace_permissions in acl:
- if not is_nonstr_iter(ace_permissions):
- ace_permissions = [ace_permissions]
- if (ace_action == Allow) and (permission in ace_permissions):
- if ace_principal not in denied_here:
- allowed_here.add(ace_principal)
- if (ace_action == Deny) and (permission in ace_permissions):
- denied_here.add(ace_principal)
- if ace_principal == Everyone:
- # clear the entire allowed set, as we've hit a
- # deny of Everyone ala (Deny, Everyone, ALL)
- allowed = set()
- break
- elif ace_principal in allowed:
- allowed.remove(ace_principal)
-
- allowed.update(allowed_here)
-
- return allowed
+ return self.helper.principals_allowed_by_permission(
+ context, permission
+ )
diff --git a/src/pyramid/config/__init__.py b/src/pyramid/config/__init__.py
index 072b654c4..d8961268a 100644
--- a/src/pyramid/config/__init__.py
+++ b/src/pyramid/config/__init__.py
@@ -139,13 +139,17 @@ class Configurator(
:term:`dotted Python name` to the same. If it is ``None``, a default
root factory will be used.
+ If ``security_policy`` is passed, it should be an instance of a
+ :term:`security policy` or a :term:`dotted Python name` to the same.
+
If ``authentication_policy`` is passed, it should be an instance
of an :term:`authentication policy` or a :term:`dotted Python
- name` to the same.
+ name` to the same. (Deprecated as of Pyramid 2.0 in favor of
+ ``security_policy``.)
If ``authorization_policy`` is passed, it should be an instance of
an :term:`authorization policy` or a :term:`dotted Python name` to
- the same.
+ the same. (Deprecated as of Pyramid 2.0 in favor of ``security_policy``.)
.. note:: A ``ConfigurationError`` will be raised when an
authorization policy is supplied without also supplying an
@@ -278,6 +282,7 @@ class Configurator(
package=None,
settings=None,
root_factory=None,
+ security_policy=None,
authentication_policy=None,
authorization_policy=None,
renderers=None,
@@ -315,6 +320,7 @@ class Configurator(
root_factory=root_factory,
authentication_policy=authentication_policy,
authorization_policy=authorization_policy,
+ security_policy=security_policy,
renderers=renderers,
debug_logger=debug_logger,
locale_negotiator=locale_negotiator,
@@ -330,6 +336,7 @@ class Configurator(
self,
settings=None,
root_factory=None,
+ security_policy=None,
authentication_policy=None,
authorization_policy=None,
renderers=None,
@@ -415,6 +422,9 @@ class Configurator(
if authentication_policy:
self.set_authentication_policy(authentication_policy)
+ if security_policy:
+ self.set_security_policy(security_policy)
+
if default_view_mapper is not None:
self.set_view_mapper(default_view_mapper)
diff --git a/src/pyramid/config/security.py b/src/pyramid/config/security.py
index 08e7cb81a..ac7dcef43 100644
--- a/src/pyramid/config/security.py
+++ b/src/pyramid/config/security.py
@@ -6,6 +6,7 @@ from pyramid.interfaces import (
ICSRFStoragePolicy,
IDefaultCSRFOptions,
IDefaultPermission,
+ ISecurityPolicy,
PHASE1_CONFIG,
PHASE2_CONFIG,
)
@@ -13,6 +14,7 @@ from pyramid.interfaces import (
from pyramid.csrf import LegacySessionCSRFStoragePolicy
from pyramid.exceptions import ConfigurationError
from pyramid.util import as_sorted_tuple
+from pyramid.security import LegacySecurityPolicy
from pyramid.config.actions import action_method
@@ -22,6 +24,35 @@ class SecurityConfiguratorMixin(object):
self.set_csrf_storage_policy(LegacySessionCSRFStoragePolicy())
@action_method
+ def set_security_policy(self, policy):
+ """ Override the :app:`Pyramid` :term:`security policy` in the current
+ configuration. The ``policy`` argument must be an instance
+ of a security policy or a :term:`dotted Python name`
+ that points at an instance of a security policy.
+
+ .. note::
+
+ Using the ``security_policy`` argument to the
+ :class:`pyramid.config.Configurator` constructor can be used to
+ achieve the same purpose.
+
+ """
+
+ def register():
+ self.registry.registerUtility(policy, ISecurityPolicy)
+
+ policy = self.maybe_dotted(policy)
+ intr = self.introspectable(
+ 'security policy',
+ None,
+ self.object_description(policy),
+ 'security policy',
+ )
+ intr['policy'] = policy
+ # authentication policy used by view config (phase 3)
+ self.action(ISecurityPolicy, register, introspectables=(intr,))
+
+ @action_method
def set_authentication_policy(self, policy):
""" Override the :app:`Pyramid` :term:`authentication policy` in the
current configuration. The ``policy`` argument must be an instance
@@ -37,14 +68,22 @@ class SecurityConfiguratorMixin(object):
"""
def register():
- self._set_authentication_policy(policy)
+ self.registry.registerUtility(policy, IAuthenticationPolicy)
if self.registry.queryUtility(IAuthorizationPolicy) is None:
raise ConfigurationError(
'Cannot configure an authentication policy without '
'also configuring an authorization policy '
'(use the set_authorization_policy method)'
)
+ if self.registry.queryUtility(ISecurityPolicy) is not None:
+ raise ConfigurationError(
+ 'Cannot configure an authentication and authorization'
+ 'policy with a configured security policy.'
+ )
+ security_policy = LegacySecurityPolicy()
+ self.registry.registerUtility(security_policy, ISecurityPolicy)
+ policy = self.maybe_dotted(policy)
intr = self.introspectable(
'authentication policy',
None,
@@ -60,10 +99,6 @@ class SecurityConfiguratorMixin(object):
introspectables=(intr,),
)
- def _set_authentication_policy(self, policy):
- policy = self.maybe_dotted(policy)
- self.registry.registerUtility(policy, IAuthenticationPolicy)
-
@action_method
def set_authorization_policy(self, policy):
""" Override the :app:`Pyramid` :term:`authorization policy` in the
@@ -79,7 +114,7 @@ class SecurityConfiguratorMixin(object):
"""
def register():
- self._set_authorization_policy(policy)
+ self.registry.registerUtility(policy, IAuthorizationPolicy)
def ensure():
if self.autocommit:
@@ -91,6 +126,7 @@ class SecurityConfiguratorMixin(object):
'(use the set_authorization_policy method)'
)
+ policy = self.maybe_dotted(policy)
intr = self.introspectable(
'authorization policy',
None,
@@ -108,10 +144,6 @@ class SecurityConfiguratorMixin(object):
)
self.action(None, ensure)
- def _set_authorization_policy(self, policy):
- policy = self.maybe_dotted(policy)
- self.registry.registerUtility(policy, IAuthorizationPolicy)
-
@action_method
def set_default_permission(self, permission):
"""
diff --git a/src/pyramid/config/testing.py b/src/pyramid/config/testing.py
index 9c998840a..21c622656 100644
--- a/src/pyramid/config/testing.py
+++ b/src/pyramid/config/testing.py
@@ -1,11 +1,6 @@
from zope.interface import Interface
-from pyramid.interfaces import (
- ITraverser,
- IAuthorizationPolicy,
- IAuthenticationPolicy,
- IRendererFactory,
-)
+from pyramid.interfaces import ITraverser, ISecurityPolicy, IRendererFactory
from pyramid.renderers import RendererHelper
@@ -18,8 +13,7 @@ class TestingConfiguratorMixin(object):
# testing API
def testing_securitypolicy(
self,
- userid=None,
- groupids=(),
+ identity=None,
permissive=True,
remember_result=None,
forget_result=None,
@@ -69,10 +63,9 @@ class TestingConfiguratorMixin(object):
from pyramid.testing import DummySecurityPolicy
policy = DummySecurityPolicy(
- userid, groupids, permissive, remember_result, forget_result
+ identity, permissive, remember_result, forget_result
)
- self.registry.registerUtility(policy, IAuthorizationPolicy)
- self.registry.registerUtility(policy, IAuthenticationPolicy)
+ self.registry.registerUtility(policy, ISecurityPolicy)
return policy
def testing_resources(self, resources):
diff --git a/src/pyramid/interfaces.py b/src/pyramid/interfaces.py
index f1e238c6b..9dabb9cfc 100644
--- a/src/pyramid/interfaces.py
+++ b/src/pyramid/interfaces.py
@@ -482,8 +482,40 @@ class IViewMapperFactory(Interface):
"""
+class ISecurityPolicy(Interface):
+ def identify(request):
+ """ Return an object identifying a trusted and verified user. """
+
+ def permits(request, context, identity, permission):
+ """ Return an instance of :class:`pyramid.security.Allowed` if a user
+ of the given identity is allowed the ``permission`` in the current
+ ``context``, else return an instance of
+ :class:`pyramid.security.Denied`.
+ """
+
+ def remember(request, userid, **kw):
+ """ Return a set of headers suitable for 'remembering' the
+ :term:`userid` named ``userid`` when set in a response. An
+ individual authentication policy and its consumers can
+ decide on the composition and meaning of ``**kw``.
+
+ """
+
+ def forget(request):
+ """ Return a set of headers suitable for 'forgetting' the
+ current user on subsequent requests.
+
+ """
+
+
class IAuthenticationPolicy(Interface):
- """ An object representing a Pyramid authentication policy. """
+ """ An object representing a Pyramid authentication policy.
+
+ .. deprecated:: 2.0
+
+ Use :class:`ISecurityPolicy`.
+
+ """
def authenticated_userid(request):
""" Return the authenticated :term:`userid` or ``None`` if
@@ -536,7 +568,13 @@ class IAuthenticationPolicy(Interface):
class IAuthorizationPolicy(Interface):
- """ An object representing a Pyramid authorization policy. """
+ """ An object representing a Pyramid authorization policy.
+
+ .. deprecated:: 2.0
+
+ Use :class:`ISecurityPolicy`.
+
+ """
def permits(context, principals, permission):
""" Return an instance of :class:`pyramid.security.Allowed` if any
diff --git a/src/pyramid/predicates.py b/src/pyramid/predicates.py
index 5a1127fb3..974f41cc5 100644
--- a/src/pyramid/predicates.py
+++ b/src/pyramid/predicates.py
@@ -291,6 +291,13 @@ class PhysicalPathPredicate(object):
class EffectivePrincipalsPredicate(object):
+ """
+ .. deprecated:: 2.0
+
+ No longer applicable with the new :term:`security policy`.
+
+ """
+
def __init__(self, val, config):
if is_nonstr_iter(val):
self.val = set(val)
diff --git a/src/pyramid/request.py b/src/pyramid/request.py
index b9bd7451a..5c68abe69 100644
--- a/src/pyramid/request.py
+++ b/src/pyramid/request.py
@@ -15,7 +15,7 @@ from pyramid.interfaces import (
from pyramid.decorator import reify
from pyramid.i18n import LocalizerRequestMixin
from pyramid.response import Response, _get_response_factory
-from pyramid.security import AuthenticationAPIMixin, AuthorizationAPIMixin
+from pyramid.security import SecurityAPIMixin, AuthenticationAPIMixin
from pyramid.url import URLMethodsMixin
from pyramid.util import (
InstancePropertyHelper,
@@ -147,8 +147,8 @@ class Request(
CallbackMethodsMixin,
InstancePropertyMixin,
LocalizerRequestMixin,
+ SecurityAPIMixin,
AuthenticationAPIMixin,
- AuthorizationAPIMixin,
ViewMethodsMixin,
):
"""
diff --git a/src/pyramid/security.py b/src/pyramid/security.py
index 61819588b..e28ce0f1c 100644
--- a/src/pyramid/security.py
+++ b/src/pyramid/security.py
@@ -1,6 +1,18 @@
-from zope.interface import providedBy
+from codecs import utf_8_decode
+from codecs import utf_8_encode
+import hashlib
+import base64
+import time as time_mod
+from urllib.parse import quote, unquote
+import warnings
+import re
+
+from webob.cookies import CookieProfile
+
+from zope.interface import implementer, providedBy
from pyramid.interfaces import (
+ ISecurityPolicy,
IAuthenticationPolicy,
IAuthorizationPolicy,
ISecuredView,
@@ -8,8 +20,16 @@ from pyramid.interfaces import (
IViewClassifier,
)
+from pyramid.location import lineage
+
+from pyramid.util import is_nonstr_iter, strings_differ, bytes_, ascii_, text_
+
+from pyramid.util import SimpleSerializer
+
from pyramid.threadlocal import get_current_registry
+VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
+
Everyone = 'system.Everyone'
Authenticated = 'system.Authenticated'
Allow = 'Allow'
@@ -35,17 +55,12 @@ DENY_ALL = (Deny, Everyone, ALL_PERMISSIONS)
NO_PERMISSION_REQUIRED = '__no_permission_required__'
-def _get_registry(request):
- try:
- reg = request.registry
- except AttributeError:
- reg = get_current_registry() # b/c
- return reg
+def _get_security_policy(request):
+ return request.registry.queryUtility(ISecurityPolicy)
def _get_authentication_policy(request):
- registry = _get_registry(request)
- return registry.queryUtility(IAuthenticationPolicy)
+ return request.registry.queryUtility(IAuthenticationPolicy)
def remember(request, userid, **kw):
@@ -54,7 +69,7 @@ def remember(request, userid, **kw):
on this request's response.
These headers are suitable for 'remembering' a set of credentials
implied by the data passed as ``userid`` and ``*kw`` using the
- current :term:`authentication policy`. Common usage might look
+ current :term:`security policy`. Common usage might look
like so within the body of a view function (``response`` is
assumed to be a :term:`WebOb` -style :term:`response` object
computed previously by the view code):
@@ -67,10 +82,10 @@ def remember(request, userid, **kw):
response.headerlist.extend(headers)
return response
- If no :term:`authentication policy` is in use, this function will
+ If no :term:`security policy` is in use, this function will
always return an empty sequence. If used, the composition and
meaning of ``**kw`` must be agreed upon by the calling code and
- the effective authentication policy.
+ the effective security policy.
.. versionchanged:: 1.6
Deprecated the ``principal`` argument in favor of ``userid`` to clarify
@@ -79,7 +94,7 @@ def remember(request, userid, **kw):
.. versionchanged:: 1.10
Removed the deprecated ``principal`` argument.
"""
- policy = _get_authentication_policy(request)
+ policy = _get_security_policy(request)
if policy is None:
return []
return policy.remember(request, userid, **kw)
@@ -101,10 +116,10 @@ def forget(request):
response.headerlist.extend(headers)
return response
- If no :term:`authentication policy` is in use, this function will
+ If no :term:`security policy` is in use, this function will
always return an empty sequence.
"""
- policy = _get_authentication_policy(request)
+ policy = _get_security_policy(request)
if policy is None:
return []
return policy.forget(request)
@@ -126,6 +141,7 @@ def principals_allowed_by_permission(context, permission):
required machinery for this function; those will cause a
:exc:`NotImplementedError` exception to be raised when this
function is invoked.
+
"""
reg = get_current_registry()
policy = reg.queryUtility(IAuthorizationPolicy)
@@ -147,7 +163,7 @@ def view_execution_permitted(context, request, name=''):
An exception is raised if no view is found.
"""
- reg = _get_registry(request)
+ reg = request.registry
provides = [IViewClassifier] + [providedBy(x) for x in (request, context)]
# XXX not sure what to do here about using _find_views or analogue;
# for now let's just keep it as-is
@@ -280,6 +296,48 @@ class ACLAllowed(ACLPermitsResult, Allowed):
"""
+class SecurityAPIMixin(object):
+ @property
+ def identity(self):
+ """
+ Return an opaque object identifying the current user or ``None`` if no
+ user is authenticated or there is no :term:`security policy` in effect.
+
+ """
+ policy = _get_security_policy(self)
+ if policy is None:
+ return None
+ return policy.identify(self)
+
+ def has_permission(self, permission, context=None):
+ """ Given a permission and an optional context, returns an instance of
+ :data:`pyramid.security.Allowed` if the permission is granted to this
+ request with the provided context, or the context already associated
+ with the request. Otherwise, returns an instance of
+ :data:`pyramid.security.Denied`. This method delegates to the current
+ security policy. Returns
+ :data:`pyramid.security.Allowed` unconditionally if no security
+ policy has been registered for this request. If ``context`` is not
+ supplied or is supplied as ``None``, the context used is the
+ ``request.context`` attribute.
+
+ :param permission: Does this request have the given permission?
+ :type permission: str
+ :param context: A resource object or ``None``
+ :type context: object
+ :returns: Either :class:`pyramid.security.Allowed` or
+ :class:`pyramid.security.Denied`.
+
+ """
+ if context is None:
+ context = self.context
+ policy = _get_security_policy(self)
+ if policy is None:
+ return Allowed('No security policy in use.')
+ identity = policy.identify(self)
+ return policy.permits(self, context, identity, permission)
+
+
class AuthenticationAPIMixin(object):
@property
def authenticated_userid(self):
@@ -287,12 +345,19 @@ class AuthenticationAPIMixin(object):
``None`` if there is no :term:`authentication policy` in effect or
there is no currently authenticated user.
- .. versionadded:: 1.5
+ .. deprecated:: 2.0
+
+ Use ``request.identity`` instead.
+
"""
- policy = _get_authentication_policy(self)
- if policy is None:
+ authn = _get_authentication_policy(self)
+ security = _get_security_policy(self)
+ if authn is not None:
+ return authn.authenticated_userid(self)
+ elif security is not None:
+ return security.identify(self)
+ else:
return None
- return policy.authenticated_userid(self)
@property
def unauthenticated_userid(self):
@@ -304,12 +369,19 @@ class AuthenticationAPIMixin(object):
effective authentication policy will not ensure that a record
associated with the userid exists in persistent storage.
- .. versionadded:: 1.5
+ .. deprecated:: 2.0
+
+ Use ``request.identity`` instead.
+
"""
- policy = _get_authentication_policy(self)
- if policy is None:
+ authn = _get_authentication_policy(self)
+ security = _get_security_policy(self)
+ if authn is not None:
+ return authn.unauthenticated_userid(self)
+ elif security is not None:
+ return security.identify(self)
+ else:
return None
- return policy.unauthenticated_userid(self)
@property
def effective_principals(self):
@@ -318,7 +390,8 @@ class AuthenticationAPIMixin(object):
this will return a one-element list containing the
:data:`pyramid.security.Everyone` principal.
- .. versionadded:: 1.5
+ .. deprecated:: 2.0
+
"""
policy = _get_authentication_policy(self)
if policy is None:
@@ -326,40 +399,731 @@ class AuthenticationAPIMixin(object):
return policy.effective_principals(self)
-class AuthorizationAPIMixin(object):
- def has_permission(self, permission, context=None):
- """ Given a permission and an optional context, returns an instance of
- :data:`pyramid.security.Allowed` if the permission is granted to this
- request with the provided context, or the context already associated
- with the request. Otherwise, returns an instance of
- :data:`pyramid.security.Denied`. This method delegates to the current
- authentication and authorization policies. Returns
- :data:`pyramid.security.Allowed` unconditionally if no authentication
- policy has been registered for this request. If ``context`` is not
- supplied or is supplied as ``None``, the context used is the
- ``request.context`` attribute.
+@implementer(ISecurityPolicy)
+class LegacySecurityPolicy:
+ """
+ A :term:`security policy` which provides a backwards compatibility shim for
+ the :term:`authentication policy` and the :term:`authorization policy`.
- :param permission: Does this request have the given permission?
- :type permission: str
- :param context: A resource object or ``None``
- :type context: object
- :returns: Either :class:`pyramid.security.Allowed` or
- :class:`pyramid.security.Denied`.
+ """
+
+ def _get_authn_policy(self, request):
+ return request.registry.getUtility(IAuthenticationPolicy)
+
+ def _get_authz_policy(self, request):
+ return request.registry.getUtility(IAuthorizationPolicy)
- .. versionadded:: 1.5
+ def identify(self, request):
+ authn = self._get_authn_policy(request)
+ return authn.authenticated_userid(request)
+
+ def remember(self, request, userid, **kw):
+ authn = self._get_authn_policy(request)
+ return authn.remember(request, userid, **kw)
+
+ def forget(self, request):
+ authn = self._get_authn_policy(request)
+ return authn.forget(request)
+
+ def permits(self, request, context, identity, permission):
+ authn = self._get_authn_policy(request)
+ authz = self._get_authz_policy(request)
+ principals = authn.effective_principals(request)
+ return authz.permits(context, principals, permission)
+
+
+class ACLHelper:
+ """ A helper for use with constructing a :term:`security policy` which
+ consults an :term:`ACL` object attached to a :term:`context` to determine
+ authorization information about a :term:`principal` or multiple principals.
+ If the context is part of a :term:`lineage`, the context's parents are
+ consulted for ACL information too.
+
+ """
+
+ def permits(self, context, principals, permission):
+ """ Return an instance of :class:`pyramid.security.ACLAllowed` if the
+ ACL allows access a user with the given principals, return an instance
+ of :class:`pyramid.security.ACLDenied` if not.
+
+ When checking if principals are allowed, the security policy consults
+ the ``context`` for an ACL first. If no ACL exists on the context, or
+ one does exist but the ACL does not explicitly allow or deny access for
+ any of the effective principals, consult the context's parent ACL, and
+ so on, until the lineage is exhausted or we determine that the policy
+ permits or denies.
+
+ During this processing, if any :data:`pyramid.security.Deny`
+ ACE is found matching any principal in ``principals``, stop
+ processing by returning an
+ :class:`pyramid.security.ACLDenied` instance (equals
+ ``False``) immediately. If any
+ :data:`pyramid.security.Allow` ACE is found matching any
+ principal, stop processing by returning an
+ :class:`pyramid.security.ACLAllowed` instance (equals
+ ``True``) immediately. If we exhaust the context's
+ :term:`lineage`, and no ACE has explicitly permitted or denied
+ access, return an instance of
+ :class:`pyramid.security.ACLDenied` (equals ``False``).
"""
- if context is None:
- context = self.context
- reg = _get_registry(self)
- authn_policy = reg.queryUtility(IAuthenticationPolicy)
- if authn_policy is None:
- return Allowed('No authentication policy in use.')
- authz_policy = reg.queryUtility(IAuthorizationPolicy)
- if authz_policy is None:
- raise ValueError(
- 'Authentication policy registered without '
- 'authorization policy'
- ) # should never happen
- principals = authn_policy.effective_principals(self)
- return authz_policy.permits(context, principals, permission)
+ acl = '<No ACL found on any object in resource lineage>'
+
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ if acl and callable(acl):
+ acl = acl()
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ if ace_principal in principals:
+ if not is_nonstr_iter(ace_permissions):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ if ace_action == Allow:
+ return ACLAllowed(
+ ace, acl, permission, principals, location
+ )
+ else:
+ return ACLDenied(
+ ace, acl, permission, principals, location
+ )
+
+ # default deny (if no ACL in lineage at all, or if none of the
+ # principals were mentioned in any ACE we found)
+ return ACLDenied(
+ '<default deny>', acl, permission, principals, context
+ )
+
+ def principals_allowed_by_permission(self, context, permission):
+ """ Return the set of principals explicitly granted the permission
+ named ``permission`` according to the ACL directly attached to the
+ ``context`` as well as inherited ACLs based on the :term:`lineage`.
+
+ When computing principals allowed by a permission, we compute the set
+ of principals that are explicitly granted the ``permission`` in the
+ provided ``context``. We do this by walking 'up' the object graph
+ *from the root* to the context. During this walking process, if we
+ find an explicit :data:`pyramid.security.Allow` ACE for a principal
+ that matches the ``permission``, the principal is included in the allow
+ list. However, if later in the walking process that principal is
+ mentioned in any :data:`pyramid.security.Deny` ACE for the permission,
+ the principal is removed from the allow list. If a
+ :data:`pyramid.security.Deny` to the principal
+ :data:`pyramid.security.Everyone` is encountered during the walking
+ process that matches the ``permission``, the allow list is cleared for
+ all principals encountered in previous ACLs. The walking process ends
+ after we've processed the any ACL directly attached to ``context``; a
+ set of principals is returned.
+
+ """
+ allowed = set()
+
+ for location in reversed(list(lineage(context))):
+ # NB: we're walking *up* the object graph from the root
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ allowed_here = set()
+ denied_here = set()
+
+ if acl and callable(acl):
+ acl = acl()
+
+ for ace_action, ace_principal, ace_permissions in acl:
+ if not is_nonstr_iter(ace_permissions):
+ ace_permissions = [ace_permissions]
+ if (ace_action == Allow) and (permission in ace_permissions):
+ if ace_principal not in denied_here:
+ allowed_here.add(ace_principal)
+ if (ace_action == Deny) and (permission in ace_permissions):
+ denied_here.add(ace_principal)
+ if ace_principal == Everyone:
+ # clear the entire allowed set, as we've hit a
+ # deny of Everyone ala (Deny, Everyone, ALL)
+ allowed = set()
+ break
+ elif ace_principal in allowed:
+ allowed.remove(ace_principal)
+
+ allowed.update(allowed_here)
+
+ return allowed
+
+
+class SessionAuthenticationHelper:
+ """ A helper for use with a :term:`security policy` which stores user data
+ in the configured :term:`session`.
+
+ Constructor Arguments
+
+ ``prefix``
+
+ A prefix used when storing the authentication parameters in the
+ session. Defaults to 'auth.'. Optional.
+
+ """
+
+ def __init__(self, prefix='auth.'):
+ self.userid_key = prefix + 'userid'
+
+ def remember(self, request, userid, **kw):
+ """ Store a userid in the session."""
+ request.session[self.userid_key] = userid
+ return []
+
+ def forget(self, request):
+ """ Remove the stored userid from the session."""
+ if self.userid_key in request.session:
+ del request.session[self.userid_key]
+ return []
+
+ def identify(self, request):
+ return request.session.get(self.userid_key)
+
+
+def b64encode(v):
+ return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'')
+
+
+def b64decode(v):
+ return base64.b64decode(bytes_(v))
+
+
+# this class licensed under the MIT license (stolen from Paste)
+class AuthTicket(object):
+ """
+ This class represents an authentication token. You must pass in
+ the shared secret, the userid, and the IP address. Optionally you
+ can include tokens (a list of strings, representing role names),
+ 'user_data', which is arbitrary data available for your own use in
+ later scripts. Lastly, you can override the cookie name and
+ timestamp.
+
+ Once you provide all the arguments, use .cookie_value() to
+ generate the appropriate authentication ticket.
+
+ Usage::
+
+ token = AuthTicket('sharedsecret', 'username',
+ os.environ['REMOTE_ADDR'], tokens=['admin'])
+ val = token.cookie_value()
+
+ """
+
+ def __init__(
+ self,
+ secret,
+ userid,
+ ip,
+ tokens=(),
+ user_data='',
+ time=None,
+ cookie_name='auth_tkt',
+ secure=False,
+ hashalg='md5',
+ ):
+ self.secret = secret
+ self.userid = userid
+ self.ip = ip
+ self.tokens = ','.join(tokens)
+ self.user_data = user_data
+ if time is None:
+ self.time = time_mod.time()
+ else:
+ self.time = time
+ self.cookie_name = cookie_name
+ self.secure = secure
+ self.hashalg = hashalg
+
+ def digest(self):
+ return calculate_digest(
+ self.ip,
+ self.time,
+ self.secret,
+ self.userid,
+ self.tokens,
+ self.user_data,
+ self.hashalg,
+ )
+
+ def cookie_value(self):
+ v = '%s%08x%s!' % (self.digest(), int(self.time), quote(self.userid))
+ if self.tokens:
+ v += self.tokens + '!'
+ v += self.user_data
+ return v
+
+
+# this class licensed under the MIT license (stolen from Paste)
+class BadTicket(Exception):
+ """
+ Exception raised when a ticket can't be parsed. If we get far enough to
+ determine what the expected digest should have been, expected is set.
+ This should not be shown by default, but can be useful for debugging.
+ """
+
+ def __init__(self, msg, expected=None):
+ self.expected = expected
+ Exception.__init__(self, msg)
+
+
+# this function licensed under the MIT license (stolen from Paste)
+def parse_ticket(secret, ticket, ip, hashalg='md5'):
+ """
+ Parse the ticket, returning (timestamp, userid, tokens, user_data).
+
+ If the ticket cannot be parsed, a ``BadTicket`` exception will be raised
+ with an explanation.
+ """
+ ticket = text_(ticket).strip('"')
+ digest_size = hashlib.new(hashalg).digest_size * 2
+ digest = ticket[:digest_size]
+ try:
+ timestamp = int(ticket[digest_size : digest_size + 8], 16)
+ except ValueError as e:
+ raise BadTicket('Timestamp is not a hex integer: %s' % e)
+ try:
+ userid, data = ticket[digest_size + 8 :].split('!', 1)
+ except ValueError:
+ raise BadTicket('userid is not followed by !')
+ userid = unquote(userid)
+ if '!' in data:
+ tokens, user_data = data.split('!', 1)
+ else: # pragma: no cover (never generated)
+ # @@: Is this the right order?
+ tokens = ''
+ user_data = data
+
+ expected = calculate_digest(
+ ip, timestamp, secret, userid, tokens, user_data, hashalg
+ )
+
+ # Avoid timing attacks (see
+ # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf)
+ if strings_differ(expected, digest):
+ raise BadTicket(
+ 'Digest signature is not correct', expected=(expected, digest)
+ )
+
+ tokens = tokens.split(',')
+
+ return (timestamp, userid, tokens, user_data)
+
+
+# this function licensed under the MIT license (stolen from Paste)
+def calculate_digest(
+ ip, timestamp, secret, userid, tokens, user_data, hashalg='md5'
+):
+ secret = bytes_(secret, 'utf-8')
+ userid = bytes_(userid, 'utf-8')
+ tokens = bytes_(tokens, 'utf-8')
+ user_data = bytes_(user_data, 'utf-8')
+ hash_obj = hashlib.new(hashalg)
+
+ # Check to see if this is an IPv6 address
+ if ':' in ip:
+ ip_timestamp = ip + str(int(timestamp))
+ ip_timestamp = bytes_(ip_timestamp)
+ else:
+ # encode_ip_timestamp not required, left in for backwards compatibility
+ ip_timestamp = encode_ip_timestamp(ip, timestamp)
+
+ hash_obj.update(
+ ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data
+ )
+ digest = hash_obj.hexdigest()
+ hash_obj2 = hashlib.new(hashalg)
+ hash_obj2.update(bytes_(digest) + secret)
+ return hash_obj2.hexdigest()
+
+
+# this function licensed under the MIT license (stolen from Paste)
+def encode_ip_timestamp(ip, timestamp):
+ ip_chars = ''.join(map(chr, map(int, ip.split('.'))))
+ t = int(timestamp)
+ ts = (
+ (t & 0xFF000000) >> 24,
+ (t & 0xFF0000) >> 16,
+ (t & 0xFF00) >> 8,
+ t & 0xFF,
+ )
+ ts_chars = ''.join(map(chr, ts))
+ return bytes_(ip_chars + ts_chars)
+
+
+class AuthTktCookieHelper:
+ """
+ A helper class used for constructing a :term:`security policy` with stores
+ the user identity in a signed cookie.
+
+ Constructor Arguments
+
+ ``secret``
+
+ The secret (a string) used for auth_tkt cookie signing. This value
+ should be unique across all values provided to Pyramid for various
+ subsystem secrets (see :ref:`admonishment_against_secret_sharing`).
+ Required.
+
+ ``cookie_name``
+
+ Default: ``auth_tkt``. The cookie name used
+ (string). Optional.
+
+ ``secure``
+
+ Default: ``False``. Only send the cookie back over a secure
+ conn. Optional.
+
+ ``include_ip``
+
+ Default: ``False``. Make the requesting IP address part of
+ the authentication data in the cookie. Optional.
+
+ For IPv6 this option is not recommended. The ``mod_auth_tkt``
+ specification does not specify how to handle IPv6 addresses, so using
+ this option in combination with IPv6 addresses may cause an
+ incompatible cookie. It ties the authentication ticket to that
+ individual's IPv6 address.
+
+ ``timeout``
+
+ Default: ``None``. Maximum number of seconds which a newly
+ issued ticket will be considered valid. After this amount of
+ time, the ticket will expire (effectively logging the user
+ out). If this value is ``None``, the ticket never expires.
+ Optional.
+
+ ``reissue_time``
+
+ Default: ``None``. If this parameter is set, it represents the number
+ of seconds that must pass before an authentication token cookie is
+ automatically reissued as the result of a request which requires
+ authentication. The duration is measured as the number of seconds
+ since the last auth_tkt cookie was issued and 'now'. If this value is
+ ``0``, a new ticket cookie will be reissued on every request which
+ requires authentication.
+
+ A good rule of thumb: if you want auto-expired cookies based on
+ inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
+ ``reissue_time`` value to perhaps a tenth of the ``timeout`` value
+ (120 or 2 mins). It's nonsensical to set the ``timeout`` value lower
+ than the ``reissue_time`` value, as the ticket will never be reissued
+ if so. However, such a configuration is not explicitly prevented.
+
+ Optional.
+
+ ``max_age``
+
+ Default: ``None``. The max age of the auth_tkt cookie, in
+ seconds. This differs from ``timeout`` inasmuch as ``timeout``
+ represents the lifetime of the ticket contained in the cookie,
+ while this value represents the lifetime of the cookie itself.
+ When this value is set, the cookie's ``Max-Age`` and
+ ``Expires`` settings will be set, allowing the auth_tkt cookie
+ to last between browser sessions. It is typically nonsensical
+ to set this to a value that is lower than ``timeout`` or
+ ``reissue_time``, although it is not explicitly prevented.
+ Optional.
+
+ ``path``
+
+ Default: ``/``. The path for which the auth_tkt cookie is valid.
+ May be desirable if the application only serves part of a domain.
+ Optional.
+
+ ``http_only``
+
+ Default: ``False``. Hide cookie from JavaScript by setting the
+ HttpOnly flag. Not honored by all browsers.
+ Optional.
+
+ ``wild_domain``
+
+ Default: ``True``. An auth_tkt cookie will be generated for the
+ wildcard domain. If your site is hosted as ``example.com`` this
+ will make the cookie available for sites underneath ``example.com``
+ such as ``www.example.com``.
+ Optional.
+
+ ``parent_domain``
+
+ Default: ``False``. An auth_tkt cookie will be generated for the
+ parent domain of the current site. For example if your site is
+ hosted under ``www.example.com`` a cookie will be generated for
+ ``.example.com``. This can be useful if you have multiple sites
+ sharing the same domain. This option supercedes the ``wild_domain``
+ option.
+ Optional.
+
+ ``domain``
+
+ Default: ``None``. If provided the auth_tkt cookie will only be
+ set for this domain. This option is not compatible with ``wild_domain``
+ and ``parent_domain``.
+ Optional.
+
+ ``hashalg``
+
+ Default: ``sha512`` (the literal string).
+
+ Any hash algorithm supported by Python's ``hashlib.new()`` function
+ can be used as the ``hashalg``.
+
+ Cookies generated by different instances of AuthTktAuthenticationPolicy
+ using different ``hashalg`` options are not compatible. Switching the
+ ``hashalg`` will imply that all existing users with a valid cookie will
+ be required to re-login.
+
+ Optional.
+
+ ``samesite``
+
+ Default: ``'Lax'``. The 'samesite' option of the session cookie. Set
+ the value to ``None`` to turn off the samesite option.
+
+ This option is available as of :app:`Pyramid` 1.10.
+ """
+
+ parse_ticket = staticmethod(parse_ticket) # for tests
+ AuthTicket = AuthTicket # for tests
+ BadTicket = BadTicket # for tests
+ now = None # for tests
+
+ userid_type_decoders = {
+ 'int': int,
+ 'unicode': lambda x: utf_8_decode(x)[0], # bw compat for old cookies
+ 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
+ 'b64str': lambda x: b64decode(x),
+ }
+
+ userid_type_encoders = {
+ int: ('int', str),
+ str: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
+ bytes: ('b64str', lambda x: b64encode(x)),
+ }
+
+ def __init__(
+ self,
+ secret,
+ cookie_name='auth_tkt',
+ secure=False,
+ include_ip=False,
+ timeout=None,
+ reissue_time=None,
+ max_age=None,
+ http_only=False,
+ path="/",
+ wild_domain=True,
+ hashalg='md5',
+ parent_domain=False,
+ domain=None,
+ samesite='Lax',
+ ):
+ self.cookie_profile = CookieProfile(
+ cookie_name=cookie_name,
+ secure=secure,
+ max_age=max_age,
+ httponly=http_only,
+ path=path,
+ serializer=SimpleSerializer(),
+ samesite=samesite,
+ )
+
+ self.secret = secret
+ self.cookie_name = cookie_name
+ self.secure = secure
+ self.include_ip = include_ip
+ self.timeout = timeout if timeout is None else int(timeout)
+ self.reissue_time = (
+ reissue_time if reissue_time is None else int(reissue_time)
+ )
+ self.max_age = max_age if max_age is None else int(max_age)
+ self.wild_domain = wild_domain
+ self.parent_domain = parent_domain
+ self.domain = domain
+ self.hashalg = hashalg
+
+ def _get_cookies(self, request, value, max_age=None):
+ cur_domain = request.domain
+
+ domains = []
+ if self.domain:
+ domains.append(self.domain)
+ else:
+ if self.parent_domain and cur_domain.count('.') > 1:
+ domains.append('.' + cur_domain.split('.', 1)[1])
+ else:
+ domains.append(None)
+ domains.append(cur_domain)
+ if self.wild_domain:
+ domains.append('.' + cur_domain)
+
+ profile = self.cookie_profile(request)
+
+ kw = {}
+ kw['domains'] = domains
+ if max_age is not None:
+ kw['max_age'] = max_age
+
+ headers = profile.get_headers(value, **kw)
+ return headers
+
+ def identify(self, request):
+ """ Return a dictionary with authentication information, or ``None``
+ if no valid auth_tkt is attached to ``request``"""
+ environ = request.environ
+ cookie = request.cookies.get(self.cookie_name)
+
+ if cookie is None:
+ return None
+
+ if self.include_ip:
+ remote_addr = environ['REMOTE_ADDR']
+ else:
+ remote_addr = '0.0.0.0'
+
+ try:
+ timestamp, userid, tokens, user_data = self.parse_ticket(
+ self.secret, cookie, remote_addr, self.hashalg
+ )
+ except self.BadTicket:
+ return None
+
+ now = self.now # service tests
+
+ if now is None:
+ now = time_mod.time()
+
+ if self.timeout and ((timestamp + self.timeout) < now):
+ # the auth_tkt data has expired
+ return None
+
+ userid_typename = 'userid_type:'
+ user_data_info = user_data.split('|')
+ for datum in filter(None, user_data_info):
+ if datum.startswith(userid_typename):
+ userid_type = datum[len(userid_typename) :]
+ decoder = self.userid_type_decoders.get(userid_type)
+ if decoder:
+ userid = decoder(userid)
+
+ reissue = self.reissue_time is not None
+
+ if reissue and not hasattr(request, '_authtkt_reissued'):
+ if (now - timestamp) > self.reissue_time:
+ # See https://github.com/Pylons/pyramid/issues#issue/108
+ tokens = list(filter(None, tokens))
+ headers = self.remember(
+ request, userid, max_age=self.max_age, tokens=tokens
+ )
+
+ def reissue_authtkt(request, response):
+ if not hasattr(request, '_authtkt_reissue_revoked'):
+ for k, v in headers:
+ response.headerlist.append((k, v))
+
+ request.add_response_callback(reissue_authtkt)
+ request._authtkt_reissued = True
+
+ environ['REMOTE_USER_TOKENS'] = tokens
+ environ['REMOTE_USER_DATA'] = user_data
+ environ['AUTH_TYPE'] = 'cookie'
+
+ identity = {}
+ identity['timestamp'] = timestamp
+ identity['userid'] = userid
+ identity['tokens'] = tokens
+ identity['userdata'] = user_data
+ return identity
+
+ def forget(self, request):
+ """ Return a set of expires Set-Cookie headers, which will destroy
+ any existing auth_tkt cookie when attached to a response"""
+ request._authtkt_reissue_revoked = True
+ return self._get_cookies(request, None)
+
+ def remember(self, request, userid, max_age=None, tokens=()):
+ """ Return a set of Set-Cookie headers; when set into a response,
+ these headers will represent a valid authentication ticket.
+
+ ``max_age``
+ The max age of the auth_tkt cookie, in seconds. When this value is
+ set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
+ allowing the auth_tkt cookie to last between browser sessions. If
+ this value is ``None``, the ``max_age`` value provided to the
+ helper itself will be used as the ``max_age`` value. Default:
+ ``None``.
+
+ ``tokens``
+ A sequence of strings that will be placed into the auth_tkt tokens
+ field. Each string in the sequence must be of the Python ``str``
+ type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
+ Tokens are available in the returned identity when an auth_tkt is
+ found in the request and unpacked. Default: ``()``.
+ """
+ max_age = self.max_age if max_age is None else int(max_age)
+
+ environ = request.environ
+
+ if self.include_ip:
+ remote_addr = environ['REMOTE_ADDR']
+ else:
+ remote_addr = '0.0.0.0'
+
+ user_data = ''
+
+ encoding_data = self.userid_type_encoders.get(type(userid))
+
+ if encoding_data:
+ encoding, encoder = encoding_data
+ else:
+ warnings.warn(
+ "userid is of type {}, and is not supported by the "
+ "AuthTktAuthenticationPolicy. Explicitly converting to string "
+ "and storing as base64. Subsequent requests will receive a "
+ "string as the userid, it will not be decoded back to the "
+ "type provided.".format(type(userid)),
+ RuntimeWarning,
+ )
+ encoding, encoder = self.userid_type_encoders.get(str)
+ userid = str(userid)
+
+ userid = encoder(userid)
+ user_data = 'userid_type:%s' % encoding
+
+ new_tokens = []
+ for token in tokens:
+ if isinstance(token, str):
+ try:
+ token = ascii_(token)
+ except UnicodeEncodeError:
+ raise ValueError("Invalid token %r" % (token,))
+ if not (isinstance(token, str) and VALID_TOKEN.match(token)):
+ raise ValueError("Invalid token %r" % (token,))
+ new_tokens.append(token)
+ tokens = tuple(new_tokens)
+
+ if hasattr(request, '_authtkt_reissued'):
+ request._authtkt_reissue_revoked = True
+
+ ticket = self.AuthTicket(
+ self.secret,
+ userid,
+ remote_addr,
+ tokens=tokens,
+ user_data=user_data,
+ cookie_name=self.cookie_name,
+ secure=self.secure,
+ hashalg=self.hashalg,
+ )
+
+ cookie_value = ticket.cookie_value()
+ return self._get_cookies(request, cookie_value, max_age)
diff --git a/src/pyramid/testing.py b/src/pyramid/testing.py
index ffddd233f..4bf6d281f 100644
--- a/src/pyramid/testing.py
+++ b/src/pyramid/testing.py
@@ -14,12 +14,7 @@ from pyramid.path import caller_package
from pyramid.response import _get_response_factory
from pyramid.registry import Registry
-from pyramid.security import (
- Authenticated,
- Everyone,
- AuthenticationAPIMixin,
- AuthorizationAPIMixin,
-)
+from pyramid.security import SecurityAPIMixin, AuthenticationAPIMixin
from pyramid.threadlocal import get_current_registry, manager
@@ -43,18 +38,16 @@ class DummyRootFactory(object):
class DummySecurityPolicy(object):
- """ A standin for both an IAuthentication and IAuthorization policy """
+ """ A standin for a security policy"""
def __init__(
self,
- userid=None,
- groupids=(),
+ identity=None,
permissive=True,
remember_result=None,
forget_result=None,
):
- self.userid = userid
- self.groupids = groupids
+ self.identity = identity
self.permissive = permissive
if remember_result is None:
remember_result = []
@@ -63,19 +56,11 @@ class DummySecurityPolicy(object):
self.remember_result = remember_result
self.forget_result = forget_result
- def authenticated_userid(self, request):
- return self.userid
-
- def unauthenticated_userid(self, request):
- return self.userid
+ def identify(self, request):
+ return self.identity
- def effective_principals(self, request):
- effective_principals = [Everyone]
- if self.userid:
- effective_principals.append(Authenticated)
- effective_principals.append(self.userid)
- effective_principals.extend(self.groupids)
- return effective_principals
+ def permits(self, request, context, identity, permission):
+ return self.permissive
def remember(self, request, userid, **kw):
self.remembered = userid
@@ -85,15 +70,6 @@ class DummySecurityPolicy(object):
self.forgotten = True
return self.forget_result
- def permits(self, context, principals, permission):
- return self.permissive
-
- def principals_allowed_by_permission(self, context, permission):
- if self.permissive:
- return self.effective_principals(None)
- else:
- return []
-
class DummyTemplateRenderer(object):
"""
@@ -303,8 +279,8 @@ class DummyRequest(
CallbackMethodsMixin,
InstancePropertyMixin,
LocalizerRequestMixin,
+ SecurityAPIMixin,
AuthenticationAPIMixin,
- AuthorizationAPIMixin,
ViewMethodsMixin,
):
""" A DummyRequest object (incompletely) imitates a :term:`request` object.
diff --git a/src/pyramid/viewderivers.py b/src/pyramid/viewderivers.py
index 181cc9e5c..22659d2a3 100644
--- a/src/pyramid/viewderivers.py
+++ b/src/pyramid/viewderivers.py
@@ -7,12 +7,11 @@ from pyramid.csrf import check_csrf_origin, check_csrf_token
from pyramid.response import Response
from pyramid.interfaces import (
- IAuthenticationPolicy,
- IAuthorizationPolicy,
IDefaultCSRFOptions,
IDefaultPermission,
IDebugLogger,
IResponse,
+ ISecurityPolicy,
IViewMapper,
IViewMapperFactory,
)
@@ -308,19 +307,17 @@ def _secured_view(view, info):
# permission, replacing it with no permission at all
permission = None
- wrapped_view = view
- authn_policy = info.registry.queryUtility(IAuthenticationPolicy)
- authz_policy = info.registry.queryUtility(IAuthorizationPolicy)
+ policy = info.registry.queryUtility(ISecurityPolicy)
# no-op on exception-only views without an explicit permission
if explicit_val is None and info.exception_only:
return view
- if authn_policy and authz_policy and (permission is not None):
+ if policy and (permission is not None):
def permitted(context, request):
- principals = authn_policy.effective_principals(request)
- return authz_policy.permits(context, principals, permission)
+ identity = policy.identify(request)
+ return policy.permits(request, context, identity, permission)
def secured_view(context, request):
result = permitted(context, request)
@@ -334,12 +331,12 @@ def _secured_view(view, info):
)
raise HTTPForbidden(msg, result=result)
- wrapped_view = secured_view
- wrapped_view.__call_permissive__ = view
- wrapped_view.__permitted__ = permitted
- wrapped_view.__permission__ = permission
-
- return wrapped_view
+ secured_view.__call_permissive__ = view
+ secured_view.__permitted__ = permitted
+ secured_view.__permission__ = permission
+ return secured_view
+ else:
+ return view
def _authdebug_view(view, info):
@@ -348,8 +345,7 @@ def _authdebug_view(view, info):
permission = explicit_val = info.options.get('permission')
if permission is None:
permission = info.registry.queryUtility(IDefaultPermission)
- authn_policy = info.registry.queryUtility(IAuthenticationPolicy)
- authz_policy = info.registry.queryUtility(IAuthorizationPolicy)
+ policy = info.registry.queryUtility(ISecurityPolicy)
logger = info.registry.queryUtility(IDebugLogger)
# no-op on exception-only views without an explicit permission
@@ -361,18 +357,18 @@ def _authdebug_view(view, info):
def authdebug_view(context, request):
view_name = getattr(request, 'view_name', None)
- if authn_policy and authz_policy:
+ if policy:
if permission is NO_PERMISSION_REQUIRED:
msg = 'Allowed (NO_PERMISSION_REQUIRED)'
elif permission is None:
msg = 'Allowed (no permission registered)'
else:
- principals = authn_policy.effective_principals(request)
+ identity = policy.identify(request)
msg = str(
- authz_policy.permits(context, principals, permission)
+ policy.permits(request, context, identity, permission)
)
else:
- msg = 'Allowed (no authorization policy in use)'
+ msg = 'Allowed (no security policy in use)'
view_name = getattr(request, 'view_name', None)
url = getattr(request, 'url', None)
diff --git a/tests/pkgs/defpermbugapp/__init__.py b/tests/pkgs/defpermbugapp/__init__.py
index 81897e86a..af78404ae 100644
--- a/tests/pkgs/defpermbugapp/__init__.py
+++ b/tests/pkgs/defpermbugapp/__init__.py
@@ -25,6 +25,6 @@ def includeme(config):
authn_policy = AuthTktAuthenticationPolicy('seekt1t', hashalg='sha512')
authz_policy = ACLAuthorizationPolicy()
config.scan('tests.pkgs.defpermbugapp')
- config._set_authentication_policy(authn_policy)
- config._set_authorization_policy(authz_policy)
+ config.set_authentication_policy(authn_policy)
+ config.set_authorization_policy(authz_policy)
config.set_default_permission('private')
diff --git a/tests/pkgs/forbiddenapp/__init__.py b/tests/pkgs/forbiddenapp/__init__.py
index 31ea4dd52..79670dd32 100644
--- a/tests/pkgs/forbiddenapp/__init__.py
+++ b/tests/pkgs/forbiddenapp/__init__.py
@@ -22,7 +22,7 @@ def includeme(config):
authn_policy = AuthTktAuthenticationPolicy('seekr1t', hashalg='sha512')
authz_policy = ACLAuthorizationPolicy()
- config._set_authentication_policy(authn_policy)
- config._set_authorization_policy(authz_policy)
+ config.set_authentication_policy(authn_policy)
+ config.set_authorization_policy(authz_policy)
config.add_view(x_view, name='x', permission='private')
config.add_view(forbidden_view, context=HTTPForbidden)
diff --git a/tests/pkgs/staticpermapp/__init__.py b/tests/pkgs/staticpermapp/__init__.py
index ffc87d39a..a12eac2d3 100644
--- a/tests/pkgs/staticpermapp/__init__.py
+++ b/tests/pkgs/staticpermapp/__init__.py
@@ -18,8 +18,8 @@ def includeme(config):
authn_policy = RemoteUserAuthenticationPolicy()
authz_policy = ACLAuthorizationPolicy()
- config._set_authentication_policy(authn_policy)
- config._set_authorization_policy(authz_policy)
+ config.set_authentication_policy(authn_policy)
+ config.set_authorization_policy(authz_policy)
config.add_static_view('allowed', 'tests:fixtures/static/')
config.add_static_view(
'protected', 'tests:fixtures/static/', permission='view'
diff --git a/tests/test_authentication.py b/tests/test_authentication.py
index 8671eba05..89cf9866d 100644
--- a/tests/test_authentication.py
+++ b/tests/test_authentication.py
@@ -1,8 +1,7 @@
-from http.cookies import SimpleCookie
import unittest
import warnings
from pyramid import testing
-from pyramid.util import text_, bytes_
+from pyramid.util import bytes_
class TestCallbackAuthenticationPolicyDebugging(unittest.TestCase):
@@ -664,926 +663,6 @@ class TestAuthTktAuthenticationPolicy(unittest.TestCase):
verifyObject(IAuthenticationPolicy, self._makeOne(None, None))
-class TestAuthTktCookieHelper(unittest.TestCase):
- def _getTargetClass(self):
- from pyramid.authentication import AuthTktCookieHelper
-
- return AuthTktCookieHelper
-
- def _makeOne(self, *arg, **kw):
- helper = self._getTargetClass()(*arg, **kw)
- # laziness after moving auth_tkt classes and funcs into
- # authentication module
- auth_tkt = DummyAuthTktModule()
- helper.auth_tkt = auth_tkt
- helper.AuthTicket = auth_tkt.AuthTicket
- helper.parse_ticket = auth_tkt.parse_ticket
- helper.BadTicket = auth_tkt.BadTicket
- return helper
-
- def _makeRequest(self, cookie=None, ipv6=False):
- environ = {'wsgi.version': (1, 0)}
-
- if ipv6 is False:
- environ['REMOTE_ADDR'] = '1.1.1.1'
- else:
- environ['REMOTE_ADDR'] = '::1'
- environ['SERVER_NAME'] = 'localhost'
- return DummyRequest(environ, cookie=cookie)
-
- def _cookieValue(self, cookie):
- items = cookie.value.split('/')
- D = {}
- for item in items:
- k, v = item.split('=', 1)
- D[k] = v
- return D
-
- def _parseHeaders(self, headers):
- return [self._parseHeader(header) for header in headers]
-
- def _parseHeader(self, header):
- cookie = self._parseCookie(header[1])
- return cookie
-
- def _parseCookie(self, cookie):
- cookies = SimpleCookie()
- cookies.load(cookie)
- return cookies.get('auth_tkt')
-
- def test_init_cookie_str_reissue_invalid(self):
- self.assertRaises(
- ValueError, self._makeOne, 'secret', reissue_time='invalid value'
- )
-
- def test_init_cookie_str_timeout_invalid(self):
- self.assertRaises(
- ValueError, self._makeOne, 'secret', timeout='invalid value'
- )
-
- def test_init_cookie_str_max_age_invalid(self):
- self.assertRaises(
- ValueError, self._makeOne, 'secret', max_age='invalid value'
- )
-
- def test_identify_nocookie(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.identify(request)
- self.assertEqual(result, None)
-
- def test_identify_cookie_value_is_None(self):
- helper = self._makeOne('secret')
- request = self._makeRequest(None)
- result = helper.identify(request)
- self.assertEqual(result, None)
-
- def test_identify_good_cookie_include_ip(self):
- helper = self._makeOne('secret', include_ip=True)
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], 'userid')
- self.assertEqual(result['userdata'], '')
- self.assertEqual(result['timestamp'], 0)
- self.assertEqual(helper.auth_tkt.value, 'ticket')
- self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1')
- self.assertEqual(helper.auth_tkt.secret, 'secret')
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], '')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_good_cookie_include_ipv6(self):
- helper = self._makeOne('secret', include_ip=True)
- request = self._makeRequest('ticket', ipv6=True)
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], 'userid')
- self.assertEqual(result['userdata'], '')
- self.assertEqual(result['timestamp'], 0)
- self.assertEqual(helper.auth_tkt.value, 'ticket')
- self.assertEqual(helper.auth_tkt.remote_addr, '::1')
- self.assertEqual(helper.auth_tkt.secret, 'secret')
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], '')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_good_cookie_dont_include_ip(self):
- helper = self._makeOne('secret', include_ip=False)
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], 'userid')
- self.assertEqual(result['userdata'], '')
- self.assertEqual(result['timestamp'], 0)
- self.assertEqual(helper.auth_tkt.value, 'ticket')
- self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0')
- self.assertEqual(helper.auth_tkt.secret, 'secret')
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], '')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_good_cookie_int_useridtype(self):
- helper = self._makeOne('secret', include_ip=False)
- helper.auth_tkt.userid = '1'
- helper.auth_tkt.user_data = 'userid_type:int'
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], 1)
- self.assertEqual(result['userdata'], 'userid_type:int')
- self.assertEqual(result['timestamp'], 0)
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_nonuseridtype_user_data(self):
- helper = self._makeOne('secret', include_ip=False)
- helper.auth_tkt.userid = '1'
- helper.auth_tkt.user_data = 'bogus:int'
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], '1')
- self.assertEqual(result['userdata'], 'bogus:int')
- self.assertEqual(result['timestamp'], 0)
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_good_cookie_unknown_useridtype(self):
- helper = self._makeOne('secret', include_ip=False)
- helper.auth_tkt.userid = 'abc'
- helper.auth_tkt.user_data = 'userid_type:unknown'
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], 'abc')
- self.assertEqual(result['userdata'], 'userid_type:unknown')
- self.assertEqual(result['timestamp'], 0)
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_good_cookie_b64str_useridtype(self):
- from base64 import b64encode
-
- helper = self._makeOne('secret', include_ip=False)
- helper.auth_tkt.userid = b64encode(b'encoded').strip()
- helper.auth_tkt.user_data = 'userid_type:b64str'
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], b'encoded')
- self.assertEqual(result['userdata'], 'userid_type:b64str')
- self.assertEqual(result['timestamp'], 0)
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_good_cookie_b64unicode_useridtype(self):
- from base64 import b64encode
-
- helper = self._makeOne('secret', include_ip=False)
- helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip()
- helper.auth_tkt.user_data = 'userid_type:b64unicode'
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(len(result), 4)
- self.assertEqual(result['tokens'], ())
- self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8'))
- self.assertEqual(result['userdata'], 'userid_type:b64unicode')
- self.assertEqual(result['timestamp'], 0)
- environ = request.environ
- self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
- self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode')
- self.assertEqual(environ['AUTH_TYPE'], 'cookie')
-
- def test_identify_bad_cookie(self):
- helper = self._makeOne('secret', include_ip=True)
- helper.auth_tkt.parse_raise = True
- request = self._makeRequest('ticket')
- result = helper.identify(request)
- self.assertEqual(result, None)
-
- def test_identify_cookie_timeout(self):
- helper = self._makeOne('secret', timeout=1)
- self.assertEqual(helper.timeout, 1)
-
- def test_identify_cookie_str_timeout(self):
- helper = self._makeOne('secret', timeout='1')
- self.assertEqual(helper.timeout, 1)
-
- def test_identify_cookie_timeout_aged(self):
- import time
-
- helper = self._makeOne('secret', timeout=10)
- now = time.time()
- helper.auth_tkt.timestamp = now - 1
- helper.now = now + 10
- helper.auth_tkt.tokens = (text_('a'),)
- request = self._makeRequest('bogus')
- result = helper.identify(request)
- self.assertFalse(result)
-
- def test_identify_cookie_reissue(self):
- import time
-
- helper = self._makeOne('secret', timeout=10, reissue_time=0)
- now = time.time()
- helper.auth_tkt.timestamp = now
- helper.now = now + 1
- helper.auth_tkt.tokens = (text_('a'),)
- request = self._makeRequest('bogus')
- result = helper.identify(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 1)
- response = DummyResponse()
- request.callbacks[0](request, response)
- self.assertEqual(len(response.headerlist), 3)
- self.assertEqual(response.headerlist[0][0], 'Set-Cookie')
-
- def test_identify_cookie_str_reissue(self):
- import time
-
- helper = self._makeOne('secret', timeout=10, reissue_time='0')
- now = time.time()
- helper.auth_tkt.timestamp = now
- helper.now = now + 1
- helper.auth_tkt.tokens = (text_('a'),)
- request = self._makeRequest('bogus')
- result = helper.identify(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 1)
- response = DummyResponse()
- request.callbacks[0](request, response)
- self.assertEqual(len(response.headerlist), 3)
- self.assertEqual(response.headerlist[0][0], 'Set-Cookie')
-
- def test_identify_cookie_reissue_already_reissued_this_request(self):
- import time
-
- helper = self._makeOne('secret', timeout=10, reissue_time=0)
- now = time.time()
- helper.auth_tkt.timestamp = now
- helper.now = now + 1
- request = self._makeRequest('bogus')
- request._authtkt_reissued = True
- result = helper.identify(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 0)
-
- def test_identify_cookie_reissue_notyet(self):
- import time
-
- helper = self._makeOne('secret', timeout=10, reissue_time=10)
- now = time.time()
- helper.auth_tkt.timestamp = now
- helper.now = now + 1
- request = self._makeRequest('bogus')
- result = helper.identify(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 0)
-
- def test_identify_cookie_reissue_revoked_by_forget(self):
- import time
-
- helper = self._makeOne('secret', timeout=10, reissue_time=0)
- now = time.time()
- helper.auth_tkt.timestamp = now
- helper.now = now + 1
- request = self._makeRequest('bogus')
- result = helper.identify(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 1)
- result = helper.forget(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 1)
- response = DummyResponse()
- request.callbacks[0](request, response)
- self.assertEqual(len(response.headerlist), 0)
-
- def test_identify_cookie_reissue_revoked_by_remember(self):
- import time
-
- helper = self._makeOne('secret', timeout=10, reissue_time=0)
- now = time.time()
- helper.auth_tkt.timestamp = now
- helper.now = now + 1
- request = self._makeRequest('bogus')
- result = helper.identify(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 1)
- result = helper.remember(request, 'bob')
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 1)
- response = DummyResponse()
- request.callbacks[0](request, response)
- self.assertEqual(len(response.headerlist), 0)
-
- def test_identify_cookie_reissue_with_tokens_default(self):
- # see https://github.com/Pylons/pyramid/issues#issue/108
- import time
-
- helper = self._makeOne('secret', timeout=10, reissue_time=0)
- auth_tkt = DummyAuthTktModule(tokens=[''])
- helper.auth_tkt = auth_tkt
- helper.AuthTicket = auth_tkt.AuthTicket
- helper.parse_ticket = auth_tkt.parse_ticket
- helper.BadTicket = auth_tkt.BadTicket
- now = time.time()
- helper.auth_tkt.timestamp = now
- helper.now = now + 1
- request = self._makeRequest('bogus')
- result = helper.identify(request)
- self.assertTrue(result)
- self.assertEqual(len(request.callbacks), 1)
- response = DummyResponse()
- request.callbacks[0](None, response)
- self.assertEqual(len(response.headerlist), 3)
- self.assertEqual(response.headerlist[0][0], 'Set-Cookie')
- self.assertTrue("/tokens=/" in response.headerlist[0][1])
-
- def test_remember(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.remember(request, 'userid')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax'))
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue(
- result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax')
- )
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue(
- result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax')
- )
- self.assertTrue(result[2][1].startswith('auth_tkt='))
-
- def test_remember_nondefault_samesite(self):
- helper = self._makeOne('secret', samesite='Strict')
- request = self._makeRequest()
- result = helper.remember(request, 'userid')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict'))
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue(
- result[1][1].endswith(
- '; Domain=localhost; Path=/; SameSite=Strict'
- )
- )
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue(
- result[2][1].endswith(
- '; Domain=.localhost; Path=/; SameSite=Strict'
- )
- )
- self.assertTrue(result[2][1].startswith('auth_tkt='))
-
- def test_remember_None_samesite(self):
- helper = self._makeOne('secret', samesite=None)
- request = self._makeRequest()
- result = helper.remember(request, 'userid')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/'))
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/'))
- self.assertTrue(result[2][1].startswith('auth_tkt='))
-
- def test_remember_include_ip(self):
- helper = self._makeOne('secret', include_ip=True)
- request = self._makeRequest()
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax'))
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue(
- result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax')
- )
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue(
- result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax')
- )
- self.assertTrue(result[2][1].startswith('auth_tkt='))
-
- def test_remember_path(self):
- helper = self._makeOne(
- 'secret', include_ip=True, path="/cgi-bin/app.cgi/"
- )
- request = self._makeRequest()
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(
- result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax')
- )
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue(
- result[1][1].endswith(
- '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax'
- )
- )
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue(
- result[2][1].endswith(
- '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax'
- )
- )
- self.assertTrue(result[2][1].startswith('auth_tkt='))
-
- def test_remember_http_only(self):
- helper = self._makeOne('secret', include_ip=True, http_only=True)
- request = self._makeRequest()
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax'))
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue('; HttpOnly' in result[1][1])
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue('; HttpOnly' in result[2][1])
- self.assertTrue(result[2][1].startswith('auth_tkt='))
-
- def test_remember_secure(self):
- helper = self._makeOne('secret', include_ip=True, secure=True)
- request = self._makeRequest()
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue('; secure' in result[0][1])
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue('; secure' in result[1][1])
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue('; secure' in result[2][1])
- self.assertTrue(result[2][1].startswith('auth_tkt='))
-
- def test_remember_wild_domain_disabled(self):
- helper = self._makeOne('secret', wild_domain=False)
- request = self._makeRequest()
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 2)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax'))
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue(
- result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax')
- )
- self.assertTrue(result[1][1].startswith('auth_tkt='))
-
- def test_remember_parent_domain(self):
- helper = self._makeOne('secret', parent_domain=True)
- request = self._makeRequest()
- request.domain = 'www.example.com'
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 1)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(
- result[0][1].endswith(
- '; Domain=.example.com; Path=/; SameSite=Lax'
- )
- )
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- def test_remember_parent_domain_supercedes_wild_domain(self):
- helper = self._makeOne('secret', parent_domain=True, wild_domain=True)
- request = self._makeRequest()
- request.domain = 'www.example.com'
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 1)
- self.assertTrue(
- result[0][1].endswith(
- '; Domain=.example.com; Path=/; SameSite=Lax'
- )
- )
-
- def test_remember_explicit_domain(self):
- helper = self._makeOne('secret', domain='pyramid.bazinga')
- request = self._makeRequest()
- request.domain = 'www.example.com'
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 1)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue(
- result[0][1].endswith(
- '; Domain=pyramid.bazinga; Path=/; SameSite=Lax'
- )
- )
- self.assertTrue(result[0][1].startswith('auth_tkt='))
-
- def test_remember_domain_supercedes_parent_and_wild_domain(self):
- helper = self._makeOne(
- 'secret',
- domain='pyramid.bazinga',
- parent_domain=True,
- wild_domain=True,
- )
- request = self._makeRequest()
- request.domain = 'www.example.com'
- result = helper.remember(request, 'other')
- self.assertEqual(len(result), 1)
- self.assertTrue(
- result[0][1].endswith(
- '; Domain=pyramid.bazinga; Path=/; SameSite=Lax'
- )
- )
-
- def test_remember_binary_userid(self):
- import base64
-
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.remember(request, b'userid')
- values = self._parseHeaders(result)
- self.assertEqual(len(result), 3)
- val = self._cookieValue(values[0])
- self.assertEqual(
- val['userid'], text_(base64.b64encode(b'userid').strip())
- )
- self.assertEqual(val['user_data'], 'userid_type:b64str')
-
- def test_remember_int_userid(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.remember(request, 1)
- values = self._parseHeaders(result)
- self.assertEqual(len(result), 3)
- val = self._cookieValue(values[0])
- self.assertEqual(val['userid'], '1')
- self.assertEqual(val['user_data'], 'userid_type:int')
-
- def test_remember_unicode_userid(self):
- import base64
-
- helper = self._makeOne('secret')
- request = self._makeRequest()
- userid = text_(b'\xc2\xa9', 'utf-8')
- result = helper.remember(request, userid)
- values = self._parseHeaders(result)
- self.assertEqual(len(result), 3)
- val = self._cookieValue(values[0])
- self.assertEqual(
- val['userid'], text_(base64.b64encode(userid.encode('utf-8')))
- )
- self.assertEqual(val['user_data'], 'userid_type:b64unicode')
-
- def test_remember_insane_userid(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- userid = object()
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter('always', RuntimeWarning)
- result = helper.remember(request, userid)
- self.assertTrue(str(w[-1].message).startswith('userid is of type'))
- values = self._parseHeaders(result)
- self.assertEqual(len(result), 3)
- value = values[0]
- self.assertTrue('userid' in value.value)
-
- def test_remember_max_age(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.remember(request, 'userid', max_age=500)
- values = self._parseHeaders(result)
- self.assertEqual(len(result), 3)
-
- self.assertEqual(values[0]['max-age'], '500')
- self.assertTrue(values[0]['expires'])
-
- def test_remember_str_max_age(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.remember(request, 'userid', max_age='500')
- values = self._parseHeaders(result)
- self.assertEqual(len(result), 3)
-
- self.assertEqual(values[0]['max-age'], '500')
- self.assertTrue(values[0]['expires'])
-
- def test_remember_str_max_age_invalid(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- self.assertRaises(
- ValueError,
- helper.remember,
- request,
- 'userid',
- max_age='invalid value',
- )
-
- def test_remember_tokens(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.remember(request, 'other', tokens=('foo', 'bar'))
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- self.assertTrue("/tokens=foo|bar/" in result[0][1])
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- self.assertTrue("/tokens=foo|bar/" in result[1][1])
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- self.assertTrue("/tokens=foo|bar/" in result[2][1])
-
- def test_remember_samesite_nondefault(self):
- helper = self._makeOne('secret', samesite='Strict')
- request = self._makeRequest()
- result = helper.remember(request, 'userid')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- cookieval = result[0][1]
- self.assertTrue(
- 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')],
- cookieval,
- )
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- cookieval = result[1][1]
- self.assertTrue(
- 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')],
- cookieval,
- )
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- cookieval = result[2][1]
- self.assertTrue(
- 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')],
- cookieval,
- )
-
- def test_remember_samesite_default(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- result = helper.remember(request, 'userid')
- self.assertEqual(len(result), 3)
-
- self.assertEqual(result[0][0], 'Set-Cookie')
- cookieval = result[0][1]
- self.assertTrue(
- 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')],
- cookieval,
- )
-
- self.assertEqual(result[1][0], 'Set-Cookie')
- cookieval = result[1][1]
- self.assertTrue(
- 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')],
- cookieval,
- )
-
- self.assertEqual(result[2][0], 'Set-Cookie')
- cookieval = result[2][1]
- self.assertTrue(
- 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')],
- cookieval,
- )
-
- def test_remember_unicode_but_ascii_token(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- la = text_(b'foo', 'utf-8')
- result = helper.remember(request, 'other', tokens=(la,))
- # tokens must be str type on both Python 2 and 3
- self.assertTrue("/tokens=foo/" in result[0][1])
-
- def test_remember_nonascii_token(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- la = text_(b'La Pe\xc3\xb1a', 'utf-8')
- self.assertRaises(
- ValueError, helper.remember, request, 'other', tokens=(la,)
- )
-
- def test_remember_invalid_token_format(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- self.assertRaises(
- ValueError, helper.remember, request, 'other', tokens=('foo bar',)
- )
- self.assertRaises(
- ValueError, helper.remember, request, 'other', tokens=('1bar',)
- )
-
- def test_forget(self):
- helper = self._makeOne('secret')
- request = self._makeRequest()
- headers = helper.forget(request)
- self.assertEqual(len(headers), 3)
- name, value = headers[0]
- self.assertEqual(name, 'Set-Cookie')
- self.assertEqual(
- value,
- 'auth_tkt=; Max-Age=0; Path=/; '
- 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax',
- )
- name, value = headers[1]
- self.assertEqual(name, 'Set-Cookie')
- self.assertEqual(
- value,
- 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; '
- 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax',
- )
- name, value = headers[2]
- self.assertEqual(name, 'Set-Cookie')
- self.assertEqual(
- value,
- 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; '
- 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax',
- )
-
-
-class TestAuthTicket(unittest.TestCase):
- def _makeOne(self, *arg, **kw):
- from pyramid.authentication import AuthTicket
-
- return AuthTicket(*arg, **kw)
-
- def test_ctor_with_tokens(self):
- ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b'))
- self.assertEqual(ticket.tokens, 'a,b')
-
- def test_ctor_with_time(self):
- ticket = self._makeOne('secret', 'userid', 'ip', time='time')
- self.assertEqual(ticket.time, 'time')
-
- def test_digest(self):
- ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10)
- result = ticket.digest()
- self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c')
-
- def test_digest_sha512(self):
- ticket = self._makeOne(
- 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512'
- )
- result = ticket.digest()
- self.assertEqual(
- result,
- '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49'
- '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9'
- 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278',
- )
-
- def test_cookie_value(self):
- ticket = self._makeOne(
- 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b')
- )
- result = ticket.cookie_value()
- self.assertEqual(
- result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!'
- )
-
- def test_ipv4(self):
- ticket = self._makeOne(
- 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256'
- )
- result = ticket.cookie_value()
- self.assertEqual(
- result,
- 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b'
- '798400ecdade8d76c530000000auserid!',
- )
-
- def test_ipv6(self):
- ticket = self._makeOne(
- 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256'
- )
- result = ticket.cookie_value()
- self.assertEqual(
- result,
- 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8'
- '5becf8760cd7a2fa4910000000auserid!',
- )
-
-
-class TestBadTicket(unittest.TestCase):
- def _makeOne(self, msg, expected=None):
- from pyramid.authentication import BadTicket
-
- return BadTicket(msg, expected)
-
- def test_it(self):
- exc = self._makeOne('msg', expected=True)
- self.assertEqual(exc.expected, True)
- self.assertTrue(isinstance(exc, Exception))
-
-
-class Test_parse_ticket(unittest.TestCase):
- def _callFUT(self, secret, ticket, ip, hashalg='md5'):
- from pyramid.authentication import parse_ticket
-
- return parse_ticket(secret, ticket, ip, hashalg)
-
- def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'):
- from pyramid.authentication import BadTicket
-
- self.assertRaises(
- BadTicket, self._callFUT, secret, ticket, ip, hashalg
- )
-
- def test_bad_timestamp(self):
- ticket = 'x' * 64
- self._assertRaisesBadTicket('secret', ticket, 'ip')
-
- def test_bad_userid_or_data(self):
- ticket = 'x' * 32 + '11111111' + 'x' * 10
- self._assertRaisesBadTicket('secret', ticket, 'ip')
-
- def test_digest_sig_incorrect(self):
- ticket = 'x' * 32 + '11111111' + 'a!b!c'
- self._assertRaisesBadTicket('secret', ticket, '0.0.0.0')
-
- def test_correct_with_user_data(self):
- ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!')
- result = self._callFUT('secret', ticket, '0.0.0.0')
- self.assertEqual(result, (10, 'userid', ['a', 'b'], ''))
-
- def test_correct_with_user_data_sha512(self):
- ticket = text_(
- '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1'
- '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767'
- 'ba8a26d02aaeae56599a0000000auserid!a,b!'
- )
- result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512')
- self.assertEqual(result, (10, 'userid', ['a', 'b'], ''))
-
- def test_ipv4(self):
- ticket = text_(
- 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd'
- 'ade8d76c530000000auserid!'
- )
- result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256')
- self.assertEqual(result, (10, 'userid', [''], ''))
-
- def test_ipv6(self):
- ticket = text_(
- 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760'
- 'cd7a2fa4910000000auserid!'
- )
- result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256')
- self.assertEqual(result, (10, 'userid', [''], ''))
-
-
class TestSessionAuthenticationPolicy(unittest.TestCase):
def _getTargetClass(self):
from pyramid.authentication import SessionAuthenticationPolicy
@@ -1911,14 +990,6 @@ class DummyContext:
pass
-class DummyCookies(object):
- def __init__(self, cookie):
- self.cookie = cookie
-
- def get(self, name):
- return self.cookie
-
-
class DummyRequest:
domain = 'localhost'
@@ -1927,10 +998,6 @@ class DummyRequest:
self.session = session or {}
self.registry = registry
self.callbacks = []
- self.cookies = DummyCookies(cookie)
-
- def add_response_callback(self, callback):
- self.callbacks.append(callback)
class DummyWhoPlugin:
@@ -1954,68 +1021,3 @@ class DummyCookieHelper:
def forget(self, *arg):
return []
-
-
-class DummyAuthTktModule(object):
- def __init__(
- self,
- timestamp=0,
- userid='userid',
- tokens=(),
- user_data='',
- parse_raise=False,
- hashalg="md5",
- ):
- self.timestamp = timestamp
- self.userid = userid
- self.tokens = tokens
- self.user_data = user_data
- self.parse_raise = parse_raise
- self.hashalg = hashalg
-
- def parse_ticket(secret, value, remote_addr, hashalg):
- self.secret = secret
- self.value = value
- self.remote_addr = remote_addr
- if self.parse_raise:
- raise self.BadTicket()
- return self.timestamp, self.userid, self.tokens, self.user_data
-
- self.parse_ticket = parse_ticket
-
- class AuthTicket(object):
- def __init__(self, secret, userid, remote_addr, **kw):
- self.secret = secret
- self.userid = userid
- self.remote_addr = remote_addr
- self.kw = kw
-
- def cookie_value(self):
- result = {
- 'secret': self.secret,
- 'userid': self.userid,
- 'remote_addr': self.remote_addr,
- }
- result.update(self.kw)
- tokens = result.pop('tokens', None)
- if tokens is not None:
- tokens = '|'.join(tokens)
- result['tokens'] = tokens
- items = sorted(result.items())
- new_items = []
- for k, v in items:
- if isinstance(v, bytes):
- v = text_(v)
- new_items.append((k, v))
- result = '/'.join(['%s=%s' % (k, v) for k, v in new_items])
- return result
-
- self.AuthTicket = AuthTicket
-
- class BadTicket(Exception):
- pass
-
-
-class DummyResponse:
- def __init__(self):
- self.headerlist = []
diff --git a/tests/test_config/test_init.py b/tests/test_config/test_init.py
index ce2b042ec..661654ef0 100644
--- a/tests/test_config/test_init.py
+++ b/tests/test_config/test_init.py
@@ -205,6 +205,15 @@ class ConfiguratorTests(unittest.TestCase):
result = config.registry.getUtility(IDebugLogger)
self.assertEqual(logger, result)
+ def test_ctor_security_policy(self):
+ from pyramid.interfaces import ISecurityPolicy
+
+ policy = object()
+ config = self._makeOne(security_policy=policy)
+ config.commit()
+ result = config.registry.getUtility(ISecurityPolicy)
+ self.assertEqual(policy, result)
+
def test_ctor_authentication_policy(self):
from pyramid.interfaces import IAuthenticationPolicy
diff --git a/tests/test_config/test_security.py b/tests/test_config/test_security.py
index 5ebd78f8d..f2b4ba8e5 100644
--- a/tests/test_config/test_security.py
+++ b/tests/test_config/test_security.py
@@ -11,6 +11,28 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase):
config = Configurator(*arg, **kw)
return config
+ def test_set_security_policy(self):
+ from pyramid.interfaces import ISecurityPolicy
+
+ config = self._makeOne()
+ policy = object()
+ config.set_security_policy(policy)
+ config.commit()
+ self.assertEqual(config.registry.getUtility(ISecurityPolicy), policy)
+
+ def test_set_authentication_policy_with_security_policy(self):
+ from pyramid.interfaces import IAuthorizationPolicy
+ from pyramid.interfaces import ISecurityPolicy
+
+ config = self._makeOne()
+ security_policy = object()
+ authn_policy = object()
+ authz_policy = object()
+ config.registry.registerUtility(security_policy, ISecurityPolicy)
+ config.registry.registerUtility(authz_policy, IAuthorizationPolicy)
+ config.set_authentication_policy(authn_policy)
+ self.assertRaises(ConfigurationError, config.commit)
+
def test_set_authentication_policy_no_authz_policy(self):
config = self._makeOne()
policy = object()
@@ -27,6 +49,8 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase):
def test_set_authentication_policy_with_authz_policy(self):
from pyramid.interfaces import IAuthenticationPolicy
from pyramid.interfaces import IAuthorizationPolicy
+ from pyramid.interfaces import ISecurityPolicy
+ from pyramid.security import LegacySecurityPolicy
config = self._makeOne()
authn_policy = object()
@@ -37,10 +61,15 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase):
self.assertEqual(
config.registry.getUtility(IAuthenticationPolicy), authn_policy
)
+ self.assertIsInstance(
+ config.registry.getUtility(ISecurityPolicy), LegacySecurityPolicy
+ )
def test_set_authentication_policy_with_authz_policy_autocommit(self):
from pyramid.interfaces import IAuthenticationPolicy
from pyramid.interfaces import IAuthorizationPolicy
+ from pyramid.interfaces import ISecurityPolicy
+ from pyramid.security import LegacySecurityPolicy
config = self._makeOne(autocommit=True)
authn_policy = object()
@@ -51,6 +80,9 @@ class ConfiguratorSecurityMethodsTests(unittest.TestCase):
self.assertEqual(
config.registry.getUtility(IAuthenticationPolicy), authn_policy
)
+ self.assertIsInstance(
+ config.registry.getUtility(ISecurityPolicy), LegacySecurityPolicy
+ )
def test_set_authorization_policy_no_authn_policy(self):
config = self._makeOne()
diff --git a/tests/test_config/test_testing.py b/tests/test_config/test_testing.py
index 0fb73d268..500aedeae 100644
--- a/tests/test_config/test_testing.py
+++ b/tests/test_config/test_testing.py
@@ -1,7 +1,7 @@
import unittest
from zope.interface import implementer
-from pyramid.security import AuthenticationAPIMixin, AuthorizationAPIMixin
+from pyramid.security import SecurityAPIMixin, AuthenticationAPIMixin
from pyramid.util import text_
from . import IDummy
@@ -17,28 +17,20 @@ class TestingConfiguratorMixinTests(unittest.TestCase):
from pyramid.testing import DummySecurityPolicy
config = self._makeOne(autocommit=True)
- config.testing_securitypolicy(
- 'user', ('group1', 'group2'), permissive=False
- )
- from pyramid.interfaces import IAuthenticationPolicy
- from pyramid.interfaces import IAuthorizationPolicy
+ config.testing_securitypolicy('user', permissive=False)
+ from pyramid.interfaces import ISecurityPolicy
- ut = config.registry.getUtility(IAuthenticationPolicy)
- self.assertTrue(isinstance(ut, DummySecurityPolicy))
- ut = config.registry.getUtility(IAuthorizationPolicy)
- self.assertEqual(ut.userid, 'user')
- self.assertEqual(ut.groupids, ('group1', 'group2'))
- self.assertEqual(ut.permissive, False)
+ policy = config.registry.getUtility(ISecurityPolicy)
+ self.assertTrue(isinstance(policy, DummySecurityPolicy))
+ self.assertEqual(policy.identity, 'user')
+ self.assertEqual(policy.permissive, False)
def test_testing_securitypolicy_remember_result(self):
from pyramid.security import remember
config = self._makeOne(autocommit=True)
pol = config.testing_securitypolicy(
- 'user',
- ('group1', 'group2'),
- permissive=False,
- remember_result=True,
+ 'user', permissive=False, remember_result=True
)
request = DummyRequest()
request.registry = config.registry
@@ -51,7 +43,7 @@ class TestingConfiguratorMixinTests(unittest.TestCase):
config = self._makeOne(autocommit=True)
pol = config.testing_securitypolicy(
- 'user', ('group1', 'group2'), permissive=False, forget_result=True
+ 'user', permissive=False, forget_result=True
)
request = DummyRequest()
request.registry = config.registry
@@ -232,7 +224,7 @@ class DummyEvent:
pass
-class DummyRequest(AuthenticationAPIMixin, AuthorizationAPIMixin):
+class DummyRequest(SecurityAPIMixin, AuthenticationAPIMixin):
def __init__(self, environ=None):
if environ is None:
environ = {}
diff --git a/tests/test_config/test_views.py b/tests/test_config/test_views.py
index 685b81a0f..28b7a9fb1 100644
--- a/tests/test_config/test_views.py
+++ b/tests/test_config/test_views.py
@@ -2059,22 +2059,19 @@ class TestViewsConfigurationMixin(unittest.TestCase):
outerself = self
class DummyPolicy(object):
- def effective_principals(self, r):
+ def identify(self, r):
outerself.assertEqual(r, request)
- return ['abc']
+ return 123
- def permits(self, context, principals, permission):
+ def permits(self, r, context, identity, permission):
+ outerself.assertEqual(r, request)
outerself.assertEqual(context, None)
- outerself.assertEqual(principals, ['abc'])
+ outerself.assertEqual(identity, 123)
outerself.assertEqual(permission, 'view')
return True
policy = DummyPolicy()
- config = self._makeOne(
- authorization_policy=policy,
- authentication_policy=policy,
- autocommit=True,
- )
+ config = self._makeOne(security_policy=policy, autocommit=True)
config.add_view(view=view1, permission='view', renderer=null_renderer)
view = self._getViewCallable(config)
request = self._makeRequest(config)
@@ -2087,22 +2084,20 @@ class TestViewsConfigurationMixin(unittest.TestCase):
outerself = self
class DummyPolicy(object):
- def effective_principals(self, r):
+ def identify(self, r):
outerself.assertEqual(r, request)
- return ['abc']
+ return 123
- def permits(self, context, principals, permission):
+ def permits(self, r, context, identity, permission):
+ outerself.assertEqual(r, request)
outerself.assertEqual(context, None)
- outerself.assertEqual(principals, ['abc'])
+ outerself.assertEqual(identity, 123)
outerself.assertEqual(permission, 'view')
return True
policy = DummyPolicy()
config = self._makeOne(
- authorization_policy=policy,
- authentication_policy=policy,
- default_permission='view',
- autocommit=True,
+ security_policy=policy, default_permission='view', autocommit=True
)
config.add_view(view=view1, renderer=null_renderer)
view = self._getViewCallable(config)
diff --git a/tests/test_integration.py b/tests/test_integration.py
index e6dccbb5b..72465dc93 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -581,10 +581,12 @@ class TestConflictApp(unittest.TestCase):
def test_overridden_authorization_policy(self):
config = self._makeConfig()
config.include(self.package)
- from pyramid.testing import DummySecurityPolicy
- config.set_authorization_policy(DummySecurityPolicy('fred'))
- config.set_authentication_policy(DummySecurityPolicy(permissive=True))
+ class DummySecurityPolicy:
+ def permits(self, context, principals, permission):
+ return True
+
+ config.set_authorization_policy(DummySecurityPolicy())
app = config.make_wsgi_app()
self.testapp = TestApp(app)
res = self.testapp.get('/protected', status=200)
diff --git a/tests/test_predicates.py b/tests/test_predicates.py
index a99651a8f..60e36047e 100644
--- a/tests/test_predicates.py
+++ b/tests/test_predicates.py
@@ -502,6 +502,22 @@ class Test_EffectivePrincipalsPredicate(unittest.TestCase):
return EffectivePrincipalsPredicate(val, config)
+ def _testing_authn_policy(self, userid, groupids=tuple()):
+ from pyramid.interfaces import IAuthenticationPolicy
+ from pyramid.security import Everyone, Authenticated
+
+ class DummyPolicy:
+ def effective_principals(self, request):
+ p = [Everyone]
+ if userid:
+ p.append(Authenticated)
+ p.append(userid)
+ p.extend(groupids)
+ return p
+
+ registry = self.config.registry
+ registry.registerUtility(DummyPolicy(), IAuthenticationPolicy)
+
def test_text(self):
inst = self._makeOne(('verna', 'fred'), None)
self.assertEqual(
@@ -526,7 +542,7 @@ class Test_EffectivePrincipalsPredicate(unittest.TestCase):
def test_it_call_authentication_policy_provides_superset(self):
request = testing.DummyRequest()
- self.config.testing_securitypolicy('fred', groupids=('verna', 'bambi'))
+ self._testing_authn_policy('fred', groupids=('verna', 'bambi'))
inst = self._makeOne(('verna', 'fred'), None)
context = Dummy()
self.assertTrue(inst(context, request))
@@ -535,14 +551,14 @@ class Test_EffectivePrincipalsPredicate(unittest.TestCase):
from pyramid.security import Authenticated
request = testing.DummyRequest()
- self.config.testing_securitypolicy('fred', groupids=('verna', 'bambi'))
+ self._testing_authn_policy('fred', groupids=('verna', 'bambi'))
inst = self._makeOne(Authenticated, None)
context = Dummy()
self.assertTrue(inst(context, request))
def test_it_call_authentication_policy_doesnt_provide_superset(self):
request = testing.DummyRequest()
- self.config.testing_securitypolicy('fred')
+ self._testing_authn_policy('fred')
inst = self._makeOne(('verna', 'fred'), None)
context = Dummy()
self.assertFalse(inst(context, request))
diff --git a/tests/test_request.py b/tests/test_request.py
index 484d86e01..1a10a8509 100644
--- a/tests/test_request.py
+++ b/tests/test_request.py
@@ -1,7 +1,7 @@
import unittest
from pyramid import testing
-from pyramid.security import AuthenticationAPIMixin, AuthorizationAPIMixin
+from pyramid.security import SecurityAPIMixin, AuthenticationAPIMixin
from pyramid.util import text_, bytes_
@@ -54,7 +54,7 @@ class TestRequest(unittest.TestCase):
self.assertEqual(cls.ResponseClass, Response)
def test_implements_security_apis(self):
- apis = (AuthenticationAPIMixin, AuthorizationAPIMixin)
+ apis = (SecurityAPIMixin, AuthenticationAPIMixin)
r = self._makeOne()
self.assertTrue(isinstance(r, apis))
diff --git a/tests/test_security.py b/tests/test_security.py
index 8b8028f61..b66632baa 100644
--- a/tests/test_security.py
+++ b/tests/test_security.py
@@ -1,6 +1,9 @@
import unittest
+import warnings
+from http.cookies import SimpleCookie
from pyramid import testing
+from pyramid.util import text_
class TestAllPermissionsList(unittest.TestCase):
@@ -187,32 +190,22 @@ class TestRemember(unittest.TestCase):
return remember(*arg, **kwarg)
- def test_no_authentication_policy(self):
+ def test_no_security_policy(self):
request = _makeRequest()
result = self._callFUT(request, 'me')
self.assertEqual(result, [])
- def test_with_authentication_policy(self):
+ def test_with_security_policy(self):
request = _makeRequest()
registry = request.registry
- _registerAuthenticationPolicy(registry, 'yo')
- result = self._callFUT(request, 'me')
- self.assertEqual(result, [('X-Pyramid-Test', 'me')])
-
- def test_with_authentication_policy_no_reg_on_request(self):
- from pyramid.threadlocal import get_current_registry
-
- registry = get_current_registry()
- request = _makeRequest()
- del request.registry
- _registerAuthenticationPolicy(registry, 'yo')
+ _registerSecurityPolicy(registry, 'yo')
result = self._callFUT(request, 'me')
self.assertEqual(result, [('X-Pyramid-Test', 'me')])
def test_with_missing_arg(self):
request = _makeRequest()
registry = request.registry
- _registerAuthenticationPolicy(registry, 'yo')
+ _registerSecurityPolicy(registry, 'yo')
self.assertRaises(TypeError, lambda: self._callFUT(request))
@@ -228,24 +221,14 @@ class TestForget(unittest.TestCase):
return forget(*arg)
- def test_no_authentication_policy(self):
+ def test_no_security_policy(self):
request = _makeRequest()
result = self._callFUT(request)
self.assertEqual(result, [])
- def test_with_authentication_policy(self):
+ def test_with_security_policy(self):
request = _makeRequest()
- _registerAuthenticationPolicy(request.registry, 'yo')
- result = self._callFUT(request)
- self.assertEqual(result, [('X-Pyramid-Test', 'logout')])
-
- def test_with_authentication_policy_no_reg_on_request(self):
- from pyramid.threadlocal import get_current_registry
-
- registry = get_current_registry()
- request = _makeRequest()
- del request.registry
- _registerAuthenticationPolicy(registry, 'yo')
+ _registerSecurityPolicy(request.registry, 'yo')
result = self._callFUT(request)
self.assertEqual(result, [('X-Pyramid-Test', 'logout')])
@@ -338,6 +321,23 @@ class TestViewExecutionPermitted(unittest.TestCase):
self.assertTrue(result)
+class TestIdentity(unittest.TestCase):
+ def setUp(self):
+ testing.setUp()
+
+ def tearDown(self):
+ testing.tearDown()
+
+ def test_identity_no_security_policy(self):
+ request = _makeRequest()
+ self.assertEquals(request.identity, None)
+
+ def test_identity(self):
+ request = _makeRequest()
+ _registerSecurityPolicy(request.registry, 'yo')
+ self.assertEqual(request.identity, 'yo')
+
+
class TestAuthenticatedUserId(unittest.TestCase):
def setUp(self):
testing.setUp()
@@ -352,6 +352,12 @@ class TestAuthenticatedUserId(unittest.TestCase):
def test_with_authentication_policy(self):
request = _makeRequest()
_registerAuthenticationPolicy(request.registry, 'yo')
+ _registerSecurityPolicy(request.registry, 'wat')
+ self.assertEqual(request.authenticated_userid, 'yo')
+
+ def test_with_security_policy(self):
+ request = _makeRequest()
+ _registerSecurityPolicy(request.registry, 'yo')
self.assertEqual(request.authenticated_userid, 'yo')
def test_with_authentication_policy_no_reg_on_request(self):
@@ -378,6 +384,12 @@ class TestUnAuthenticatedUserId(unittest.TestCase):
def test_with_authentication_policy(self):
request = _makeRequest()
_registerAuthenticationPolicy(request.registry, 'yo')
+ _registerSecurityPolicy(request.registry, 'wat')
+ self.assertEqual(request.unauthenticated_userid, 'yo')
+
+ def test_with_security_policy(self):
+ request = _makeRequest()
+ _registerSecurityPolicy(request.registry, 'yo')
self.assertEqual(request.unauthenticated_userid, 'yo')
def test_with_authentication_policy_no_reg_on_request(self):
@@ -426,43 +438,25 @@ class TestHasPermission(unittest.TestCase):
testing.tearDown()
def _makeOne(self):
- from pyramid.security import AuthorizationAPIMixin
+ from pyramid.security import SecurityAPIMixin
from pyramid.registry import Registry
- mixin = AuthorizationAPIMixin()
+ mixin = SecurityAPIMixin()
mixin.registry = Registry()
mixin.context = object()
return mixin
- def test_no_authentication_policy(self):
+ def test_no_security_policy(self):
request = self._makeOne()
result = request.has_permission('view')
self.assertTrue(result)
- self.assertEqual(result.msg, 'No authentication policy in use.')
-
- def test_with_no_authorization_policy(self):
- request = self._makeOne()
- _registerAuthenticationPolicy(request.registry, None)
- self.assertRaises(
- ValueError, request.has_permission, 'view', context=None
- )
+ self.assertEqual(result.msg, 'No security policy in use.')
- def test_with_authn_and_authz_policies_registered(self):
+ def test_with_security_registered(self):
request = self._makeOne()
- _registerAuthenticationPolicy(request.registry, None)
- _registerAuthorizationPolicy(request.registry, 'yo')
+ _registerSecurityPolicy(request.registry, 'yo')
self.assertEqual(request.has_permission('view', context=None), 'yo')
- def test_with_no_reg_on_request(self):
- from pyramid.threadlocal import get_current_registry
-
- registry = get_current_registry()
- request = self._makeOne()
- del request.registry
- _registerAuthenticationPolicy(registry, None)
- _registerAuthorizationPolicy(registry, 'yo')
- self.assertEqual(request.has_permission('view'), 'yo')
-
def test_with_no_context_passed(self):
request = self._makeOne()
self.assertTrue(request.has_permission('view'))
@@ -473,6 +467,58 @@ class TestHasPermission(unittest.TestCase):
self.assertRaises(AttributeError, request.has_permission, 'view')
+class TestLegacySecurityPolicy(unittest.TestCase):
+ def setUp(self):
+ testing.setUp()
+
+ def tearDown(self):
+ testing.tearDown()
+
+ def test_identity(self):
+ from pyramid.security import LegacySecurityPolicy
+
+ request = _makeRequest()
+ policy = LegacySecurityPolicy()
+ _registerAuthenticationPolicy(request.registry, 'userid')
+
+ self.assertEqual(policy.identify(request), 'userid')
+
+ def test_remember(self):
+ from pyramid.security import LegacySecurityPolicy
+
+ request = _makeRequest()
+ policy = LegacySecurityPolicy()
+ _registerAuthenticationPolicy(request.registry, None)
+
+ self.assertEqual(
+ policy.remember(request, 'userid'), [('X-Pyramid-Test', 'userid')]
+ )
+
+ def test_forget(self):
+ from pyramid.security import LegacySecurityPolicy
+
+ request = _makeRequest()
+ policy = LegacySecurityPolicy()
+ _registerAuthenticationPolicy(request.registry, None)
+
+ self.assertEqual(
+ policy.forget(request), [('X-Pyramid-Test', 'logout')]
+ )
+
+ def test_permits(self):
+ from pyramid.security import LegacySecurityPolicy
+
+ request = _makeRequest()
+ policy = LegacySecurityPolicy()
+ _registerAuthenticationPolicy(request.registry, ['p1', 'p2'])
+ _registerAuthorizationPolicy(request.registry, True)
+
+ self.assertIs(
+ policy.permits(request, request.context, 'userid', 'permission'),
+ True,
+ )
+
+
_TEST_HEADER = 'X-Pyramid-Test'
@@ -481,6 +527,27 @@ class DummyContext:
self.__dict__.update(kw)
+class DummySecurityPolicy:
+ def __init__(self, result):
+ self.result = result
+
+ def identify(self, request):
+ return self.result
+
+ def permits(self, request, context, identity, permission):
+ return self.result
+
+ def remember(self, request, userid, **kw):
+ headers = [(_TEST_HEADER, userid)]
+ self._header_remembered = headers[0]
+ return headers
+
+ def forget(self, request):
+ headers = [(_TEST_HEADER, 'logout')]
+ self._header_forgotten = headers[0]
+ return headers
+
+
class DummyAuthenticationPolicy:
def __init__(self, result):
self.result = result
@@ -516,6 +583,14 @@ class DummyAuthorizationPolicy:
return self.result
+def _registerSecurityPolicy(reg, result):
+ from pyramid.interfaces import ISecurityPolicy
+
+ policy = DummySecurityPolicy(result)
+ reg.registerUtility(policy, ISecurityPolicy)
+ return policy
+
+
def _registerAuthenticationPolicy(reg, result):
from pyramid.interfaces import IAuthenticationPolicy
@@ -539,3 +614,1333 @@ def _makeRequest():
request.registry = Registry()
request.context = object()
return request
+
+
+class TestACLHelper(unittest.TestCase):
+ def test_no_acl(self):
+ from pyramid.security import ACLHelper
+
+ context = DummyContext()
+ helper = ACLHelper()
+ result = helper.permits(context, ['foo'], 'permission')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, '<default deny>')
+ self.assertEqual(
+ result.acl, '<No ACL found on any object in resource lineage>'
+ )
+ self.assertEqual(result.permission, 'permission')
+ self.assertEqual(result.principals, ['foo'])
+ self.assertEqual(result.context, context)
+
+ def test_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Deny
+ from pyramid.security import Allow
+ from pyramid.security import Everyone
+ from pyramid.security import Authenticated
+ from pyramid.security import ALL_PERMISSIONS
+ from pyramid.security import DENY_ALL
+
+ helper = ACLHelper()
+ root = DummyContext()
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [(Allow, Authenticated, VIEW)]
+ community.__acl__ = [
+ (Allow, 'fred', ALL_PERMISSIONS),
+ (Allow, 'wilma', VIEW),
+ DENY_ALL,
+ ]
+ blog.__acl__ = [
+ (Allow, 'barney', MEMBER_PERMS),
+ (Allow, 'wilma', VIEW),
+ ]
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'wilma'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'wilma', VIEW))
+ self.assertEqual(result.acl, blog.__acl__)
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'wilma'], 'delete'
+ )
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'fred'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'fred'], 'doesntevenexistyet'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'barney'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS))
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'barney'], 'administer'
+ )
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(
+ root, [Everyone, Authenticated, 'someguy'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, (Allow, Authenticated, VIEW))
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'someguy'], 'view'
+ )
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(root, [Everyone], 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, '<default deny>')
+ self.assertEqual(result.acl, root.__acl__)
+
+ context = DummyContext()
+ result = helper.permits(context, [Everyone], 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, '<default deny>')
+ self.assertEqual(
+ result.acl, '<No ACL found on any object in resource lineage>'
+ )
+
+ def test_string_permissions_in_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+
+ helper = ACLHelper()
+ root = DummyContext()
+ root.__acl__ = [(Allow, 'wilma', 'view_stuff')]
+
+ result = helper.permits(root, ['wilma'], 'view')
+ # would be True if matching against 'view_stuff' instead of against
+ # ['view_stuff']
+ self.assertEqual(result, False)
+
+ def test_callable_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+
+ helper = ACLHelper()
+ context = DummyContext()
+ fn = lambda self: [(Allow, 'bob', 'read')]
+ context.__acl__ = fn.__get__(context, context.__class__)
+ result = helper.permits(context, ['bob'], 'read')
+ self.assertTrue(result)
+
+ def test_principals_allowed_by_permission_direct(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+ from pyramid.security import DENY_ALL
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [
+ (Allow, 'chrism', ('read', 'write')),
+ DENY_ALL,
+ (Allow, 'other', 'read'),
+ ]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, ['chrism'])
+
+ def test_principals_allowed_by_permission_callable_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+ from pyramid.security import DENY_ALL
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = lambda: [
+ (Allow, 'chrism', ('read', 'write')),
+ DENY_ALL,
+ (Allow, 'other', 'read'),
+ ]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, ['chrism'])
+
+ def test_principals_allowed_by_permission_string_permission(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [(Allow, 'chrism', 'read_it')]
+ context.__acl__ = acl
+ result = helper.principals_allowed_by_permission(context, 'read')
+ # would be ['chrism'] if 'read' were compared against 'read_it' instead
+ # of against ['read_it']
+ self.assertEqual(list(result), [])
+
+ def test_principals_allowed_by_permission(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+ from pyramid.security import Deny
+ from pyramid.security import DENY_ALL
+ from pyramid.security import ALL_PERMISSIONS
+
+ helper = ACLHelper()
+ root = DummyContext(__name__='', __parent__=None)
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [
+ (Allow, 'chrism', ('read', 'write')),
+ (Allow, 'other', ('read',)),
+ (Allow, 'jim', ALL_PERMISSIONS),
+ ]
+ community.__acl__ = [
+ (Deny, 'flooz', 'read'),
+ (Allow, 'flooz', 'read'),
+ (Allow, 'mork', 'read'),
+ (Deny, 'jim', 'read'),
+ (Allow, 'someguy', 'manage'),
+ ]
+ blog.__acl__ = [(Allow, 'fred', 'read'), DENY_ALL]
+
+ result = sorted(helper.principals_allowed_by_permission(blog, 'read'))
+ self.assertEqual(result, ['fred'])
+ result = sorted(
+ helper.principals_allowed_by_permission(community, 'read')
+ )
+ self.assertEqual(result, ['chrism', 'mork', 'other'])
+ result = sorted(
+ helper.principals_allowed_by_permission(community, 'read')
+ )
+ result = sorted(helper.principals_allowed_by_permission(root, 'read'))
+ self.assertEqual(result, ['chrism', 'jim', 'other'])
+
+ def test_principals_allowed_by_permission_no_acls(self):
+ from pyramid.security import ACLHelper
+
+ helper = ACLHelper()
+ context = DummyContext()
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, [])
+
+ def test_principals_allowed_by_permission_deny_not_permission_in_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Deny
+ from pyramid.security import Everyone
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [(Deny, Everyone, 'write')]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, [])
+
+ def test_principals_allowed_by_permission_deny_permission_in_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Deny
+ from pyramid.security import Everyone
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [(Deny, Everyone, 'read')]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, [])
+
+
+VIEW = 'view'
+EDIT = 'edit'
+CREATE = 'create'
+DELETE = 'delete'
+MODERATE = 'moderate'
+ADMINISTER = 'administer'
+COMMENT = 'comment'
+
+GUEST_PERMS = (VIEW, COMMENT)
+MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE)
+MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,)
+ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,)
+
+
+class TestSessionAuthenticationHelper(unittest.TestCase):
+ def _makeRequest(self, session=None):
+ from types import SimpleNamespace
+
+ if session is None:
+ session = dict()
+ return SimpleNamespace(session=session)
+
+ def _makeOne(self, prefix=''):
+ from pyramid.security import SessionAuthenticationHelper
+
+ return SessionAuthenticationHelper(prefix=prefix)
+
+ def test_identify(self):
+ request = self._makeRequest({'userid': 'fred'})
+ helper = self._makeOne()
+ self.assertEqual(helper.identify(request), 'fred')
+
+ def test_identify_with_prefix(self):
+ request = self._makeRequest({'foo.userid': 'fred'})
+ helper = self._makeOne(prefix='foo.')
+ self.assertEqual(helper.identify(request), 'fred')
+
+ def test_identify_none(self):
+ request = self._makeRequest()
+ helper = self._makeOne()
+ self.assertEqual(helper.identify(request), None)
+
+ def test_remember(self):
+ request = self._makeRequest()
+ helper = self._makeOne()
+ result = helper.remember(request, 'fred')
+ self.assertEqual(request.session.get('userid'), 'fred')
+ self.assertEqual(result, [])
+
+ def test_forget(self):
+ request = self._makeRequest({'userid': 'fred'})
+ helper = self._makeOne()
+ result = helper.forget(request)
+ self.assertEqual(request.session.get('userid'), None)
+ self.assertEqual(result, [])
+
+ def test_forget_no_identity(self):
+ request = self._makeRequest()
+ helper = self._makeOne()
+ result = helper.forget(request)
+ self.assertEqual(request.session.get('userid'), None)
+ self.assertEqual(result, [])
+
+
+class TestAuthTicket(unittest.TestCase):
+ def _makeOne(self, *arg, **kw):
+ from pyramid.security import AuthTicket
+
+ return AuthTicket(*arg, **kw)
+
+ def test_ctor_with_tokens(self):
+ ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b'))
+ self.assertEqual(ticket.tokens, 'a,b')
+
+ def test_ctor_with_time(self):
+ ticket = self._makeOne('secret', 'userid', 'ip', time='time')
+ self.assertEqual(ticket.time, 'time')
+
+ def test_digest(self):
+ ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10)
+ result = ticket.digest()
+ self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c')
+
+ def test_digest_sha512(self):
+ ticket = self._makeOne(
+ 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512'
+ )
+ result = ticket.digest()
+ self.assertEqual(
+ result,
+ '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49'
+ '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9'
+ 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278',
+ )
+
+ def test_cookie_value(self):
+ ticket = self._makeOne(
+ 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b')
+ )
+ result = ticket.cookie_value()
+ self.assertEqual(
+ result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!'
+ )
+
+ def test_ipv4(self):
+ ticket = self._makeOne(
+ 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256'
+ )
+ result = ticket.cookie_value()
+ self.assertEqual(
+ result,
+ 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b'
+ '798400ecdade8d76c530000000auserid!',
+ )
+
+ def test_ipv6(self):
+ ticket = self._makeOne(
+ 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256'
+ )
+ result = ticket.cookie_value()
+ self.assertEqual(
+ result,
+ 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8'
+ '5becf8760cd7a2fa4910000000auserid!',
+ )
+
+
+class TestBadTicket(unittest.TestCase):
+ def _makeOne(self, msg, expected=None):
+ from pyramid.security import BadTicket
+
+ return BadTicket(msg, expected)
+
+ def test_it(self):
+ exc = self._makeOne('msg', expected=True)
+ self.assertEqual(exc.expected, True)
+ self.assertTrue(isinstance(exc, Exception))
+
+
+class Test_parse_ticket(unittest.TestCase):
+ def _callFUT(self, secret, ticket, ip, hashalg='md5'):
+ from pyramid.security import parse_ticket
+
+ return parse_ticket(secret, ticket, ip, hashalg)
+
+ def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'):
+ from pyramid.security import BadTicket
+
+ self.assertRaises(
+ BadTicket, self._callFUT, secret, ticket, ip, hashalg
+ )
+
+ def test_bad_timestamp(self):
+ ticket = 'x' * 64
+ self._assertRaisesBadTicket('secret', ticket, 'ip')
+
+ def test_bad_userid_or_data(self):
+ ticket = 'x' * 32 + '11111111' + 'x' * 10
+ self._assertRaisesBadTicket('secret', ticket, 'ip')
+
+ def test_digest_sig_incorrect(self):
+ ticket = 'x' * 32 + '11111111' + 'a!b!c'
+ self._assertRaisesBadTicket('secret', ticket, '0.0.0.0')
+
+ def test_correct_with_user_data(self):
+ ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!')
+ result = self._callFUT('secret', ticket, '0.0.0.0')
+ self.assertEqual(result, (10, 'userid', ['a', 'b'], ''))
+
+ def test_correct_with_user_data_sha512(self):
+ ticket = text_(
+ '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1'
+ '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767'
+ 'ba8a26d02aaeae56599a0000000auserid!a,b!'
+ )
+ result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512')
+ self.assertEqual(result, (10, 'userid', ['a', 'b'], ''))
+
+ def test_ipv4(self):
+ ticket = text_(
+ 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd'
+ 'ade8d76c530000000auserid!'
+ )
+ result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256')
+ self.assertEqual(result, (10, 'userid', [''], ''))
+
+ def test_ipv6(self):
+ ticket = text_(
+ 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760'
+ 'cd7a2fa4910000000auserid!'
+ )
+ result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256')
+ self.assertEqual(result, (10, 'userid', [''], ''))
+
+
+class TestAuthTktCookieHelper(unittest.TestCase):
+ def _getTargetClass(self):
+ from pyramid.security import AuthTktCookieHelper
+
+ return AuthTktCookieHelper
+
+ def _makeOne(self, *arg, **kw):
+ helper = self._getTargetClass()(*arg, **kw)
+ auth_tkt = DummyAuthTktModule()
+ helper.auth_tkt = auth_tkt
+ helper.AuthTicket = auth_tkt.AuthTicket
+ helper.parse_ticket = auth_tkt.parse_ticket
+ helper.BadTicket = auth_tkt.BadTicket
+ return helper
+
+ def _makeRequest(self, cookie=None, ipv6=False):
+ environ = {'wsgi.version': (1, 0)}
+
+ if ipv6 is False:
+ environ['REMOTE_ADDR'] = '1.1.1.1'
+ else:
+ environ['REMOTE_ADDR'] = '::1'
+ environ['SERVER_NAME'] = 'localhost'
+ return DummyRequest(environ, cookie=cookie)
+
+ def _cookieValue(self, cookie):
+ items = cookie.value.split('/')
+ D = {}
+ for item in items:
+ k, v = item.split('=', 1)
+ D[k] = v
+ return D
+
+ def _parseHeaders(self, headers):
+ return [self._parseHeader(header) for header in headers]
+
+ def _parseHeader(self, header):
+ cookie = self._parseCookie(header[1])
+ return cookie
+
+ def _parseCookie(self, cookie):
+ cookies = SimpleCookie()
+ cookies.load(cookie)
+ return cookies.get('auth_tkt')
+
+ def test_init_cookie_str_reissue_invalid(self):
+ self.assertRaises(
+ ValueError, self._makeOne, 'secret', reissue_time='invalid value'
+ )
+
+ def test_init_cookie_str_timeout_invalid(self):
+ self.assertRaises(
+ ValueError, self._makeOne, 'secret', timeout='invalid value'
+ )
+
+ def test_init_cookie_str_max_age_invalid(self):
+ self.assertRaises(
+ ValueError, self._makeOne, 'secret', max_age='invalid value'
+ )
+
+ def test_identify_nocookie(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.identify(request)
+ self.assertEqual(result, None)
+
+ def test_identify_cookie_value_is_None(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest(None)
+ result = helper.identify(request)
+ self.assertEqual(result, None)
+
+ def test_identify_good_cookie_include_ip(self):
+ helper = self._makeOne('secret', include_ip=True)
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], 'userid')
+ self.assertEqual(result['userdata'], '')
+ self.assertEqual(result['timestamp'], 0)
+ self.assertEqual(helper.auth_tkt.value, 'ticket')
+ self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1')
+ self.assertEqual(helper.auth_tkt.secret, 'secret')
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], '')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_good_cookie_include_ipv6(self):
+ helper = self._makeOne('secret', include_ip=True)
+ request = self._makeRequest('ticket', ipv6=True)
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], 'userid')
+ self.assertEqual(result['userdata'], '')
+ self.assertEqual(result['timestamp'], 0)
+ self.assertEqual(helper.auth_tkt.value, 'ticket')
+ self.assertEqual(helper.auth_tkt.remote_addr, '::1')
+ self.assertEqual(helper.auth_tkt.secret, 'secret')
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], '')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_good_cookie_dont_include_ip(self):
+ helper = self._makeOne('secret', include_ip=False)
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], 'userid')
+ self.assertEqual(result['userdata'], '')
+ self.assertEqual(result['timestamp'], 0)
+ self.assertEqual(helper.auth_tkt.value, 'ticket')
+ self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0')
+ self.assertEqual(helper.auth_tkt.secret, 'secret')
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], '')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_good_cookie_int_useridtype(self):
+ helper = self._makeOne('secret', include_ip=False)
+ helper.auth_tkt.userid = '1'
+ helper.auth_tkt.user_data = 'userid_type:int'
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], 1)
+ self.assertEqual(result['userdata'], 'userid_type:int')
+ self.assertEqual(result['timestamp'], 0)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_nonuseridtype_user_data(self):
+ helper = self._makeOne('secret', include_ip=False)
+ helper.auth_tkt.userid = '1'
+ helper.auth_tkt.user_data = 'bogus:int'
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], '1')
+ self.assertEqual(result['userdata'], 'bogus:int')
+ self.assertEqual(result['timestamp'], 0)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_good_cookie_unknown_useridtype(self):
+ helper = self._makeOne('secret', include_ip=False)
+ helper.auth_tkt.userid = 'abc'
+ helper.auth_tkt.user_data = 'userid_type:unknown'
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], 'abc')
+ self.assertEqual(result['userdata'], 'userid_type:unknown')
+ self.assertEqual(result['timestamp'], 0)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_good_cookie_b64str_useridtype(self):
+ from base64 import b64encode
+
+ helper = self._makeOne('secret', include_ip=False)
+ helper.auth_tkt.userid = b64encode(b'encoded').strip()
+ helper.auth_tkt.user_data = 'userid_type:b64str'
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], b'encoded')
+ self.assertEqual(result['userdata'], 'userid_type:b64str')
+ self.assertEqual(result['timestamp'], 0)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_good_cookie_b64unicode_useridtype(self):
+ from base64 import b64encode
+
+ helper = self._makeOne('secret', include_ip=False)
+ helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip()
+ helper.auth_tkt.user_data = 'userid_type:b64unicode'
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(len(result), 4)
+ self.assertEqual(result['tokens'], ())
+ self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8'))
+ self.assertEqual(result['userdata'], 'userid_type:b64unicode')
+ self.assertEqual(result['timestamp'], 0)
+ environ = request.environ
+ self.assertEqual(environ['REMOTE_USER_TOKENS'], ())
+ self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode')
+ self.assertEqual(environ['AUTH_TYPE'], 'cookie')
+
+ def test_identify_bad_cookie(self):
+ helper = self._makeOne('secret', include_ip=True)
+ helper.auth_tkt.parse_raise = True
+ request = self._makeRequest('ticket')
+ result = helper.identify(request)
+ self.assertEqual(result, None)
+
+ def test_identify_cookie_timeout(self):
+ helper = self._makeOne('secret', timeout=1)
+ self.assertEqual(helper.timeout, 1)
+
+ def test_identify_cookie_str_timeout(self):
+ helper = self._makeOne('secret', timeout='1')
+ self.assertEqual(helper.timeout, 1)
+
+ def test_identify_cookie_timeout_aged(self):
+ import time
+
+ helper = self._makeOne('secret', timeout=10)
+ now = time.time()
+ helper.auth_tkt.timestamp = now - 1
+ helper.now = now + 10
+ helper.auth_tkt.tokens = (text_('a'),)
+ request = self._makeRequest('bogus')
+ result = helper.identify(request)
+ self.assertFalse(result)
+
+ def test_identify_cookie_reissue(self):
+ import time
+
+ helper = self._makeOne('secret', timeout=10, reissue_time=0)
+ now = time.time()
+ helper.auth_tkt.timestamp = now
+ helper.now = now + 1
+ helper.auth_tkt.tokens = (text_('a'),)
+ request = self._makeRequest('bogus')
+ result = helper.identify(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 1)
+ response = DummyResponse()
+ request.callbacks[0](request, response)
+ self.assertEqual(len(response.headerlist), 3)
+ self.assertEqual(response.headerlist[0][0], 'Set-Cookie')
+
+ def test_identify_cookie_str_reissue(self):
+ import time
+
+ helper = self._makeOne('secret', timeout=10, reissue_time='0')
+ now = time.time()
+ helper.auth_tkt.timestamp = now
+ helper.now = now + 1
+ helper.auth_tkt.tokens = (text_('a'),)
+ request = self._makeRequest('bogus')
+ result = helper.identify(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 1)
+ response = DummyResponse()
+ request.callbacks[0](request, response)
+ self.assertEqual(len(response.headerlist), 3)
+ self.assertEqual(response.headerlist[0][0], 'Set-Cookie')
+
+ def test_identify_cookie_reissue_already_reissued_this_request(self):
+ import time
+
+ helper = self._makeOne('secret', timeout=10, reissue_time=0)
+ now = time.time()
+ helper.auth_tkt.timestamp = now
+ helper.now = now + 1
+ request = self._makeRequest('bogus')
+ request._authtkt_reissued = True
+ result = helper.identify(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 0)
+
+ def test_identify_cookie_reissue_notyet(self):
+ import time
+
+ helper = self._makeOne('secret', timeout=10, reissue_time=10)
+ now = time.time()
+ helper.auth_tkt.timestamp = now
+ helper.now = now + 1
+ request = self._makeRequest('bogus')
+ result = helper.identify(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 0)
+
+ def test_identify_cookie_reissue_revoked_by_forget(self):
+ import time
+
+ helper = self._makeOne('secret', timeout=10, reissue_time=0)
+ now = time.time()
+ helper.auth_tkt.timestamp = now
+ helper.now = now + 1
+ request = self._makeRequest('bogus')
+ result = helper.identify(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 1)
+ result = helper.forget(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 1)
+ response = DummyResponse()
+ request.callbacks[0](request, response)
+ self.assertEqual(len(response.headerlist), 0)
+
+ def test_identify_cookie_reissue_revoked_by_remember(self):
+ import time
+
+ helper = self._makeOne('secret', timeout=10, reissue_time=0)
+ now = time.time()
+ helper.auth_tkt.timestamp = now
+ helper.now = now + 1
+ request = self._makeRequest('bogus')
+ result = helper.identify(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 1)
+ result = helper.remember(request, 'bob')
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 1)
+ response = DummyResponse()
+ request.callbacks[0](request, response)
+ self.assertEqual(len(response.headerlist), 0)
+
+ def test_identify_cookie_reissue_with_tokens_default(self):
+ # see https://github.com/Pylons/pyramid/issues#issue/108
+ import time
+
+ helper = self._makeOne('secret', timeout=10, reissue_time=0)
+ auth_tkt = DummyAuthTktModule(tokens=[''])
+ helper.auth_tkt = auth_tkt
+ helper.AuthTicket = auth_tkt.AuthTicket
+ helper.parse_ticket = auth_tkt.parse_ticket
+ helper.BadTicket = auth_tkt.BadTicket
+ now = time.time()
+ helper.auth_tkt.timestamp = now
+ helper.now = now + 1
+ request = self._makeRequest('bogus')
+ result = helper.identify(request)
+ self.assertTrue(result)
+ self.assertEqual(len(request.callbacks), 1)
+ response = DummyResponse()
+ request.callbacks[0](None, response)
+ self.assertEqual(len(response.headerlist), 3)
+ self.assertEqual(response.headerlist[0][0], 'Set-Cookie')
+ self.assertTrue("/tokens=/" in response.headerlist[0][1])
+
+ def test_remember(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.remember(request, 'userid')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax'))
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue(
+ result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax')
+ )
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue(
+ result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax')
+ )
+ self.assertTrue(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_nondefault_samesite(self):
+ helper = self._makeOne('secret', samesite='Strict')
+ request = self._makeRequest()
+ result = helper.remember(request, 'userid')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict'))
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue(
+ result[1][1].endswith(
+ '; Domain=localhost; Path=/; SameSite=Strict'
+ )
+ )
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue(
+ result[2][1].endswith(
+ '; Domain=.localhost; Path=/; SameSite=Strict'
+ )
+ )
+ self.assertTrue(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_None_samesite(self):
+ helper = self._makeOne('secret', samesite=None)
+ request = self._makeRequest()
+ result = helper.remember(request, 'userid')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/'))
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/'))
+ self.assertTrue(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_include_ip(self):
+ helper = self._makeOne('secret', include_ip=True)
+ request = self._makeRequest()
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax'))
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue(
+ result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax')
+ )
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue(
+ result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax')
+ )
+ self.assertTrue(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_path(self):
+ helper = self._makeOne(
+ 'secret', include_ip=True, path="/cgi-bin/app.cgi/"
+ )
+ request = self._makeRequest()
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(
+ result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax')
+ )
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue(
+ result[1][1].endswith(
+ '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax'
+ )
+ )
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue(
+ result[2][1].endswith(
+ '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax'
+ )
+ )
+ self.assertTrue(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_http_only(self):
+ helper = self._makeOne('secret', include_ip=True, http_only=True)
+ request = self._makeRequest()
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax'))
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue('; HttpOnly' in result[1][1])
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue('; HttpOnly' in result[2][1])
+ self.assertTrue(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_secure(self):
+ helper = self._makeOne('secret', include_ip=True, secure=True)
+ request = self._makeRequest()
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue('; secure' in result[0][1])
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue('; secure' in result[1][1])
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue('; secure' in result[2][1])
+ self.assertTrue(result[2][1].startswith('auth_tkt='))
+
+ def test_remember_wild_domain_disabled(self):
+ helper = self._makeOne('secret', wild_domain=False)
+ request = self._makeRequest()
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 2)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax'))
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue(
+ result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax')
+ )
+ self.assertTrue(result[1][1].startswith('auth_tkt='))
+
+ def test_remember_parent_domain(self):
+ helper = self._makeOne('secret', parent_domain=True)
+ request = self._makeRequest()
+ request.domain = 'www.example.com'
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 1)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(
+ result[0][1].endswith(
+ '; Domain=.example.com; Path=/; SameSite=Lax'
+ )
+ )
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ def test_remember_parent_domain_supercedes_wild_domain(self):
+ helper = self._makeOne('secret', parent_domain=True, wild_domain=True)
+ request = self._makeRequest()
+ request.domain = 'www.example.com'
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 1)
+ self.assertTrue(
+ result[0][1].endswith(
+ '; Domain=.example.com; Path=/; SameSite=Lax'
+ )
+ )
+
+ def test_remember_explicit_domain(self):
+ helper = self._makeOne('secret', domain='pyramid.bazinga')
+ request = self._makeRequest()
+ request.domain = 'www.example.com'
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 1)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue(
+ result[0][1].endswith(
+ '; Domain=pyramid.bazinga; Path=/; SameSite=Lax'
+ )
+ )
+ self.assertTrue(result[0][1].startswith('auth_tkt='))
+
+ def test_remember_domain_supercedes_parent_and_wild_domain(self):
+ helper = self._makeOne(
+ 'secret',
+ domain='pyramid.bazinga',
+ parent_domain=True,
+ wild_domain=True,
+ )
+ request = self._makeRequest()
+ request.domain = 'www.example.com'
+ result = helper.remember(request, 'other')
+ self.assertEqual(len(result), 1)
+ self.assertTrue(
+ result[0][1].endswith(
+ '; Domain=pyramid.bazinga; Path=/; SameSite=Lax'
+ )
+ )
+
+ def test_remember_binary_userid(self):
+ import base64
+
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.remember(request, b'userid')
+ values = self._parseHeaders(result)
+ self.assertEqual(len(result), 3)
+ val = self._cookieValue(values[0])
+ self.assertEqual(
+ val['userid'], text_(base64.b64encode(b'userid').strip())
+ )
+ self.assertEqual(val['user_data'], 'userid_type:b64str')
+
+ def test_remember_int_userid(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.remember(request, 1)
+ values = self._parseHeaders(result)
+ self.assertEqual(len(result), 3)
+ val = self._cookieValue(values[0])
+ self.assertEqual(val['userid'], '1')
+ self.assertEqual(val['user_data'], 'userid_type:int')
+
+ def test_remember_unicode_userid(self):
+ import base64
+
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ userid = text_(b'\xc2\xa9', 'utf-8')
+ result = helper.remember(request, userid)
+ values = self._parseHeaders(result)
+ self.assertEqual(len(result), 3)
+ val = self._cookieValue(values[0])
+ self.assertEqual(
+ val['userid'], text_(base64.b64encode(userid.encode('utf-8')))
+ )
+ self.assertEqual(val['user_data'], 'userid_type:b64unicode')
+
+ def test_remember_insane_userid(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ userid = object()
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always', RuntimeWarning)
+ result = helper.remember(request, userid)
+ self.assertTrue(str(w[-1].message).startswith('userid is of type'))
+ values = self._parseHeaders(result)
+ self.assertEqual(len(result), 3)
+ value = values[0]
+ self.assertTrue('userid' in value.value)
+
+ def test_remember_max_age(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.remember(request, 'userid', max_age=500)
+ values = self._parseHeaders(result)
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(values[0]['max-age'], '500')
+ self.assertTrue(values[0]['expires'])
+
+ def test_remember_str_max_age(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.remember(request, 'userid', max_age='500')
+ values = self._parseHeaders(result)
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(values[0]['max-age'], '500')
+ self.assertTrue(values[0]['expires'])
+
+ def test_remember_str_max_age_invalid(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ self.assertRaises(
+ ValueError,
+ helper.remember,
+ request,
+ 'userid',
+ max_age='invalid value',
+ )
+
+ def test_remember_tokens(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.remember(request, 'other', tokens=('foo', 'bar'))
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ self.assertTrue("/tokens=foo|bar/" in result[0][1])
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ self.assertTrue("/tokens=foo|bar/" in result[1][1])
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ self.assertTrue("/tokens=foo|bar/" in result[2][1])
+
+ def test_remember_samesite_nondefault(self):
+ helper = self._makeOne('secret', samesite='Strict')
+ request = self._makeRequest()
+ result = helper.remember(request, 'userid')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ cookieval = result[0][1]
+ self.assertTrue(
+ 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')],
+ cookieval,
+ )
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ cookieval = result[1][1]
+ self.assertTrue(
+ 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')],
+ cookieval,
+ )
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ cookieval = result[2][1]
+ self.assertTrue(
+ 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')],
+ cookieval,
+ )
+
+ def test_remember_samesite_default(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ result = helper.remember(request, 'userid')
+ self.assertEqual(len(result), 3)
+
+ self.assertEqual(result[0][0], 'Set-Cookie')
+ cookieval = result[0][1]
+ self.assertTrue(
+ 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')],
+ cookieval,
+ )
+
+ self.assertEqual(result[1][0], 'Set-Cookie')
+ cookieval = result[1][1]
+ self.assertTrue(
+ 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')],
+ cookieval,
+ )
+
+ self.assertEqual(result[2][0], 'Set-Cookie')
+ cookieval = result[2][1]
+ self.assertTrue(
+ 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')],
+ cookieval,
+ )
+
+ def test_remember_unicode_but_ascii_token(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ la = text_(b'foo', 'utf-8')
+ result = helper.remember(request, 'other', tokens=(la,))
+ # tokens must be str type on both Python 2 and 3
+ self.assertTrue("/tokens=foo/" in result[0][1])
+
+ def test_remember_nonascii_token(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ la = text_(b'La Pe\xc3\xb1a', 'utf-8')
+ self.assertRaises(
+ ValueError, helper.remember, request, 'other', tokens=(la,)
+ )
+
+ def test_remember_invalid_token_format(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ self.assertRaises(
+ ValueError, helper.remember, request, 'other', tokens=('foo bar',)
+ )
+ self.assertRaises(
+ ValueError, helper.remember, request, 'other', tokens=('1bar',)
+ )
+
+ def test_forget(self):
+ helper = self._makeOne('secret')
+ request = self._makeRequest()
+ headers = helper.forget(request)
+ self.assertEqual(len(headers), 3)
+ name, value = headers[0]
+ self.assertEqual(name, 'Set-Cookie')
+ self.assertEqual(
+ value,
+ 'auth_tkt=; Max-Age=0; Path=/; '
+ 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax',
+ )
+ name, value = headers[1]
+ self.assertEqual(name, 'Set-Cookie')
+ self.assertEqual(
+ value,
+ 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; '
+ 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax',
+ )
+ name, value = headers[2]
+ self.assertEqual(name, 'Set-Cookie')
+ self.assertEqual(
+ value,
+ 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; '
+ 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax',
+ )
+
+
+class DummyAuthTktModule(object):
+ def __init__(
+ self,
+ timestamp=0,
+ userid='userid',
+ tokens=(),
+ user_data='',
+ parse_raise=False,
+ hashalg="md5",
+ ):
+ self.timestamp = timestamp
+ self.userid = userid
+ self.tokens = tokens
+ self.user_data = user_data
+ self.parse_raise = parse_raise
+ self.hashalg = hashalg
+
+ def parse_ticket(secret, value, remote_addr, hashalg):
+ self.secret = secret
+ self.value = value
+ self.remote_addr = remote_addr
+ if self.parse_raise:
+ raise self.BadTicket()
+ return self.timestamp, self.userid, self.tokens, self.user_data
+
+ self.parse_ticket = parse_ticket
+
+ class AuthTicket(object):
+ def __init__(self, secret, userid, remote_addr, **kw):
+ self.secret = secret
+ self.userid = userid
+ self.remote_addr = remote_addr
+ self.kw = kw
+
+ def cookie_value(self):
+ result = {
+ 'secret': self.secret,
+ 'userid': self.userid,
+ 'remote_addr': self.remote_addr,
+ }
+ result.update(self.kw)
+ tokens = result.pop('tokens', None)
+ if tokens is not None:
+ tokens = '|'.join(tokens)
+ result['tokens'] = tokens
+ items = sorted(result.items())
+ new_items = []
+ for k, v in items:
+ if isinstance(v, bytes):
+ v = text_(v)
+ new_items.append((k, v))
+ result = '/'.join(['%s=%s' % (k, v) for k, v in new_items])
+ return result
+
+ self.AuthTicket = AuthTicket
+
+ class BadTicket(Exception):
+ pass
+
+
+class DummyCookies(object):
+ def __init__(self, cookie):
+ self.cookie = cookie
+
+ def get(self, name):
+ return self.cookie
+
+
+class DummyRequest:
+ domain = 'localhost'
+
+ def __init__(self, environ=None, session=None, registry=None, cookie=None):
+ self.environ = environ or {}
+ self.session = session or {}
+ self.registry = registry
+ self.callbacks = []
+ self.cookies = DummyCookies(cookie)
+
+ def add_response_callback(self, callback):
+ self.callbacks.append(callback)
+
+
+class DummyResponse:
+ def __init__(self):
+ self.headerlist = []
diff --git a/tests/test_testing.py b/tests/test_testing.py
index 5b3ad0f22..874d9f11b 100644
--- a/tests/test_testing.py
+++ b/tests/test_testing.py
@@ -23,52 +23,17 @@ class TestDummySecurityPolicy(unittest.TestCase):
return DummySecurityPolicy
- def _makeOne(self, userid=None, groupids=(), permissive=True):
+ def _makeOne(self, identity=None, permissive=True):
klass = self._getTargetClass()
- return klass(userid, groupids, permissive)
+ return klass(identity, permissive)
- def test_authenticated_userid(self):
+ def test_identify(self):
policy = self._makeOne('user')
- self.assertEqual(policy.authenticated_userid(None), 'user')
-
- def test_unauthenticated_userid(self):
- policy = self._makeOne('user')
- self.assertEqual(policy.unauthenticated_userid(None), 'user')
-
- def test_effective_principals_userid(self):
- policy = self._makeOne('user', ('group1',))
- from pyramid.security import Everyone
- from pyramid.security import Authenticated
-
- self.assertEqual(
- policy.effective_principals(None),
- [Everyone, Authenticated, 'user', 'group1'],
- )
-
- def test_effective_principals_nouserid(self):
- policy = self._makeOne()
- from pyramid.security import Everyone
-
- self.assertEqual(policy.effective_principals(None), [Everyone])
+ self.assertEqual(policy.identify(None), 'user')
def test_permits(self):
policy = self._makeOne()
- self.assertEqual(policy.permits(None, None, None), True)
-
- def test_principals_allowed_by_permission(self):
- policy = self._makeOne('user', ('group1',))
- from pyramid.security import Everyone
- from pyramid.security import Authenticated
-
- result = policy.principals_allowed_by_permission(None, None)
- self.assertEqual(result, [Everyone, Authenticated, 'user', 'group1'])
-
- def test_principals_allowed_by_permission_not_permissive(self):
- policy = self._makeOne('user', ('group1',))
- policy.permissive = False
-
- result = policy.principals_allowed_by_permission(None, None)
- self.assertEqual(result, [])
+ self.assertEqual(policy.permits(None, None, None, None), True)
def test_forget(self):
policy = self._makeOne()
diff --git a/tests/test_viewderivers.py b/tests/test_viewderivers.py
index f01cb490e..9a61ea9f1 100644
--- a/tests/test_viewderivers.py
+++ b/tests/test_viewderivers.py
@@ -28,12 +28,11 @@ class TestDeriveView(unittest.TestCase):
return logger
def _registerSecurityPolicy(self, permissive):
- from pyramid.interfaces import IAuthenticationPolicy
- from pyramid.interfaces import IAuthorizationPolicy
+ from pyramid.interfaces import ISecurityPolicy
policy = DummySecurityPolicy(permissive)
- self.config.registry.registerUtility(policy, IAuthenticationPolicy)
- self.config.registry.registerUtility(policy, IAuthorizationPolicy)
+ self.config.registry.registerUtility(policy, ISecurityPolicy)
+ return policy
def test_function_returns_non_adaptable(self):
def view(request):
@@ -421,7 +420,7 @@ class TestDeriveView(unittest.TestCase):
self.assertFalse(hasattr(result, '__call_permissive__'))
self.assertEqual(result(None, None), response)
- def test_with_debug_authorization_no_authpol(self):
+ def test_with_debug_authorization_no_security_policy(self):
response = DummyResponse()
view = lambda *arg: response
self.config.registry.settings = dict(
@@ -442,59 +441,7 @@ class TestDeriveView(unittest.TestCase):
logger.messages[0],
"debug_authorization of url url (view name "
"'view_name' against context None): Allowed "
- "(no authorization policy in use)",
- )
-
- def test_with_debug_authorization_authn_policy_no_authz_policy(self):
- response = DummyResponse()
- view = lambda *arg: response
- self.config.registry.settings = dict(debug_authorization=True)
- from pyramid.interfaces import IAuthenticationPolicy
-
- policy = DummySecurityPolicy(False)
- self.config.registry.registerUtility(policy, IAuthenticationPolicy)
- logger = self._registerLogger()
- result = self.config._derive_view(view, permission='view')
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertFalse(hasattr(result, '__call_permissive__'))
- request = self._makeRequest()
- request.view_name = 'view_name'
- request.url = 'url'
- self.assertEqual(result(None, request), response)
- self.assertEqual(len(logger.messages), 1)
- self.assertEqual(
- logger.messages[0],
- "debug_authorization of url url (view name "
- "'view_name' against context None): Allowed "
- "(no authorization policy in use)",
- )
-
- def test_with_debug_authorization_authz_policy_no_authn_policy(self):
- response = DummyResponse()
- view = lambda *arg: response
- self.config.registry.settings = dict(debug_authorization=True)
- from pyramid.interfaces import IAuthorizationPolicy
-
- policy = DummySecurityPolicy(False)
- self.config.registry.registerUtility(policy, IAuthorizationPolicy)
- logger = self._registerLogger()
- result = self.config._derive_view(view, permission='view')
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertFalse(hasattr(result, '__call_permissive__'))
- request = self._makeRequest()
- request.view_name = 'view_name'
- request.url = 'url'
- self.assertEqual(result(None, request), response)
- self.assertEqual(len(logger.messages), 1)
- self.assertEqual(
- logger.messages[0],
- "debug_authorization of url url (view name "
- "'view_name' against context None): Allowed "
- "(no authorization policy in use)",
+ "(no security policy in use)",
)
def test_with_debug_authorization_no_permission(self):
@@ -665,32 +612,11 @@ class TestDeriveView(unittest.TestCase):
"'view_name' against context Exception()): True",
)
- def test_secured_view_authn_policy_no_authz_policy(self):
+ def test_secured_view_authn_policy_no_security_policy(self):
response = DummyResponse()
view = lambda *arg: response
self.config.registry.settings = {}
- from pyramid.interfaces import IAuthenticationPolicy
- policy = DummySecurityPolicy(False)
- self.config.registry.registerUtility(policy, IAuthenticationPolicy)
- result = self.config._derive_view(view, permission='view')
- self.assertEqual(view.__module__, result.__module__)
- self.assertEqual(view.__doc__, result.__doc__)
- self.assertEqual(view.__name__, result.__name__)
- self.assertFalse(hasattr(result, '__call_permissive__'))
- request = self._makeRequest()
- request.view_name = 'view_name'
- request.url = 'url'
- self.assertEqual(result(None, request), response)
-
- def test_secured_view_authz_policy_no_authn_policy(self):
- response = DummyResponse()
- view = lambda *arg: response
- self.config.registry.settings = {}
- from pyramid.interfaces import IAuthorizationPolicy
-
- policy = DummySecurityPolicy(False)
- self.config.registry.registerUtility(policy, IAuthorizationPolicy)
result = self.config._derive_view(view, permission='view')
self.assertEqual(view.__module__, result.__module__)
self.assertEqual(view.__doc__, result.__doc__)
@@ -702,53 +628,41 @@ class TestDeriveView(unittest.TestCase):
self.assertEqual(result(None, request), response)
def test_secured_view_raises_forbidden_no_name(self):
- from pyramid.interfaces import IAuthenticationPolicy
- from pyramid.interfaces import IAuthorizationPolicy
from pyramid.httpexceptions import HTTPForbidden
response = DummyResponse()
view = lambda *arg: response
self.config.registry.settings = {}
- policy = DummySecurityPolicy(False)
- self.config.registry.registerUtility(policy, IAuthenticationPolicy)
- self.config.registry.registerUtility(policy, IAuthorizationPolicy)
+ self._registerSecurityPolicy(False)
result = self.config._derive_view(view, permission='view')
request = self._makeRequest()
request.view_name = 'view_name'
request.url = 'url'
- try:
+ with self.assertRaises(HTTPForbidden) as cm:
result(None, request)
- except HTTPForbidden as e:
- self.assertEqual(
- e.message, 'Unauthorized: <lambda> failed permission check'
- )
- else: # pragma: no cover
- raise AssertionError
+ self.assertEqual(
+ cm.exception.message,
+ 'Unauthorized: <lambda> failed permission check',
+ )
def test_secured_view_raises_forbidden_with_name(self):
- from pyramid.interfaces import IAuthenticationPolicy
- from pyramid.interfaces import IAuthorizationPolicy
from pyramid.httpexceptions import HTTPForbidden
def myview(request): # pragma: no cover
pass
self.config.registry.settings = {}
- policy = DummySecurityPolicy(False)
- self.config.registry.registerUtility(policy, IAuthenticationPolicy)
- self.config.registry.registerUtility(policy, IAuthorizationPolicy)
+ self._registerSecurityPolicy(False)
result = self.config._derive_view(myview, permission='view')
request = self._makeRequest()
request.view_name = 'view_name'
request.url = 'url'
- try:
+ with self.assertRaises(HTTPForbidden) as cm:
result(None, request)
- except HTTPForbidden as e:
- self.assertEqual(
- e.message, 'Unauthorized: myview failed permission check'
- )
- else: # pragma: no cover
- raise AssertionError
+ self.assertEqual(
+ cm.exception.message,
+ 'Unauthorized: myview failed permission check',
+ )
def test_secured_view_skipped_by_default_on_exception_view(self):
from pyramid.request import Request
@@ -794,12 +708,8 @@ class TestDeriveView(unittest.TestCase):
app = self.config.make_wsgi_app()
request = Request.blank('/foo', base_url='http://example.com')
request.method = 'POST'
- try:
+ with self.assertRaises(HTTPForbidden):
request.get_response(app)
- except HTTPForbidden:
- pass
- else: # pragma: no cover
- raise AssertionError
def test_secured_view_passed_on_explicit_exception_view(self):
from pyramid.request import Request
@@ -2130,10 +2040,10 @@ class DummySecurityPolicy:
def __init__(self, permitted=True):
self.permitted = permitted
- def effective_principals(self, request):
- return []
+ def identify(self, request):
+ return 123
- def permits(self, context, principals, permission):
+ def permits(self, request, context, identity, permission):
return self.permitted