diff options
| author | Theron Luhn <theron@luhn.com> | 2019-04-15 19:05:47 -0700 |
|---|---|---|
| committer | Theron Luhn <theron@luhn.com> | 2019-04-15 19:05:47 -0700 |
| commit | f4c6c993ded900b4e32d1dd49207ca1e18b11336 (patch) | |
| tree | 6c8bc3f9148699408fcfddcd52d95e51be662faf | |
| parent | 47f8935bb5423718ec293bba4709307be7d2f51c (diff) | |
| download | pyramid-f4c6c993ded900b4e32d1dd49207ca1e18b11336.tar.gz pyramid-f4c6c993ded900b4e32d1dd49207ca1e18b11336.tar.bz2 pyramid-f4c6c993ded900b4e32d1dd49207ca1e18b11336.zip | |
Revert "Migrate AuthTktCookieHelper to pyramid.security."
This reverts commit 9f267dd842c5e93336f0392f2809da75a716039a.
| -rw-r--r-- | src/pyramid/authentication.py | 444 | ||||
| -rw-r--r-- | src/pyramid/security.py | 558 | ||||
| -rw-r--r-- | tests/test_authentication.py | 1000 | ||||
| -rw-r--r-- | tests/test_security.py | 1008 |
4 files changed, 1431 insertions, 1579 deletions
diff --git a/src/pyramid/authentication.py b/src/pyramid/authentication.py index 12f31e5dd..21cfc0c0e 100644 --- a/src/pyramid/authentication.py +++ b/src/pyramid/authentication.py @@ -1,23 +1,26 @@ import binascii +from codecs import utf_8_decode +from codecs import utf_8_encode from collections import namedtuple +import hashlib +import base64 +import re +import time as time_mod +from urllib.parse import quote, unquote +import warnings from zope.interface import implementer +from webob.cookies import CookieProfile + from pyramid.interfaces import IAuthenticationPolicy, IDebugLogger -from pyramid.security import Authenticated, Everyone, AuthTktCookieHelper - -# bw compat after moving AuthTktHelper and friends to pyramid.security -from pyramid.security import ( # noqa - VALID_TOKEN, - b64encode, - b64decode, - AuthTicket, - BadTicket, - parse_ticket, - calculate_digest, - encode_ip_timestamp, -) +from pyramid.security import Authenticated, Everyone + +from pyramid.util import strings_differ, bytes_, ascii_, text_ +from pyramid.util import SimpleSerializer + +VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$") class CallbackAuthenticationPolicy(object): @@ -648,6 +651,421 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy): return self.cookie.forget(request) +def b64encode(v): + return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'') + + +def b64decode(v): + return base64.b64decode(bytes_(v)) + + +# this class licensed under the MIT license (stolen from Paste) +class AuthTicket(object): + """ + This class represents an authentication token. You must pass in + the shared secret, the userid, and the IP address. Optionally you + can include tokens (a list of strings, representing role names), + 'user_data', which is arbitrary data available for your own use in + later scripts. Lastly, you can override the cookie name and + timestamp. + + Once you provide all the arguments, use .cookie_value() to + generate the appropriate authentication ticket. + + Usage:: + + token = AuthTicket('sharedsecret', 'username', + os.environ['REMOTE_ADDR'], tokens=['admin']) + val = token.cookie_value() + + """ + + def __init__( + self, + secret, + userid, + ip, + tokens=(), + user_data='', + time=None, + cookie_name='auth_tkt', + secure=False, + hashalg='md5', + ): + self.secret = secret + self.userid = userid + self.ip = ip + self.tokens = ','.join(tokens) + self.user_data = user_data + if time is None: + self.time = time_mod.time() + else: + self.time = time + self.cookie_name = cookie_name + self.secure = secure + self.hashalg = hashalg + + def digest(self): + return calculate_digest( + self.ip, + self.time, + self.secret, + self.userid, + self.tokens, + self.user_data, + self.hashalg, + ) + + def cookie_value(self): + v = '%s%08x%s!' % (self.digest(), int(self.time), quote(self.userid)) + if self.tokens: + v += self.tokens + '!' + v += self.user_data + return v + + +# this class licensed under the MIT license (stolen from Paste) +class BadTicket(Exception): + """ + Exception raised when a ticket can't be parsed. If we get far enough to + determine what the expected digest should have been, expected is set. + This should not be shown by default, but can be useful for debugging. + """ + + def __init__(self, msg, expected=None): + self.expected = expected + Exception.__init__(self, msg) + + +# this function licensed under the MIT license (stolen from Paste) +def parse_ticket(secret, ticket, ip, hashalg='md5'): + """ + Parse the ticket, returning (timestamp, userid, tokens, user_data). + + If the ticket cannot be parsed, a ``BadTicket`` exception will be raised + with an explanation. + """ + ticket = text_(ticket).strip('"') + digest_size = hashlib.new(hashalg).digest_size * 2 + digest = ticket[:digest_size] + try: + timestamp = int(ticket[digest_size : digest_size + 8], 16) + except ValueError as e: + raise BadTicket('Timestamp is not a hex integer: %s' % e) + try: + userid, data = ticket[digest_size + 8 :].split('!', 1) + except ValueError: + raise BadTicket('userid is not followed by !') + userid = unquote(userid) + if '!' in data: + tokens, user_data = data.split('!', 1) + else: # pragma: no cover (never generated) + # @@: Is this the right order? + tokens = '' + user_data = data + + expected = calculate_digest( + ip, timestamp, secret, userid, tokens, user_data, hashalg + ) + + # Avoid timing attacks (see + # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf) + if strings_differ(expected, digest): + raise BadTicket( + 'Digest signature is not correct', expected=(expected, digest) + ) + + tokens = tokens.split(',') + + return (timestamp, userid, tokens, user_data) + + +# this function licensed under the MIT license (stolen from Paste) +def calculate_digest( + ip, timestamp, secret, userid, tokens, user_data, hashalg='md5' +): + secret = bytes_(secret, 'utf-8') + userid = bytes_(userid, 'utf-8') + tokens = bytes_(tokens, 'utf-8') + user_data = bytes_(user_data, 'utf-8') + hash_obj = hashlib.new(hashalg) + + # Check to see if this is an IPv6 address + if ':' in ip: + ip_timestamp = ip + str(int(timestamp)) + ip_timestamp = bytes_(ip_timestamp) + else: + # encode_ip_timestamp not required, left in for backwards compatibility + ip_timestamp = encode_ip_timestamp(ip, timestamp) + + hash_obj.update( + ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data + ) + digest = hash_obj.hexdigest() + hash_obj2 = hashlib.new(hashalg) + hash_obj2.update(bytes_(digest) + secret) + return hash_obj2.hexdigest() + + +# this function licensed under the MIT license (stolen from Paste) +def encode_ip_timestamp(ip, timestamp): + ip_chars = ''.join(map(chr, map(int, ip.split('.')))) + t = int(timestamp) + ts = ( + (t & 0xFF000000) >> 24, + (t & 0xFF0000) >> 16, + (t & 0xFF00) >> 8, + t & 0xFF, + ) + ts_chars = ''.join(map(chr, ts)) + return bytes_(ip_chars + ts_chars) + + +class AuthTktCookieHelper(object): + """ + A helper class for use in third-party authentication policy + implementations. See + :class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the + meanings of the constructor arguments. + """ + + parse_ticket = staticmethod(parse_ticket) # for tests + AuthTicket = AuthTicket # for tests + BadTicket = BadTicket # for tests + now = None # for tests + + userid_type_decoders = { + 'int': int, + 'unicode': lambda x: utf_8_decode(x)[0], # bw compat for old cookies + 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0], + 'b64str': lambda x: b64decode(x), + } + + userid_type_encoders = { + int: ('int', str), + str: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])), + bytes: ('b64str', lambda x: b64encode(x)), + } + + def __init__( + self, + secret, + cookie_name='auth_tkt', + secure=False, + include_ip=False, + timeout=None, + reissue_time=None, + max_age=None, + http_only=False, + path="/", + wild_domain=True, + hashalg='md5', + parent_domain=False, + domain=None, + samesite='Lax', + ): + self.cookie_profile = CookieProfile( + cookie_name=cookie_name, + secure=secure, + max_age=max_age, + httponly=http_only, + path=path, + serializer=SimpleSerializer(), + samesite=samesite, + ) + + self.secret = secret + self.cookie_name = cookie_name + self.secure = secure + self.include_ip = include_ip + self.timeout = timeout if timeout is None else int(timeout) + self.reissue_time = ( + reissue_time if reissue_time is None else int(reissue_time) + ) + self.max_age = max_age if max_age is None else int(max_age) + self.wild_domain = wild_domain + self.parent_domain = parent_domain + self.domain = domain + self.hashalg = hashalg + + def _get_cookies(self, request, value, max_age=None): + cur_domain = request.domain + + domains = [] + if self.domain: + domains.append(self.domain) + else: + if self.parent_domain and cur_domain.count('.') > 1: + domains.append('.' + cur_domain.split('.', 1)[1]) + else: + domains.append(None) + domains.append(cur_domain) + if self.wild_domain: + domains.append('.' + cur_domain) + + profile = self.cookie_profile(request) + + kw = {} + kw['domains'] = domains + if max_age is not None: + kw['max_age'] = max_age + + headers = profile.get_headers(value, **kw) + return headers + + def identify(self, request): + """ Return a dictionary with authentication information, or ``None`` + if no valid auth_tkt is attached to ``request``""" + environ = request.environ + cookie = request.cookies.get(self.cookie_name) + + if cookie is None: + return None + + if self.include_ip: + remote_addr = environ['REMOTE_ADDR'] + else: + remote_addr = '0.0.0.0' + + try: + timestamp, userid, tokens, user_data = self.parse_ticket( + self.secret, cookie, remote_addr, self.hashalg + ) + except self.BadTicket: + return None + + now = self.now # service tests + + if now is None: + now = time_mod.time() + + if self.timeout and ((timestamp + self.timeout) < now): + # the auth_tkt data has expired + return None + + userid_typename = 'userid_type:' + user_data_info = user_data.split('|') + for datum in filter(None, user_data_info): + if datum.startswith(userid_typename): + userid_type = datum[len(userid_typename) :] + decoder = self.userid_type_decoders.get(userid_type) + if decoder: + userid = decoder(userid) + + reissue = self.reissue_time is not None + + if reissue and not hasattr(request, '_authtkt_reissued'): + if (now - timestamp) > self.reissue_time: + # See https://github.com/Pylons/pyramid/issues#issue/108 + tokens = list(filter(None, tokens)) + headers = self.remember( + request, userid, max_age=self.max_age, tokens=tokens + ) + + def reissue_authtkt(request, response): + if not hasattr(request, '_authtkt_reissue_revoked'): + for k, v in headers: + response.headerlist.append((k, v)) + + request.add_response_callback(reissue_authtkt) + request._authtkt_reissued = True + + environ['REMOTE_USER_TOKENS'] = tokens + environ['REMOTE_USER_DATA'] = user_data + environ['AUTH_TYPE'] = 'cookie' + + identity = {} + identity['timestamp'] = timestamp + identity['userid'] = userid + identity['tokens'] = tokens + identity['userdata'] = user_data + return identity + + def forget(self, request): + """ Return a set of expires Set-Cookie headers, which will destroy + any existing auth_tkt cookie when attached to a response""" + request._authtkt_reissue_revoked = True + return self._get_cookies(request, None) + + def remember(self, request, userid, max_age=None, tokens=()): + """ Return a set of Set-Cookie headers; when set into a response, + these headers will represent a valid authentication ticket. + + ``max_age`` + The max age of the auth_tkt cookie, in seconds. When this value is + set, the cookie's ``Max-Age`` and ``Expires`` settings will be set, + allowing the auth_tkt cookie to last between browser sessions. If + this value is ``None``, the ``max_age`` value provided to the + helper itself will be used as the ``max_age`` value. Default: + ``None``. + + ``tokens`` + A sequence of strings that will be placed into the auth_tkt tokens + field. Each string in the sequence must be of the Python ``str`` + type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``. + Tokens are available in the returned identity when an auth_tkt is + found in the request and unpacked. Default: ``()``. + """ + max_age = self.max_age if max_age is None else int(max_age) + + environ = request.environ + + if self.include_ip: + remote_addr = environ['REMOTE_ADDR'] + else: + remote_addr = '0.0.0.0' + + user_data = '' + + encoding_data = self.userid_type_encoders.get(type(userid)) + + if encoding_data: + encoding, encoder = encoding_data + else: + warnings.warn( + "userid is of type {}, and is not supported by the " + "AuthTktAuthenticationPolicy. Explicitly converting to string " + "and storing as base64. Subsequent requests will receive a " + "string as the userid, it will not be decoded back to the " + "type provided.".format(type(userid)), + RuntimeWarning, + ) + encoding, encoder = self.userid_type_encoders.get(str) + userid = str(userid) + + userid = encoder(userid) + user_data = 'userid_type:%s' % encoding + + new_tokens = [] + for token in tokens: + if isinstance(token, str): + try: + token = ascii_(token) + except UnicodeEncodeError: + raise ValueError("Invalid token %r" % (token,)) + if not (isinstance(token, str) and VALID_TOKEN.match(token)): + raise ValueError("Invalid token %r" % (token,)) + new_tokens.append(token) + tokens = tuple(new_tokens) + + if hasattr(request, '_authtkt_reissued'): + request._authtkt_reissue_revoked = True + + ticket = self.AuthTicket( + self.secret, + userid, + remote_addr, + tokens=tokens, + user_data=user_data, + cookie_name=self.cookie_name, + secure=self.secure, + hashalg=self.hashalg, + ) + + cookie_value = ticket.cookie_value() + return self._get_cookies(request, cookie_value, max_age) + + @implementer(IAuthenticationPolicy) class SessionAuthenticationPolicy(CallbackAuthenticationPolicy): """ A :app:`Pyramid` authentication policy which gets its data from the diff --git a/src/pyramid/security.py b/src/pyramid/security.py index a55320ce6..dda61ef27 100644 --- a/src/pyramid/security.py +++ b/src/pyramid/security.py @@ -1,14 +1,3 @@ -from codecs import utf_8_decode -from codecs import utf_8_encode -import hashlib -import base64 -import time as time_mod -from urllib.parse import quote, unquote -import warnings -import re - -from webob.cookies import CookieProfile - from zope.interface import implementer, providedBy from pyramid.interfaces import ( @@ -22,14 +11,10 @@ from pyramid.interfaces import ( from pyramid.location import lineage -from pyramid.util import is_nonstr_iter, strings_differ, bytes_, ascii_, text_ - -from pyramid.util import SimpleSerializer +from pyramid.util import is_nonstr_iter from pyramid.threadlocal import get_current_registry -VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$") - Everyone = 'system.Everyone' Authenticated = 'system.Authenticated' Allow = 'Allow' @@ -586,544 +571,3 @@ class SessionAuthenticationHelper: def identify(self, request): return request.session.get(self.userid_key) - - -def b64encode(v): - return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'') - - -def b64decode(v): - return base64.b64decode(bytes_(v)) - - -# this class licensed under the MIT license (stolen from Paste) -class AuthTicket(object): - """ - This class represents an authentication token. You must pass in - the shared secret, the userid, and the IP address. Optionally you - can include tokens (a list of strings, representing role names), - 'user_data', which is arbitrary data available for your own use in - later scripts. Lastly, you can override the cookie name and - timestamp. - - Once you provide all the arguments, use .cookie_value() to - generate the appropriate authentication ticket. - - Usage:: - - token = AuthTicket('sharedsecret', 'username', - os.environ['REMOTE_ADDR'], tokens=['admin']) - val = token.cookie_value() - - """ - - def __init__( - self, - secret, - userid, - ip, - tokens=(), - user_data='', - time=None, - cookie_name='auth_tkt', - secure=False, - hashalg='md5', - ): - self.secret = secret - self.userid = userid - self.ip = ip - self.tokens = ','.join(tokens) - self.user_data = user_data - if time is None: - self.time = time_mod.time() - else: - self.time = time - self.cookie_name = cookie_name - self.secure = secure - self.hashalg = hashalg - - def digest(self): - return calculate_digest( - self.ip, - self.time, - self.secret, - self.userid, - self.tokens, - self.user_data, - self.hashalg, - ) - - def cookie_value(self): - v = '%s%08x%s!' % (self.digest(), int(self.time), quote(self.userid)) - if self.tokens: - v += self.tokens + '!' - v += self.user_data - return v - - -# this class licensed under the MIT license (stolen from Paste) -class BadTicket(Exception): - """ - Exception raised when a ticket can't be parsed. If we get far enough to - determine what the expected digest should have been, expected is set. - This should not be shown by default, but can be useful for debugging. - """ - - def __init__(self, msg, expected=None): - self.expected = expected - Exception.__init__(self, msg) - - -# this function licensed under the MIT license (stolen from Paste) -def parse_ticket(secret, ticket, ip, hashalg='md5'): - """ - Parse the ticket, returning (timestamp, userid, tokens, user_data). - - If the ticket cannot be parsed, a ``BadTicket`` exception will be raised - with an explanation. - """ - ticket = text_(ticket).strip('"') - digest_size = hashlib.new(hashalg).digest_size * 2 - digest = ticket[:digest_size] - try: - timestamp = int(ticket[digest_size : digest_size + 8], 16) - except ValueError as e: - raise BadTicket('Timestamp is not a hex integer: %s' % e) - try: - userid, data = ticket[digest_size + 8 :].split('!', 1) - except ValueError: - raise BadTicket('userid is not followed by !') - userid = unquote(userid) - if '!' in data: - tokens, user_data = data.split('!', 1) - else: # pragma: no cover (never generated) - # @@: Is this the right order? - tokens = '' - user_data = data - - expected = calculate_digest( - ip, timestamp, secret, userid, tokens, user_data, hashalg - ) - - # Avoid timing attacks (see - # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf) - if strings_differ(expected, digest): - raise BadTicket( - 'Digest signature is not correct', expected=(expected, digest) - ) - - tokens = tokens.split(',') - - return (timestamp, userid, tokens, user_data) - - -# this function licensed under the MIT license (stolen from Paste) -def calculate_digest( - ip, timestamp, secret, userid, tokens, user_data, hashalg='md5' -): - secret = bytes_(secret, 'utf-8') - userid = bytes_(userid, 'utf-8') - tokens = bytes_(tokens, 'utf-8') - user_data = bytes_(user_data, 'utf-8') - hash_obj = hashlib.new(hashalg) - - # Check to see if this is an IPv6 address - if ':' in ip: - ip_timestamp = ip + str(int(timestamp)) - ip_timestamp = bytes_(ip_timestamp) - else: - # encode_ip_timestamp not required, left in for backwards compatibility - ip_timestamp = encode_ip_timestamp(ip, timestamp) - - hash_obj.update( - ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data - ) - digest = hash_obj.hexdigest() - hash_obj2 = hashlib.new(hashalg) - hash_obj2.update(bytes_(digest) + secret) - return hash_obj2.hexdigest() - - -# this function licensed under the MIT license (stolen from Paste) -def encode_ip_timestamp(ip, timestamp): - ip_chars = ''.join(map(chr, map(int, ip.split('.')))) - t = int(timestamp) - ts = ( - (t & 0xFF000000) >> 24, - (t & 0xFF0000) >> 16, - (t & 0xFF00) >> 8, - t & 0xFF, - ) - ts_chars = ''.join(map(chr, ts)) - return bytes_(ip_chars + ts_chars) - - -class AuthTktCookieHelper: - """ - A helper class used for constructing a :term:`security policy` with stores - the user identity in a signed cookie. - - Constructor Arguments - - ``secret`` - - The secret (a string) used for auth_tkt cookie signing. This value - should be unique across all values provided to Pyramid for various - subsystem secrets (see :ref:`admonishment_against_secret_sharing`). - Required. - - ``cookie_name`` - - Default: ``auth_tkt``. The cookie name used - (string). Optional. - - ``secure`` - - Default: ``False``. Only send the cookie back over a secure - conn. Optional. - - ``include_ip`` - - Default: ``False``. Make the requesting IP address part of - the authentication data in the cookie. Optional. - - For IPv6 this option is not recommended. The ``mod_auth_tkt`` - specification does not specify how to handle IPv6 addresses, so using - this option in combination with IPv6 addresses may cause an - incompatible cookie. It ties the authentication ticket to that - individual's IPv6 address. - - ``timeout`` - - Default: ``None``. Maximum number of seconds which a newly - issued ticket will be considered valid. After this amount of - time, the ticket will expire (effectively logging the user - out). If this value is ``None``, the ticket never expires. - Optional. - - ``reissue_time`` - - Default: ``None``. If this parameter is set, it represents the number - of seconds that must pass before an authentication token cookie is - automatically reissued as the result of a request which requires - authentication. The duration is measured as the number of seconds - since the last auth_tkt cookie was issued and 'now'. If this value is - ``0``, a new ticket cookie will be reissued on every request which - requires authentication. - - A good rule of thumb: if you want auto-expired cookies based on - inactivity: set the ``timeout`` value to 1200 (20 mins) and set the - ``reissue_time`` value to perhaps a tenth of the ``timeout`` value - (120 or 2 mins). It's nonsensical to set the ``timeout`` value lower - than the ``reissue_time`` value, as the ticket will never be reissued - if so. However, such a configuration is not explicitly prevented. - - Optional. - - ``max_age`` - - Default: ``None``. The max age of the auth_tkt cookie, in - seconds. This differs from ``timeout`` inasmuch as ``timeout`` - represents the lifetime of the ticket contained in the cookie, - while this value represents the lifetime of the cookie itself. - When this value is set, the cookie's ``Max-Age`` and - ``Expires`` settings will be set, allowing the auth_tkt cookie - to last between browser sessions. It is typically nonsensical - to set this to a value that is lower than ``timeout`` or - ``reissue_time``, although it is not explicitly prevented. - Optional. - - ``path`` - - Default: ``/``. The path for which the auth_tkt cookie is valid. - May be desirable if the application only serves part of a domain. - Optional. - - ``http_only`` - - Default: ``False``. Hide cookie from JavaScript by setting the - HttpOnly flag. Not honored by all browsers. - Optional. - - ``wild_domain`` - - Default: ``True``. An auth_tkt cookie will be generated for the - wildcard domain. If your site is hosted as ``example.com`` this - will make the cookie available for sites underneath ``example.com`` - such as ``www.example.com``. - Optional. - - ``parent_domain`` - - Default: ``False``. An auth_tkt cookie will be generated for the - parent domain of the current site. For example if your site is - hosted under ``www.example.com`` a cookie will be generated for - ``.example.com``. This can be useful if you have multiple sites - sharing the same domain. This option supercedes the ``wild_domain`` - option. - Optional. - - ``domain`` - - Default: ``None``. If provided the auth_tkt cookie will only be - set for this domain. This option is not compatible with ``wild_domain`` - and ``parent_domain``. - Optional. - - ``hashalg`` - - Default: ``sha512`` (the literal string). - - Any hash algorithm supported by Python's ``hashlib.new()`` function - can be used as the ``hashalg``. - - Cookies generated by different instances of AuthTktAuthenticationPolicy - using different ``hashalg`` options are not compatible. Switching the - ``hashalg`` will imply that all existing users with a valid cookie will - be required to re-login. - - Optional. - - ``samesite`` - - Default: ``'Lax'``. The 'samesite' option of the session cookie. Set - the value to ``None`` to turn off the samesite option. - - This option is available as of :app:`Pyramid` 1.10. - """ - - parse_ticket = staticmethod(parse_ticket) # for tests - AuthTicket = AuthTicket # for tests - BadTicket = BadTicket # for tests - now = None # for tests - - userid_type_decoders = { - 'int': int, - 'unicode': lambda x: utf_8_decode(x)[0], # bw compat for old cookies - 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0], - 'b64str': lambda x: b64decode(x), - } - - userid_type_encoders = { - int: ('int', str), - str: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])), - bytes: ('b64str', lambda x: b64encode(x)), - } - - def __init__( - self, - secret, - cookie_name='auth_tkt', - secure=False, - include_ip=False, - timeout=None, - reissue_time=None, - max_age=None, - http_only=False, - path="/", - wild_domain=True, - hashalg='md5', - parent_domain=False, - domain=None, - samesite='Lax', - ): - self.cookie_profile = CookieProfile( - cookie_name=cookie_name, - secure=secure, - max_age=max_age, - httponly=http_only, - path=path, - serializer=SimpleSerializer(), - samesite=samesite, - ) - - self.secret = secret - self.cookie_name = cookie_name - self.secure = secure - self.include_ip = include_ip - self.timeout = timeout if timeout is None else int(timeout) - self.reissue_time = ( - reissue_time if reissue_time is None else int(reissue_time) - ) - self.max_age = max_age if max_age is None else int(max_age) - self.wild_domain = wild_domain - self.parent_domain = parent_domain - self.domain = domain - self.hashalg = hashalg - - def _get_cookies(self, request, value, max_age=None): - cur_domain = request.domain - - domains = [] - if self.domain: - domains.append(self.domain) - else: - if self.parent_domain and cur_domain.count('.') > 1: - domains.append('.' + cur_domain.split('.', 1)[1]) - else: - domains.append(None) - domains.append(cur_domain) - if self.wild_domain: - domains.append('.' + cur_domain) - - profile = self.cookie_profile(request) - - kw = {} - kw['domains'] = domains - if max_age is not None: - kw['max_age'] = max_age - - headers = profile.get_headers(value, **kw) - return headers - - def identify(self, request): - """ Return a dictionary with authentication information, or ``None`` - if no valid auth_tkt is attached to ``request``""" - environ = request.environ - cookie = request.cookies.get(self.cookie_name) - - if cookie is None: - return None - - if self.include_ip: - remote_addr = environ['REMOTE_ADDR'] - else: - remote_addr = '0.0.0.0' - - try: - timestamp, userid, tokens, user_data = self.parse_ticket( - self.secret, cookie, remote_addr, self.hashalg - ) - except self.BadTicket: - return None - - now = self.now # service tests - - if now is None: - now = time_mod.time() - - if self.timeout and ((timestamp + self.timeout) < now): - # the auth_tkt data has expired - return None - - userid_typename = 'userid_type:' - user_data_info = user_data.split('|') - for datum in filter(None, user_data_info): - if datum.startswith(userid_typename): - userid_type = datum[len(userid_typename) :] - decoder = self.userid_type_decoders.get(userid_type) - if decoder: - userid = decoder(userid) - - reissue = self.reissue_time is not None - - if reissue and not hasattr(request, '_authtkt_reissued'): - if (now - timestamp) > self.reissue_time: - # See https://github.com/Pylons/pyramid/issues#issue/108 - tokens = list(filter(None, tokens)) - headers = self.remember( - request, userid, max_age=self.max_age, tokens=tokens - ) - - def reissue_authtkt(request, response): - if not hasattr(request, '_authtkt_reissue_revoked'): - for k, v in headers: - response.headerlist.append((k, v)) - - request.add_response_callback(reissue_authtkt) - request._authtkt_reissued = True - - environ['REMOTE_USER_TOKENS'] = tokens - environ['REMOTE_USER_DATA'] = user_data - environ['AUTH_TYPE'] = 'cookie' - - identity = {} - identity['timestamp'] = timestamp - identity['userid'] = userid - identity['tokens'] = tokens - identity['userdata'] = user_data - return identity - - def forget(self, request): - """ Return a set of expires Set-Cookie headers, which will destroy - any existing auth_tkt cookie when attached to a response""" - request._authtkt_reissue_revoked = True - return self._get_cookies(request, None) - - def remember(self, request, userid, max_age=None, tokens=()): - """ Return a set of Set-Cookie headers; when set into a response, - these headers will represent a valid authentication ticket. - - ``max_age`` - The max age of the auth_tkt cookie, in seconds. When this value is - set, the cookie's ``Max-Age`` and ``Expires`` settings will be set, - allowing the auth_tkt cookie to last between browser sessions. If - this value is ``None``, the ``max_age`` value provided to the - helper itself will be used as the ``max_age`` value. Default: - ``None``. - - ``tokens`` - A sequence of strings that will be placed into the auth_tkt tokens - field. Each string in the sequence must be of the Python ``str`` - type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``. - Tokens are available in the returned identity when an auth_tkt is - found in the request and unpacked. Default: ``()``. - """ - max_age = self.max_age if max_age is None else int(max_age) - - environ = request.environ - - if self.include_ip: - remote_addr = environ['REMOTE_ADDR'] - else: - remote_addr = '0.0.0.0' - - user_data = '' - - encoding_data = self.userid_type_encoders.get(type(userid)) - - if encoding_data: - encoding, encoder = encoding_data - else: - warnings.warn( - "userid is of type {}, and is not supported by the " - "AuthTktAuthenticationPolicy. Explicitly converting to string " - "and storing as base64. Subsequent requests will receive a " - "string as the userid, it will not be decoded back to the " - "type provided.".format(type(userid)), - RuntimeWarning, - ) - encoding, encoder = self.userid_type_encoders.get(str) - userid = str(userid) - - userid = encoder(userid) - user_data = 'userid_type:%s' % encoding - - new_tokens = [] - for token in tokens: - if isinstance(token, str): - try: - token = ascii_(token) - except UnicodeEncodeError: - raise ValueError("Invalid token %r" % (token,)) - if not (isinstance(token, str) and VALID_TOKEN.match(token)): - raise ValueError("Invalid token %r" % (token,)) - new_tokens.append(token) - tokens = tuple(new_tokens) - - if hasattr(request, '_authtkt_reissued'): - request._authtkt_reissue_revoked = True - - ticket = self.AuthTicket( - self.secret, - userid, - remote_addr, - tokens=tokens, - user_data=user_data, - cookie_name=self.cookie_name, - secure=self.secure, - hashalg=self.hashalg, - ) - - cookie_value = ticket.cookie_value() - return self._get_cookies(request, cookie_value, max_age) diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 89cf9866d..8671eba05 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -1,7 +1,8 @@ +from http.cookies import SimpleCookie import unittest import warnings from pyramid import testing -from pyramid.util import bytes_ +from pyramid.util import text_, bytes_ class TestCallbackAuthenticationPolicyDebugging(unittest.TestCase): @@ -663,6 +664,926 @@ class TestAuthTktAuthenticationPolicy(unittest.TestCase): verifyObject(IAuthenticationPolicy, self._makeOne(None, None)) +class TestAuthTktCookieHelper(unittest.TestCase): + def _getTargetClass(self): + from pyramid.authentication import AuthTktCookieHelper + + return AuthTktCookieHelper + + def _makeOne(self, *arg, **kw): + helper = self._getTargetClass()(*arg, **kw) + # laziness after moving auth_tkt classes and funcs into + # authentication module + auth_tkt = DummyAuthTktModule() + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket + return helper + + def _makeRequest(self, cookie=None, ipv6=False): + environ = {'wsgi.version': (1, 0)} + + if ipv6 is False: + environ['REMOTE_ADDR'] = '1.1.1.1' + else: + environ['REMOTE_ADDR'] = '::1' + environ['SERVER_NAME'] = 'localhost' + return DummyRequest(environ, cookie=cookie) + + def _cookieValue(self, cookie): + items = cookie.value.split('/') + D = {} + for item in items: + k, v = item.split('=', 1) + D[k] = v + return D + + def _parseHeaders(self, headers): + return [self._parseHeader(header) for header in headers] + + def _parseHeader(self, header): + cookie = self._parseCookie(header[1]) + return cookie + + def _parseCookie(self, cookie): + cookies = SimpleCookie() + cookies.load(cookie) + return cookies.get('auth_tkt') + + def test_init_cookie_str_reissue_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', reissue_time='invalid value' + ) + + def test_init_cookie_str_timeout_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', timeout='invalid value' + ) + + def test_init_cookie_str_max_age_invalid(self): + self.assertRaises( + ValueError, self._makeOne, 'secret', max_age='invalid value' + ) + + def test_identify_nocookie(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_cookie_value_is_None(self): + helper = self._makeOne('secret') + request = self._makeRequest(None) + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_good_cookie_include_ip(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_include_ipv6(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest('ticket', ipv6=True) + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '::1') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_dont_include_ip(self): + helper = self._makeOne('secret', include_ip=False) + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'userid') + self.assertEqual(result['userdata'], '') + self.assertEqual(result['timestamp'], 0) + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0') + self.assertEqual(helper.auth_tkt.secret, 'secret') + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], '') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_int_useridtype(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'userid_type:int' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 1) + self.assertEqual(result['userdata'], 'userid_type:int') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_nonuseridtype_user_data(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'bogus:int' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], '1') + self.assertEqual(result['userdata'], 'bogus:int') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_unknown_useridtype(self): + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = 'abc' + helper.auth_tkt.user_data = 'userid_type:unknown' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], 'abc') + self.assertEqual(result['userdata'], 'userid_type:unknown') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_b64str_useridtype(self): + from base64 import b64encode + + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = b64encode(b'encoded').strip() + helper.auth_tkt.user_data = 'userid_type:b64str' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], b'encoded') + self.assertEqual(result['userdata'], 'userid_type:b64str') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_good_cookie_b64unicode_useridtype(self): + from base64 import b64encode + + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip() + helper.auth_tkt.user_data = 'userid_type:b64unicode' + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(len(result), 4) + self.assertEqual(result['tokens'], ()) + self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8')) + self.assertEqual(result['userdata'], 'userid_type:b64unicode') + self.assertEqual(result['timestamp'], 0) + environ = request.environ + self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) + self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode') + self.assertEqual(environ['AUTH_TYPE'], 'cookie') + + def test_identify_bad_cookie(self): + helper = self._makeOne('secret', include_ip=True) + helper.auth_tkt.parse_raise = True + request = self._makeRequest('ticket') + result = helper.identify(request) + self.assertEqual(result, None) + + def test_identify_cookie_timeout(self): + helper = self._makeOne('secret', timeout=1) + self.assertEqual(helper.timeout, 1) + + def test_identify_cookie_str_timeout(self): + helper = self._makeOne('secret', timeout='1') + self.assertEqual(helper.timeout, 1) + + def test_identify_cookie_timeout_aged(self): + import time + + helper = self._makeOne('secret', timeout=10) + now = time.time() + helper.auth_tkt.timestamp = now - 1 + helper.now = now + 10 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertFalse(result) + + def test_identify_cookie_reissue(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + + def test_identify_cookie_str_reissue(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time='0') + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + helper.auth_tkt.tokens = (text_('a'),) + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + + def test_identify_cookie_reissue_already_reissued_this_request(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + request._authtkt_reissued = True + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 0) + + def test_identify_cookie_reissue_notyet(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=10) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 0) + + def test_identify_cookie_reissue_revoked_by_forget(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + result = helper.forget(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 0) + + def test_identify_cookie_reissue_revoked_by_remember(self): + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + result = helper.remember(request, 'bob') + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](request, response) + self.assertEqual(len(response.headerlist), 0) + + def test_identify_cookie_reissue_with_tokens_default(self): + # see https://github.com/Pylons/pyramid/issues#issue/108 + import time + + helper = self._makeOne('secret', timeout=10, reissue_time=0) + auth_tkt = DummyAuthTktModule(tokens=['']) + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket + now = time.time() + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) + self.assertTrue(result) + self.assertEqual(len(request.callbacks), 1) + response = DummyResponse() + request.callbacks[0](None, response) + self.assertEqual(len(response.headerlist), 3) + self.assertEqual(response.headerlist[0][0], 'Set-Cookie') + self.assertTrue("/tokens=/" in response.headerlist[0][1]) + + def test_remember(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_nondefault_samesite(self): + helper = self._makeOne('secret', samesite='Strict') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith( + '; Domain=localhost; Path=/; SameSite=Strict' + ) + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith( + '; Domain=.localhost; Path=/; SameSite=Strict' + ) + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_None_samesite(self): + helper = self._makeOne('secret', samesite=None) + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/')) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/')) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_include_ip(self): + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_path(self): + helper = self._makeOne( + 'secret', include_ip=True, path="/cgi-bin/app.cgi/" + ) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax') + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith( + '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' + ) + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue( + result[2][1].endswith( + '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' + ) + ) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_http_only(self): + helper = self._makeOne('secret', include_ip=True, http_only=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue('; HttpOnly' in result[1][1]) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue('; HttpOnly' in result[2][1]) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_secure(self): + helper = self._makeOne('secret', include_ip=True, secure=True) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue('; secure' in result[0][1]) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue('; secure' in result[1][1]) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue('; secure' in result[2][1]) + self.assertTrue(result[2][1].startswith('auth_tkt=')) + + def test_remember_wild_domain_disabled(self): + helper = self._makeOne('secret', wild_domain=False) + request = self._makeRequest() + result = helper.remember(request, 'other') + self.assertEqual(len(result), 2) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue( + result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') + ) + self.assertTrue(result[1][1].startswith('auth_tkt=')) + + def test_remember_parent_domain(self): + helper = self._makeOne('secret', parent_domain=True) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith( + '; Domain=.example.com; Path=/; SameSite=Lax' + ) + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + def test_remember_parent_domain_supercedes_wild_domain(self): + helper = self._makeOne('secret', parent_domain=True, wild_domain=True) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + self.assertTrue( + result[0][1].endswith( + '; Domain=.example.com; Path=/; SameSite=Lax' + ) + ) + + def test_remember_explicit_domain(self): + helper = self._makeOne('secret', domain='pyramid.bazinga') + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue( + result[0][1].endswith( + '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' + ) + ) + self.assertTrue(result[0][1].startswith('auth_tkt=')) + + def test_remember_domain_supercedes_parent_and_wild_domain(self): + helper = self._makeOne( + 'secret', + domain='pyramid.bazinga', + parent_domain=True, + wild_domain=True, + ) + request = self._makeRequest() + request.domain = 'www.example.com' + result = helper.remember(request, 'other') + self.assertEqual(len(result), 1) + self.assertTrue( + result[0][1].endswith( + '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' + ) + ) + + def test_remember_binary_userid(self): + import base64 + + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, b'userid') + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual( + val['userid'], text_(base64.b64encode(b'userid').strip()) + ) + self.assertEqual(val['user_data'], 'userid_type:b64str') + + def test_remember_int_userid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 1) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual(val['userid'], '1') + self.assertEqual(val['user_data'], 'userid_type:int') + + def test_remember_unicode_userid(self): + import base64 + + helper = self._makeOne('secret') + request = self._makeRequest() + userid = text_(b'\xc2\xa9', 'utf-8') + result = helper.remember(request, userid) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + val = self._cookieValue(values[0]) + self.assertEqual( + val['userid'], text_(base64.b64encode(userid.encode('utf-8'))) + ) + self.assertEqual(val['user_data'], 'userid_type:b64unicode') + + def test_remember_insane_userid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + userid = object() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always', RuntimeWarning) + result = helper.remember(request, userid) + self.assertTrue(str(w[-1].message).startswith('userid is of type')) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + value = values[0] + self.assertTrue('userid' in value.value) + + def test_remember_max_age(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid', max_age=500) + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + + self.assertEqual(values[0]['max-age'], '500') + self.assertTrue(values[0]['expires']) + + def test_remember_str_max_age(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid', max_age='500') + values = self._parseHeaders(result) + self.assertEqual(len(result), 3) + + self.assertEqual(values[0]['max-age'], '500') + self.assertTrue(values[0]['expires']) + + def test_remember_str_max_age_invalid(self): + helper = self._makeOne('secret') + request = self._makeRequest() + self.assertRaises( + ValueError, + helper.remember, + request, + 'userid', + max_age='invalid value', + ) + + def test_remember_tokens(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'other', tokens=('foo', 'bar')) + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[0][1]) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[1][1]) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.assertTrue("/tokens=foo|bar/" in result[2][1]) + + def test_remember_samesite_nondefault(self): + helper = self._makeOne('secret', samesite='Strict') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + cookieval = result[0][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[1][0], 'Set-Cookie') + cookieval = result[1][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[2][0], 'Set-Cookie') + cookieval = result[2][1] + self.assertTrue( + 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + def test_remember_samesite_default(self): + helper = self._makeOne('secret') + request = self._makeRequest() + result = helper.remember(request, 'userid') + self.assertEqual(len(result), 3) + + self.assertEqual(result[0][0], 'Set-Cookie') + cookieval = result[0][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[1][0], 'Set-Cookie') + cookieval = result[1][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + self.assertEqual(result[2][0], 'Set-Cookie') + cookieval = result[2][1] + self.assertTrue( + 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], + cookieval, + ) + + def test_remember_unicode_but_ascii_token(self): + helper = self._makeOne('secret') + request = self._makeRequest() + la = text_(b'foo', 'utf-8') + result = helper.remember(request, 'other', tokens=(la,)) + # tokens must be str type on both Python 2 and 3 + self.assertTrue("/tokens=foo/" in result[0][1]) + + def test_remember_nonascii_token(self): + helper = self._makeOne('secret') + request = self._makeRequest() + la = text_(b'La Pe\xc3\xb1a', 'utf-8') + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=(la,) + ) + + def test_remember_invalid_token_format(self): + helper = self._makeOne('secret') + request = self._makeRequest() + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=('foo bar',) + ) + self.assertRaises( + ValueError, helper.remember, request, 'other', tokens=('1bar',) + ) + + def test_forget(self): + helper = self._makeOne('secret') + request = self._makeRequest() + headers = helper.forget(request) + self.assertEqual(len(headers), 3) + name, value = headers[0] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + name, value = headers[1] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + name, value = headers[2] + self.assertEqual(name, 'Set-Cookie') + self.assertEqual( + value, + 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; ' + 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', + ) + + +class TestAuthTicket(unittest.TestCase): + def _makeOne(self, *arg, **kw): + from pyramid.authentication import AuthTicket + + return AuthTicket(*arg, **kw) + + def test_ctor_with_tokens(self): + ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b')) + self.assertEqual(ticket.tokens, 'a,b') + + def test_ctor_with_time(self): + ticket = self._makeOne('secret', 'userid', 'ip', time='time') + self.assertEqual(ticket.time, 'time') + + def test_digest(self): + ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10) + result = ticket.digest() + self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') + + def test_digest_sha512(self): + ticket = self._makeOne( + 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512' + ) + result = ticket.digest() + self.assertEqual( + result, + '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49' + '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9' + 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278', + ) + + def test_cookie_value(self): + ticket = self._makeOne( + 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b') + ) + result = ticket.cookie_value() + self.assertEqual( + result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' + ) + + def test_ipv4(self): + ticket = self._makeOne( + 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256' + ) + result = ticket.cookie_value() + self.assertEqual( + result, + 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b' + '798400ecdade8d76c530000000auserid!', + ) + + def test_ipv6(self): + ticket = self._makeOne( + 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256' + ) + result = ticket.cookie_value() + self.assertEqual( + result, + 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8' + '5becf8760cd7a2fa4910000000auserid!', + ) + + +class TestBadTicket(unittest.TestCase): + def _makeOne(self, msg, expected=None): + from pyramid.authentication import BadTicket + + return BadTicket(msg, expected) + + def test_it(self): + exc = self._makeOne('msg', expected=True) + self.assertEqual(exc.expected, True) + self.assertTrue(isinstance(exc, Exception)) + + +class Test_parse_ticket(unittest.TestCase): + def _callFUT(self, secret, ticket, ip, hashalg='md5'): + from pyramid.authentication import parse_ticket + + return parse_ticket(secret, ticket, ip, hashalg) + + def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'): + from pyramid.authentication import BadTicket + + self.assertRaises( + BadTicket, self._callFUT, secret, ticket, ip, hashalg + ) + + def test_bad_timestamp(self): + ticket = 'x' * 64 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_bad_userid_or_data(self): + ticket = 'x' * 32 + '11111111' + 'x' * 10 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_digest_sig_incorrect(self): + ticket = 'x' * 32 + '11111111' + 'a!b!c' + self._assertRaisesBadTicket('secret', ticket, '0.0.0.0') + + def test_correct_with_user_data(self): + ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!') + result = self._callFUT('secret', ticket, '0.0.0.0') + self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) + + def test_correct_with_user_data_sha512(self): + ticket = text_( + '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1' + '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767' + 'ba8a26d02aaeae56599a0000000auserid!a,b!' + ) + result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512') + self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) + + def test_ipv4(self): + ticket = text_( + 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd' + 'ade8d76c530000000auserid!' + ) + result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256') + self.assertEqual(result, (10, 'userid', [''], '')) + + def test_ipv6(self): + ticket = text_( + 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760' + 'cd7a2fa4910000000auserid!' + ) + result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256') + self.assertEqual(result, (10, 'userid', [''], '')) + + class TestSessionAuthenticationPolicy(unittest.TestCase): def _getTargetClass(self): from pyramid.authentication import SessionAuthenticationPolicy @@ -990,6 +1911,14 @@ class DummyContext: pass +class DummyCookies(object): + def __init__(self, cookie): + self.cookie = cookie + + def get(self, name): + return self.cookie + + class DummyRequest: domain = 'localhost' @@ -998,6 +1927,10 @@ class DummyRequest: self.session = session or {} self.registry = registry self.callbacks = [] + self.cookies = DummyCookies(cookie) + + def add_response_callback(self, callback): + self.callbacks.append(callback) class DummyWhoPlugin: @@ -1021,3 +1954,68 @@ class DummyCookieHelper: def forget(self, *arg): return [] + + +class DummyAuthTktModule(object): + def __init__( + self, + timestamp=0, + userid='userid', + tokens=(), + user_data='', + parse_raise=False, + hashalg="md5", + ): + self.timestamp = timestamp + self.userid = userid + self.tokens = tokens + self.user_data = user_data + self.parse_raise = parse_raise + self.hashalg = hashalg + + def parse_ticket(secret, value, remote_addr, hashalg): + self.secret = secret + self.value = value + self.remote_addr = remote_addr + if self.parse_raise: + raise self.BadTicket() + return self.timestamp, self.userid, self.tokens, self.user_data + + self.parse_ticket = parse_ticket + + class AuthTicket(object): + def __init__(self, secret, userid, remote_addr, **kw): + self.secret = secret + self.userid = userid + self.remote_addr = remote_addr + self.kw = kw + + def cookie_value(self): + result = { + 'secret': self.secret, + 'userid': self.userid, + 'remote_addr': self.remote_addr, + } + result.update(self.kw) + tokens = result.pop('tokens', None) + if tokens is not None: + tokens = '|'.join(tokens) + result['tokens'] = tokens + items = sorted(result.items()) + new_items = [] + for k, v in items: + if isinstance(v, bytes): + v = text_(v) + new_items.append((k, v)) + result = '/'.join(['%s=%s' % (k, v) for k, v in new_items]) + return result + + self.AuthTicket = AuthTicket + + class BadTicket(Exception): + pass + + +class DummyResponse: + def __init__(self): + self.headerlist = [] diff --git a/tests/test_security.py b/tests/test_security.py index 97aec42c6..f14159156 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -1,9 +1,6 @@ import unittest -import warnings -from http.cookies import SimpleCookie from pyramid import testing -from pyramid.util import text_ class TestAllPermissionsList(unittest.TestCase): @@ -940,1008 +937,3 @@ class TestSessionAuthenticationHelper(unittest.TestCase): result = helper.forget(request) self.assertEqual(request.session.get('userid'), None) self.assertEqual(result, []) - - -class TestAuthTicket(unittest.TestCase): - def _makeOne(self, *arg, **kw): - from pyramid.security import AuthTicket - - return AuthTicket(*arg, **kw) - - def test_ctor_with_tokens(self): - ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b')) - self.assertEqual(ticket.tokens, 'a,b') - - def test_ctor_with_time(self): - ticket = self._makeOne('secret', 'userid', 'ip', time='time') - self.assertEqual(ticket.time, 'time') - - def test_digest(self): - ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10) - result = ticket.digest() - self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') - - def test_digest_sha512(self): - ticket = self._makeOne( - 'secret', 'userid', '0.0.0.0', time=10, hashalg='sha512' - ) - result = ticket.digest() - self.assertEqual( - result, - '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49' - '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9' - 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278', - ) - - def test_cookie_value(self): - ticket = self._makeOne( - 'secret', 'userid', '0.0.0.0', time=10, tokens=('a', 'b') - ) - result = ticket.cookie_value() - self.assertEqual( - result, '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' - ) - - def test_ipv4(self): - ticket = self._makeOne( - 'secret', 'userid', '198.51.100.1', time=10, hashalg='sha256' - ) - result = ticket.cookie_value() - self.assertEqual( - result, - 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b' - '798400ecdade8d76c530000000auserid!', - ) - - def test_ipv6(self): - ticket = self._makeOne( - 'secret', 'userid', '2001:db8::1', time=10, hashalg='sha256' - ) - result = ticket.cookie_value() - self.assertEqual( - result, - 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c8' - '5becf8760cd7a2fa4910000000auserid!', - ) - - -class TestBadTicket(unittest.TestCase): - def _makeOne(self, msg, expected=None): - from pyramid.security import BadTicket - - return BadTicket(msg, expected) - - def test_it(self): - exc = self._makeOne('msg', expected=True) - self.assertEqual(exc.expected, True) - self.assertTrue(isinstance(exc, Exception)) - - -class Test_parse_ticket(unittest.TestCase): - def _callFUT(self, secret, ticket, ip, hashalg='md5'): - from pyramid.security import parse_ticket - - return parse_ticket(secret, ticket, ip, hashalg) - - def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'): - from pyramid.security import BadTicket - - self.assertRaises( - BadTicket, self._callFUT, secret, ticket, ip, hashalg - ) - - def test_bad_timestamp(self): - ticket = 'x' * 64 - self._assertRaisesBadTicket('secret', ticket, 'ip') - - def test_bad_userid_or_data(self): - ticket = 'x' * 32 + '11111111' + 'x' * 10 - self._assertRaisesBadTicket('secret', ticket, 'ip') - - def test_digest_sig_incorrect(self): - ticket = 'x' * 32 + '11111111' + 'a!b!c' - self._assertRaisesBadTicket('secret', ticket, '0.0.0.0') - - def test_correct_with_user_data(self): - ticket = text_('66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!') - result = self._callFUT('secret', ticket, '0.0.0.0') - self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) - - def test_correct_with_user_data_sha512(self): - ticket = text_( - '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1' - '160cab0ec0e6888faa41eba641a18522b26f19109f3ffafb769767' - 'ba8a26d02aaeae56599a0000000auserid!a,b!' - ) - result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512') - self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) - - def test_ipv4(self): - ticket = text_( - 'b3e7156db4f8abde4439c4a6499a0668f9e7ffd7fa27b798400ecd' - 'ade8d76c530000000auserid!' - ) - result = self._callFUT('secret', ticket, '198.51.100.1', 'sha256') - self.assertEqual(result, (10, 'userid', [''], '')) - - def test_ipv6(self): - ticket = text_( - 'd025b601a0f12ca6d008aa35ff3a22b7d8f3d1c1456c85becf8760' - 'cd7a2fa4910000000auserid!' - ) - result = self._callFUT('secret', ticket, '2001:db8::1', 'sha256') - self.assertEqual(result, (10, 'userid', [''], '')) - - -class TestAuthTktCookieHelper(unittest.TestCase): - def _getTargetClass(self): - from pyramid.security import AuthTktCookieHelper - - return AuthTktCookieHelper - - def _makeOne(self, *arg, **kw): - helper = self._getTargetClass()(*arg, **kw) - auth_tkt = DummyAuthTktModule() - helper.auth_tkt = auth_tkt - helper.AuthTicket = auth_tkt.AuthTicket - helper.parse_ticket = auth_tkt.parse_ticket - helper.BadTicket = auth_tkt.BadTicket - return helper - - def _makeRequest(self, cookie=None, ipv6=False): - environ = {'wsgi.version': (1, 0)} - - if ipv6 is False: - environ['REMOTE_ADDR'] = '1.1.1.1' - else: - environ['REMOTE_ADDR'] = '::1' - environ['SERVER_NAME'] = 'localhost' - return DummyRequest(environ, cookie=cookie) - - def _cookieValue(self, cookie): - items = cookie.value.split('/') - D = {} - for item in items: - k, v = item.split('=', 1) - D[k] = v - return D - - def _parseHeaders(self, headers): - return [self._parseHeader(header) for header in headers] - - def _parseHeader(self, header): - cookie = self._parseCookie(header[1]) - return cookie - - def _parseCookie(self, cookie): - cookies = SimpleCookie() - cookies.load(cookie) - return cookies.get('auth_tkt') - - def test_init_cookie_str_reissue_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', reissue_time='invalid value' - ) - - def test_init_cookie_str_timeout_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', timeout='invalid value' - ) - - def test_init_cookie_str_max_age_invalid(self): - self.assertRaises( - ValueError, self._makeOne, 'secret', max_age='invalid value' - ) - - def test_identify_nocookie(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_cookie_value_is_None(self): - helper = self._makeOne('secret') - request = self._makeRequest(None) - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_good_cookie_include_ip(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_include_ipv6(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest('ticket', ipv6=True) - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '::1') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_dont_include_ip(self): - helper = self._makeOne('secret', include_ip=False) - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'userid') - self.assertEqual(result['userdata'], '') - self.assertEqual(result['timestamp'], 0) - self.assertEqual(helper.auth_tkt.value, 'ticket') - self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0') - self.assertEqual(helper.auth_tkt.secret, 'secret') - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], '') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_int_useridtype(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = '1' - helper.auth_tkt.user_data = 'userid_type:int' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 1) - self.assertEqual(result['userdata'], 'userid_type:int') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:int') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_nonuseridtype_user_data(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = '1' - helper.auth_tkt.user_data = 'bogus:int' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], '1') - self.assertEqual(result['userdata'], 'bogus:int') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'bogus:int') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_unknown_useridtype(self): - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = 'abc' - helper.auth_tkt.user_data = 'userid_type:unknown' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], 'abc') - self.assertEqual(result['userdata'], 'userid_type:unknown') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:unknown') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_b64str_useridtype(self): - from base64 import b64encode - - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = b64encode(b'encoded').strip() - helper.auth_tkt.user_data = 'userid_type:b64str' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], b'encoded') - self.assertEqual(result['userdata'], 'userid_type:b64str') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64str') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_good_cookie_b64unicode_useridtype(self): - from base64 import b64encode - - helper = self._makeOne('secret', include_ip=False) - helper.auth_tkt.userid = b64encode(b'\xc3\xa9ncoded').strip() - helper.auth_tkt.user_data = 'userid_type:b64unicode' - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(len(result), 4) - self.assertEqual(result['tokens'], ()) - self.assertEqual(result['userid'], text_(b'\xc3\xa9ncoded', 'utf-8')) - self.assertEqual(result['userdata'], 'userid_type:b64unicode') - self.assertEqual(result['timestamp'], 0) - environ = request.environ - self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) - self.assertEqual(environ['REMOTE_USER_DATA'], 'userid_type:b64unicode') - self.assertEqual(environ['AUTH_TYPE'], 'cookie') - - def test_identify_bad_cookie(self): - helper = self._makeOne('secret', include_ip=True) - helper.auth_tkt.parse_raise = True - request = self._makeRequest('ticket') - result = helper.identify(request) - self.assertEqual(result, None) - - def test_identify_cookie_timeout(self): - helper = self._makeOne('secret', timeout=1) - self.assertEqual(helper.timeout, 1) - - def test_identify_cookie_str_timeout(self): - helper = self._makeOne('secret', timeout='1') - self.assertEqual(helper.timeout, 1) - - def test_identify_cookie_timeout_aged(self): - import time - - helper = self._makeOne('secret', timeout=10) - now = time.time() - helper.auth_tkt.timestamp = now - 1 - helper.now = now + 10 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertFalse(result) - - def test_identify_cookie_reissue(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - - def test_identify_cookie_str_reissue(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time='0') - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - helper.auth_tkt.tokens = (text_('a'),) - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - - def test_identify_cookie_reissue_already_reissued_this_request(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - request._authtkt_reissued = True - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 0) - - def test_identify_cookie_reissue_notyet(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=10) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 0) - - def test_identify_cookie_reissue_revoked_by_forget(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - result = helper.forget(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 0) - - def test_identify_cookie_reissue_revoked_by_remember(self): - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - result = helper.remember(request, 'bob') - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](request, response) - self.assertEqual(len(response.headerlist), 0) - - def test_identify_cookie_reissue_with_tokens_default(self): - # see https://github.com/Pylons/pyramid/issues#issue/108 - import time - - helper = self._makeOne('secret', timeout=10, reissue_time=0) - auth_tkt = DummyAuthTktModule(tokens=['']) - helper.auth_tkt = auth_tkt - helper.AuthTicket = auth_tkt.AuthTicket - helper.parse_ticket = auth_tkt.parse_ticket - helper.BadTicket = auth_tkt.BadTicket - now = time.time() - helper.auth_tkt.timestamp = now - helper.now = now + 1 - request = self._makeRequest('bogus') - result = helper.identify(request) - self.assertTrue(result) - self.assertEqual(len(request.callbacks), 1) - response = DummyResponse() - request.callbacks[0](None, response) - self.assertEqual(len(response.headerlist), 3) - self.assertEqual(response.headerlist[0][0], 'Set-Cookie') - self.assertTrue("/tokens=/" in response.headerlist[0][1]) - - def test_remember(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_nondefault_samesite(self): - helper = self._makeOne('secret', samesite='Strict') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Strict')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith( - '; Domain=localhost; Path=/; SameSite=Strict' - ) - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith( - '; Domain=.localhost; Path=/; SameSite=Strict' - ) - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_None_samesite(self): - helper = self._makeOne('secret', samesite=None) - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/')) # no samesite - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue(result[1][1].endswith('; Domain=localhost; Path=/')) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue(result[2][1].endswith('; Domain=.localhost; Path=/')) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_include_ip(self): - helper = self._makeOne('secret', include_ip=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith('; Domain=.localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_path(self): - helper = self._makeOne( - 'secret', include_ip=True, path="/cgi-bin/app.cgi/" - ) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith('; Path=/cgi-bin/app.cgi/; SameSite=Lax') - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith( - '; Domain=localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' - ) - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue( - result[2][1].endswith( - '; Domain=.localhost; Path=/cgi-bin/app.cgi/; SameSite=Lax' - ) - ) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_http_only(self): - helper = self._makeOne('secret', include_ip=True, http_only=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; HttpOnly; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue('; HttpOnly' in result[1][1]) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue('; HttpOnly' in result[2][1]) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_secure(self): - helper = self._makeOne('secret', include_ip=True, secure=True) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue('; secure' in result[0][1]) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue('; secure' in result[1][1]) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue('; secure' in result[2][1]) - self.assertTrue(result[2][1].startswith('auth_tkt=')) - - def test_remember_wild_domain_disabled(self): - helper = self._makeOne('secret', wild_domain=False) - request = self._makeRequest() - result = helper.remember(request, 'other') - self.assertEqual(len(result), 2) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue(result[0][1].endswith('; Path=/; SameSite=Lax')) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue( - result[1][1].endswith('; Domain=localhost; Path=/; SameSite=Lax') - ) - self.assertTrue(result[1][1].startswith('auth_tkt=')) - - def test_remember_parent_domain(self): - helper = self._makeOne('secret', parent_domain=True) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith( - '; Domain=.example.com; Path=/; SameSite=Lax' - ) - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - def test_remember_parent_domain_supercedes_wild_domain(self): - helper = self._makeOne('secret', parent_domain=True, wild_domain=True) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - self.assertTrue( - result[0][1].endswith( - '; Domain=.example.com; Path=/; SameSite=Lax' - ) - ) - - def test_remember_explicit_domain(self): - helper = self._makeOne('secret', domain='pyramid.bazinga') - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue( - result[0][1].endswith( - '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' - ) - ) - self.assertTrue(result[0][1].startswith('auth_tkt=')) - - def test_remember_domain_supercedes_parent_and_wild_domain(self): - helper = self._makeOne( - 'secret', - domain='pyramid.bazinga', - parent_domain=True, - wild_domain=True, - ) - request = self._makeRequest() - request.domain = 'www.example.com' - result = helper.remember(request, 'other') - self.assertEqual(len(result), 1) - self.assertTrue( - result[0][1].endswith( - '; Domain=pyramid.bazinga; Path=/; SameSite=Lax' - ) - ) - - def test_remember_binary_userid(self): - import base64 - - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, b'userid') - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual( - val['userid'], text_(base64.b64encode(b'userid').strip()) - ) - self.assertEqual(val['user_data'], 'userid_type:b64str') - - def test_remember_int_userid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 1) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual(val['userid'], '1') - self.assertEqual(val['user_data'], 'userid_type:int') - - def test_remember_unicode_userid(self): - import base64 - - helper = self._makeOne('secret') - request = self._makeRequest() - userid = text_(b'\xc2\xa9', 'utf-8') - result = helper.remember(request, userid) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - val = self._cookieValue(values[0]) - self.assertEqual( - val['userid'], text_(base64.b64encode(userid.encode('utf-8'))) - ) - self.assertEqual(val['user_data'], 'userid_type:b64unicode') - - def test_remember_insane_userid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - userid = object() - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always', RuntimeWarning) - result = helper.remember(request, userid) - self.assertTrue(str(w[-1].message).startswith('userid is of type')) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - value = values[0] - self.assertTrue('userid' in value.value) - - def test_remember_max_age(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid', max_age=500) - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - - self.assertEqual(values[0]['max-age'], '500') - self.assertTrue(values[0]['expires']) - - def test_remember_str_max_age(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid', max_age='500') - values = self._parseHeaders(result) - self.assertEqual(len(result), 3) - - self.assertEqual(values[0]['max-age'], '500') - self.assertTrue(values[0]['expires']) - - def test_remember_str_max_age_invalid(self): - helper = self._makeOne('secret') - request = self._makeRequest() - self.assertRaises( - ValueError, - helper.remember, - request, - 'userid', - max_age='invalid value', - ) - - def test_remember_tokens(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'other', tokens=('foo', 'bar')) - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[0][1]) - - self.assertEqual(result[1][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[1][1]) - - self.assertEqual(result[2][0], 'Set-Cookie') - self.assertTrue("/tokens=foo|bar/" in result[2][1]) - - def test_remember_samesite_nondefault(self): - helper = self._makeOne('secret', samesite='Strict') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - cookieval = result[0][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[1][0], 'Set-Cookie') - cookieval = result[1][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[2][0], 'Set-Cookie') - cookieval = result[2][1] - self.assertTrue( - 'SameSite=Strict' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - def test_remember_samesite_default(self): - helper = self._makeOne('secret') - request = self._makeRequest() - result = helper.remember(request, 'userid') - self.assertEqual(len(result), 3) - - self.assertEqual(result[0][0], 'Set-Cookie') - cookieval = result[0][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[1][0], 'Set-Cookie') - cookieval = result[1][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - self.assertEqual(result[2][0], 'Set-Cookie') - cookieval = result[2][1] - self.assertTrue( - 'SameSite=Lax' in [x.strip() for x in cookieval.split(';')], - cookieval, - ) - - def test_remember_unicode_but_ascii_token(self): - helper = self._makeOne('secret') - request = self._makeRequest() - la = text_(b'foo', 'utf-8') - result = helper.remember(request, 'other', tokens=(la,)) - # tokens must be str type on both Python 2 and 3 - self.assertTrue("/tokens=foo/" in result[0][1]) - - def test_remember_nonascii_token(self): - helper = self._makeOne('secret') - request = self._makeRequest() - la = text_(b'La Pe\xc3\xb1a', 'utf-8') - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=(la,) - ) - - def test_remember_invalid_token_format(self): - helper = self._makeOne('secret') - request = self._makeRequest() - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=('foo bar',) - ) - self.assertRaises( - ValueError, helper.remember, request, 'other', tokens=('1bar',) - ) - - def test_forget(self): - helper = self._makeOne('secret') - request = self._makeRequest() - headers = helper.forget(request) - self.assertEqual(len(headers), 3) - name, value = headers[0] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - name, value = headers[1] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Domain=localhost; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - name, value = headers[2] - self.assertEqual(name, 'Set-Cookie') - self.assertEqual( - value, - 'auth_tkt=; Domain=.localhost; Max-Age=0; Path=/; ' - 'expires=Wed, 31-Dec-97 23:59:59 GMT; SameSite=Lax', - ) - - -class DummyAuthTktModule(object): - def __init__( - self, - timestamp=0, - userid='userid', - tokens=(), - user_data='', - parse_raise=False, - hashalg="md5", - ): - self.timestamp = timestamp - self.userid = userid - self.tokens = tokens - self.user_data = user_data - self.parse_raise = parse_raise - self.hashalg = hashalg - - def parse_ticket(secret, value, remote_addr, hashalg): - self.secret = secret - self.value = value - self.remote_addr = remote_addr - if self.parse_raise: - raise self.BadTicket() - return self.timestamp, self.userid, self.tokens, self.user_data - - self.parse_ticket = parse_ticket - - class AuthTicket(object): - def __init__(self, secret, userid, remote_addr, **kw): - self.secret = secret - self.userid = userid - self.remote_addr = remote_addr - self.kw = kw - - def cookie_value(self): - result = { - 'secret': self.secret, - 'userid': self.userid, - 'remote_addr': self.remote_addr, - } - result.update(self.kw) - tokens = result.pop('tokens', None) - if tokens is not None: - tokens = '|'.join(tokens) - result['tokens'] = tokens - items = sorted(result.items()) - new_items = [] - for k, v in items: - if isinstance(v, bytes): - v = text_(v) - new_items.append((k, v)) - result = '/'.join(['%s=%s' % (k, v) for k, v in new_items]) - return result - - self.AuthTicket = AuthTicket - - class BadTicket(Exception): - pass - - -class DummyCookies(object): - def __init__(self, cookie): - self.cookie = cookie - - def get(self, name): - return self.cookie - - -class DummyRequest: - domain = 'localhost' - - def __init__(self, environ=None, session=None, registry=None, cookie=None): - self.environ = environ or {} - self.session = session or {} - self.registry = registry - self.callbacks = [] - self.cookies = DummyCookies(cookie) - - def add_response_callback(self, callback): - self.callbacks.append(callback) - - -class DummyResponse: - def __init__(self): - self.headerlist = [] |
