summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/pyramid/authentication.py444
-rw-r--r--src/pyramid/security.py558
2 files changed, 432 insertions, 570 deletions
diff --git a/src/pyramid/authentication.py b/src/pyramid/authentication.py
index 12f31e5dd..21cfc0c0e 100644
--- a/src/pyramid/authentication.py
+++ b/src/pyramid/authentication.py
@@ -1,23 +1,26 @@
import binascii
+from codecs import utf_8_decode
+from codecs import utf_8_encode
from collections import namedtuple
+import hashlib
+import base64
+import re
+import time as time_mod
+from urllib.parse import quote, unquote
+import warnings
from zope.interface import implementer
+from webob.cookies import CookieProfile
+
from pyramid.interfaces import IAuthenticationPolicy, IDebugLogger
-from pyramid.security import Authenticated, Everyone, AuthTktCookieHelper
-
-# bw compat after moving AuthTktHelper and friends to pyramid.security
-from pyramid.security import ( # noqa
- VALID_TOKEN,
- b64encode,
- b64decode,
- AuthTicket,
- BadTicket,
- parse_ticket,
- calculate_digest,
- encode_ip_timestamp,
-)
+from pyramid.security import Authenticated, Everyone
+
+from pyramid.util import strings_differ, bytes_, ascii_, text_
+from pyramid.util import SimpleSerializer
+
+VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
class CallbackAuthenticationPolicy(object):
@@ -648,6 +651,421 @@ class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
return self.cookie.forget(request)
+def b64encode(v):
+ return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'')
+
+
+def b64decode(v):
+ return base64.b64decode(bytes_(v))
+
+
+# this class licensed under the MIT license (stolen from Paste)
+class AuthTicket(object):
+ """
+ This class represents an authentication token. You must pass in
+ the shared secret, the userid, and the IP address. Optionally you
+ can include tokens (a list of strings, representing role names),
+ 'user_data', which is arbitrary data available for your own use in
+ later scripts. Lastly, you can override the cookie name and
+ timestamp.
+
+ Once you provide all the arguments, use .cookie_value() to
+ generate the appropriate authentication ticket.
+
+ Usage::
+
+ token = AuthTicket('sharedsecret', 'username',
+ os.environ['REMOTE_ADDR'], tokens=['admin'])
+ val = token.cookie_value()
+
+ """
+
+ def __init__(
+ self,
+ secret,
+ userid,
+ ip,
+ tokens=(),
+ user_data='',
+ time=None,
+ cookie_name='auth_tkt',
+ secure=False,
+ hashalg='md5',
+ ):
+ self.secret = secret
+ self.userid = userid
+ self.ip = ip
+ self.tokens = ','.join(tokens)
+ self.user_data = user_data
+ if time is None:
+ self.time = time_mod.time()
+ else:
+ self.time = time
+ self.cookie_name = cookie_name
+ self.secure = secure
+ self.hashalg = hashalg
+
+ def digest(self):
+ return calculate_digest(
+ self.ip,
+ self.time,
+ self.secret,
+ self.userid,
+ self.tokens,
+ self.user_data,
+ self.hashalg,
+ )
+
+ def cookie_value(self):
+ v = '%s%08x%s!' % (self.digest(), int(self.time), quote(self.userid))
+ if self.tokens:
+ v += self.tokens + '!'
+ v += self.user_data
+ return v
+
+
+# this class licensed under the MIT license (stolen from Paste)
+class BadTicket(Exception):
+ """
+ Exception raised when a ticket can't be parsed. If we get far enough to
+ determine what the expected digest should have been, expected is set.
+ This should not be shown by default, but can be useful for debugging.
+ """
+
+ def __init__(self, msg, expected=None):
+ self.expected = expected
+ Exception.__init__(self, msg)
+
+
+# this function licensed under the MIT license (stolen from Paste)
+def parse_ticket(secret, ticket, ip, hashalg='md5'):
+ """
+ Parse the ticket, returning (timestamp, userid, tokens, user_data).
+
+ If the ticket cannot be parsed, a ``BadTicket`` exception will be raised
+ with an explanation.
+ """
+ ticket = text_(ticket).strip('"')
+ digest_size = hashlib.new(hashalg).digest_size * 2
+ digest = ticket[:digest_size]
+ try:
+ timestamp = int(ticket[digest_size : digest_size + 8], 16)
+ except ValueError as e:
+ raise BadTicket('Timestamp is not a hex integer: %s' % e)
+ try:
+ userid, data = ticket[digest_size + 8 :].split('!', 1)
+ except ValueError:
+ raise BadTicket('userid is not followed by !')
+ userid = unquote(userid)
+ if '!' in data:
+ tokens, user_data = data.split('!', 1)
+ else: # pragma: no cover (never generated)
+ # @@: Is this the right order?
+ tokens = ''
+ user_data = data
+
+ expected = calculate_digest(
+ ip, timestamp, secret, userid, tokens, user_data, hashalg
+ )
+
+ # Avoid timing attacks (see
+ # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf)
+ if strings_differ(expected, digest):
+ raise BadTicket(
+ 'Digest signature is not correct', expected=(expected, digest)
+ )
+
+ tokens = tokens.split(',')
+
+ return (timestamp, userid, tokens, user_data)
+
+
+# this function licensed under the MIT license (stolen from Paste)
+def calculate_digest(
+ ip, timestamp, secret, userid, tokens, user_data, hashalg='md5'
+):
+ secret = bytes_(secret, 'utf-8')
+ userid = bytes_(userid, 'utf-8')
+ tokens = bytes_(tokens, 'utf-8')
+ user_data = bytes_(user_data, 'utf-8')
+ hash_obj = hashlib.new(hashalg)
+
+ # Check to see if this is an IPv6 address
+ if ':' in ip:
+ ip_timestamp = ip + str(int(timestamp))
+ ip_timestamp = bytes_(ip_timestamp)
+ else:
+ # encode_ip_timestamp not required, left in for backwards compatibility
+ ip_timestamp = encode_ip_timestamp(ip, timestamp)
+
+ hash_obj.update(
+ ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data
+ )
+ digest = hash_obj.hexdigest()
+ hash_obj2 = hashlib.new(hashalg)
+ hash_obj2.update(bytes_(digest) + secret)
+ return hash_obj2.hexdigest()
+
+
+# this function licensed under the MIT license (stolen from Paste)
+def encode_ip_timestamp(ip, timestamp):
+ ip_chars = ''.join(map(chr, map(int, ip.split('.'))))
+ t = int(timestamp)
+ ts = (
+ (t & 0xFF000000) >> 24,
+ (t & 0xFF0000) >> 16,
+ (t & 0xFF00) >> 8,
+ t & 0xFF,
+ )
+ ts_chars = ''.join(map(chr, ts))
+ return bytes_(ip_chars + ts_chars)
+
+
+class AuthTktCookieHelper(object):
+ """
+ A helper class for use in third-party authentication policy
+ implementations. See
+ :class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the
+ meanings of the constructor arguments.
+ """
+
+ parse_ticket = staticmethod(parse_ticket) # for tests
+ AuthTicket = AuthTicket # for tests
+ BadTicket = BadTicket # for tests
+ now = None # for tests
+
+ userid_type_decoders = {
+ 'int': int,
+ 'unicode': lambda x: utf_8_decode(x)[0], # bw compat for old cookies
+ 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
+ 'b64str': lambda x: b64decode(x),
+ }
+
+ userid_type_encoders = {
+ int: ('int', str),
+ str: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
+ bytes: ('b64str', lambda x: b64encode(x)),
+ }
+
+ def __init__(
+ self,
+ secret,
+ cookie_name='auth_tkt',
+ secure=False,
+ include_ip=False,
+ timeout=None,
+ reissue_time=None,
+ max_age=None,
+ http_only=False,
+ path="/",
+ wild_domain=True,
+ hashalg='md5',
+ parent_domain=False,
+ domain=None,
+ samesite='Lax',
+ ):
+ self.cookie_profile = CookieProfile(
+ cookie_name=cookie_name,
+ secure=secure,
+ max_age=max_age,
+ httponly=http_only,
+ path=path,
+ serializer=SimpleSerializer(),
+ samesite=samesite,
+ )
+
+ self.secret = secret
+ self.cookie_name = cookie_name
+ self.secure = secure
+ self.include_ip = include_ip
+ self.timeout = timeout if timeout is None else int(timeout)
+ self.reissue_time = (
+ reissue_time if reissue_time is None else int(reissue_time)
+ )
+ self.max_age = max_age if max_age is None else int(max_age)
+ self.wild_domain = wild_domain
+ self.parent_domain = parent_domain
+ self.domain = domain
+ self.hashalg = hashalg
+
+ def _get_cookies(self, request, value, max_age=None):
+ cur_domain = request.domain
+
+ domains = []
+ if self.domain:
+ domains.append(self.domain)
+ else:
+ if self.parent_domain and cur_domain.count('.') > 1:
+ domains.append('.' + cur_domain.split('.', 1)[1])
+ else:
+ domains.append(None)
+ domains.append(cur_domain)
+ if self.wild_domain:
+ domains.append('.' + cur_domain)
+
+ profile = self.cookie_profile(request)
+
+ kw = {}
+ kw['domains'] = domains
+ if max_age is not None:
+ kw['max_age'] = max_age
+
+ headers = profile.get_headers(value, **kw)
+ return headers
+
+ def identify(self, request):
+ """ Return a dictionary with authentication information, or ``None``
+ if no valid auth_tkt is attached to ``request``"""
+ environ = request.environ
+ cookie = request.cookies.get(self.cookie_name)
+
+ if cookie is None:
+ return None
+
+ if self.include_ip:
+ remote_addr = environ['REMOTE_ADDR']
+ else:
+ remote_addr = '0.0.0.0'
+
+ try:
+ timestamp, userid, tokens, user_data = self.parse_ticket(
+ self.secret, cookie, remote_addr, self.hashalg
+ )
+ except self.BadTicket:
+ return None
+
+ now = self.now # service tests
+
+ if now is None:
+ now = time_mod.time()
+
+ if self.timeout and ((timestamp + self.timeout) < now):
+ # the auth_tkt data has expired
+ return None
+
+ userid_typename = 'userid_type:'
+ user_data_info = user_data.split('|')
+ for datum in filter(None, user_data_info):
+ if datum.startswith(userid_typename):
+ userid_type = datum[len(userid_typename) :]
+ decoder = self.userid_type_decoders.get(userid_type)
+ if decoder:
+ userid = decoder(userid)
+
+ reissue = self.reissue_time is not None
+
+ if reissue and not hasattr(request, '_authtkt_reissued'):
+ if (now - timestamp) > self.reissue_time:
+ # See https://github.com/Pylons/pyramid/issues#issue/108
+ tokens = list(filter(None, tokens))
+ headers = self.remember(
+ request, userid, max_age=self.max_age, tokens=tokens
+ )
+
+ def reissue_authtkt(request, response):
+ if not hasattr(request, '_authtkt_reissue_revoked'):
+ for k, v in headers:
+ response.headerlist.append((k, v))
+
+ request.add_response_callback(reissue_authtkt)
+ request._authtkt_reissued = True
+
+ environ['REMOTE_USER_TOKENS'] = tokens
+ environ['REMOTE_USER_DATA'] = user_data
+ environ['AUTH_TYPE'] = 'cookie'
+
+ identity = {}
+ identity['timestamp'] = timestamp
+ identity['userid'] = userid
+ identity['tokens'] = tokens
+ identity['userdata'] = user_data
+ return identity
+
+ def forget(self, request):
+ """ Return a set of expires Set-Cookie headers, which will destroy
+ any existing auth_tkt cookie when attached to a response"""
+ request._authtkt_reissue_revoked = True
+ return self._get_cookies(request, None)
+
+ def remember(self, request, userid, max_age=None, tokens=()):
+ """ Return a set of Set-Cookie headers; when set into a response,
+ these headers will represent a valid authentication ticket.
+
+ ``max_age``
+ The max age of the auth_tkt cookie, in seconds. When this value is
+ set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
+ allowing the auth_tkt cookie to last between browser sessions. If
+ this value is ``None``, the ``max_age`` value provided to the
+ helper itself will be used as the ``max_age`` value. Default:
+ ``None``.
+
+ ``tokens``
+ A sequence of strings that will be placed into the auth_tkt tokens
+ field. Each string in the sequence must be of the Python ``str``
+ type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
+ Tokens are available in the returned identity when an auth_tkt is
+ found in the request and unpacked. Default: ``()``.
+ """
+ max_age = self.max_age if max_age is None else int(max_age)
+
+ environ = request.environ
+
+ if self.include_ip:
+ remote_addr = environ['REMOTE_ADDR']
+ else:
+ remote_addr = '0.0.0.0'
+
+ user_data = ''
+
+ encoding_data = self.userid_type_encoders.get(type(userid))
+
+ if encoding_data:
+ encoding, encoder = encoding_data
+ else:
+ warnings.warn(
+ "userid is of type {}, and is not supported by the "
+ "AuthTktAuthenticationPolicy. Explicitly converting to string "
+ "and storing as base64. Subsequent requests will receive a "
+ "string as the userid, it will not be decoded back to the "
+ "type provided.".format(type(userid)),
+ RuntimeWarning,
+ )
+ encoding, encoder = self.userid_type_encoders.get(str)
+ userid = str(userid)
+
+ userid = encoder(userid)
+ user_data = 'userid_type:%s' % encoding
+
+ new_tokens = []
+ for token in tokens:
+ if isinstance(token, str):
+ try:
+ token = ascii_(token)
+ except UnicodeEncodeError:
+ raise ValueError("Invalid token %r" % (token,))
+ if not (isinstance(token, str) and VALID_TOKEN.match(token)):
+ raise ValueError("Invalid token %r" % (token,))
+ new_tokens.append(token)
+ tokens = tuple(new_tokens)
+
+ if hasattr(request, '_authtkt_reissued'):
+ request._authtkt_reissue_revoked = True
+
+ ticket = self.AuthTicket(
+ self.secret,
+ userid,
+ remote_addr,
+ tokens=tokens,
+ user_data=user_data,
+ cookie_name=self.cookie_name,
+ secure=self.secure,
+ hashalg=self.hashalg,
+ )
+
+ cookie_value = ticket.cookie_value()
+ return self._get_cookies(request, cookie_value, max_age)
+
+
@implementer(IAuthenticationPolicy)
class SessionAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` authentication policy which gets its data from the
diff --git a/src/pyramid/security.py b/src/pyramid/security.py
index a55320ce6..dda61ef27 100644
--- a/src/pyramid/security.py
+++ b/src/pyramid/security.py
@@ -1,14 +1,3 @@
-from codecs import utf_8_decode
-from codecs import utf_8_encode
-import hashlib
-import base64
-import time as time_mod
-from urllib.parse import quote, unquote
-import warnings
-import re
-
-from webob.cookies import CookieProfile
-
from zope.interface import implementer, providedBy
from pyramid.interfaces import (
@@ -22,14 +11,10 @@ from pyramid.interfaces import (
from pyramid.location import lineage
-from pyramid.util import is_nonstr_iter, strings_differ, bytes_, ascii_, text_
-
-from pyramid.util import SimpleSerializer
+from pyramid.util import is_nonstr_iter
from pyramid.threadlocal import get_current_registry
-VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
-
Everyone = 'system.Everyone'
Authenticated = 'system.Authenticated'
Allow = 'Allow'
@@ -586,544 +571,3 @@ class SessionAuthenticationHelper:
def identify(self, request):
return request.session.get(self.userid_key)
-
-
-def b64encode(v):
- return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'')
-
-
-def b64decode(v):
- return base64.b64decode(bytes_(v))
-
-
-# this class licensed under the MIT license (stolen from Paste)
-class AuthTicket(object):
- """
- This class represents an authentication token. You must pass in
- the shared secret, the userid, and the IP address. Optionally you
- can include tokens (a list of strings, representing role names),
- 'user_data', which is arbitrary data available for your own use in
- later scripts. Lastly, you can override the cookie name and
- timestamp.
-
- Once you provide all the arguments, use .cookie_value() to
- generate the appropriate authentication ticket.
-
- Usage::
-
- token = AuthTicket('sharedsecret', 'username',
- os.environ['REMOTE_ADDR'], tokens=['admin'])
- val = token.cookie_value()
-
- """
-
- def __init__(
- self,
- secret,
- userid,
- ip,
- tokens=(),
- user_data='',
- time=None,
- cookie_name='auth_tkt',
- secure=False,
- hashalg='md5',
- ):
- self.secret = secret
- self.userid = userid
- self.ip = ip
- self.tokens = ','.join(tokens)
- self.user_data = user_data
- if time is None:
- self.time = time_mod.time()
- else:
- self.time = time
- self.cookie_name = cookie_name
- self.secure = secure
- self.hashalg = hashalg
-
- def digest(self):
- return calculate_digest(
- self.ip,
- self.time,
- self.secret,
- self.userid,
- self.tokens,
- self.user_data,
- self.hashalg,
- )
-
- def cookie_value(self):
- v = '%s%08x%s!' % (self.digest(), int(self.time), quote(self.userid))
- if self.tokens:
- v += self.tokens + '!'
- v += self.user_data
- return v
-
-
-# this class licensed under the MIT license (stolen from Paste)
-class BadTicket(Exception):
- """
- Exception raised when a ticket can't be parsed. If we get far enough to
- determine what the expected digest should have been, expected is set.
- This should not be shown by default, but can be useful for debugging.
- """
-
- def __init__(self, msg, expected=None):
- self.expected = expected
- Exception.__init__(self, msg)
-
-
-# this function licensed under the MIT license (stolen from Paste)
-def parse_ticket(secret, ticket, ip, hashalg='md5'):
- """
- Parse the ticket, returning (timestamp, userid, tokens, user_data).
-
- If the ticket cannot be parsed, a ``BadTicket`` exception will be raised
- with an explanation.
- """
- ticket = text_(ticket).strip('"')
- digest_size = hashlib.new(hashalg).digest_size * 2
- digest = ticket[:digest_size]
- try:
- timestamp = int(ticket[digest_size : digest_size + 8], 16)
- except ValueError as e:
- raise BadTicket('Timestamp is not a hex integer: %s' % e)
- try:
- userid, data = ticket[digest_size + 8 :].split('!', 1)
- except ValueError:
- raise BadTicket('userid is not followed by !')
- userid = unquote(userid)
- if '!' in data:
- tokens, user_data = data.split('!', 1)
- else: # pragma: no cover (never generated)
- # @@: Is this the right order?
- tokens = ''
- user_data = data
-
- expected = calculate_digest(
- ip, timestamp, secret, userid, tokens, user_data, hashalg
- )
-
- # Avoid timing attacks (see
- # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf)
- if strings_differ(expected, digest):
- raise BadTicket(
- 'Digest signature is not correct', expected=(expected, digest)
- )
-
- tokens = tokens.split(',')
-
- return (timestamp, userid, tokens, user_data)
-
-
-# this function licensed under the MIT license (stolen from Paste)
-def calculate_digest(
- ip, timestamp, secret, userid, tokens, user_data, hashalg='md5'
-):
- secret = bytes_(secret, 'utf-8')
- userid = bytes_(userid, 'utf-8')
- tokens = bytes_(tokens, 'utf-8')
- user_data = bytes_(user_data, 'utf-8')
- hash_obj = hashlib.new(hashalg)
-
- # Check to see if this is an IPv6 address
- if ':' in ip:
- ip_timestamp = ip + str(int(timestamp))
- ip_timestamp = bytes_(ip_timestamp)
- else:
- # encode_ip_timestamp not required, left in for backwards compatibility
- ip_timestamp = encode_ip_timestamp(ip, timestamp)
-
- hash_obj.update(
- ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data
- )
- digest = hash_obj.hexdigest()
- hash_obj2 = hashlib.new(hashalg)
- hash_obj2.update(bytes_(digest) + secret)
- return hash_obj2.hexdigest()
-
-
-# this function licensed under the MIT license (stolen from Paste)
-def encode_ip_timestamp(ip, timestamp):
- ip_chars = ''.join(map(chr, map(int, ip.split('.'))))
- t = int(timestamp)
- ts = (
- (t & 0xFF000000) >> 24,
- (t & 0xFF0000) >> 16,
- (t & 0xFF00) >> 8,
- t & 0xFF,
- )
- ts_chars = ''.join(map(chr, ts))
- return bytes_(ip_chars + ts_chars)
-
-
-class AuthTktCookieHelper:
- """
- A helper class used for constructing a :term:`security policy` with stores
- the user identity in a signed cookie.
-
- Constructor Arguments
-
- ``secret``
-
- The secret (a string) used for auth_tkt cookie signing. This value
- should be unique across all values provided to Pyramid for various
- subsystem secrets (see :ref:`admonishment_against_secret_sharing`).
- Required.
-
- ``cookie_name``
-
- Default: ``auth_tkt``. The cookie name used
- (string). Optional.
-
- ``secure``
-
- Default: ``False``. Only send the cookie back over a secure
- conn. Optional.
-
- ``include_ip``
-
- Default: ``False``. Make the requesting IP address part of
- the authentication data in the cookie. Optional.
-
- For IPv6 this option is not recommended. The ``mod_auth_tkt``
- specification does not specify how to handle IPv6 addresses, so using
- this option in combination with IPv6 addresses may cause an
- incompatible cookie. It ties the authentication ticket to that
- individual's IPv6 address.
-
- ``timeout``
-
- Default: ``None``. Maximum number of seconds which a newly
- issued ticket will be considered valid. After this amount of
- time, the ticket will expire (effectively logging the user
- out). If this value is ``None``, the ticket never expires.
- Optional.
-
- ``reissue_time``
-
- Default: ``None``. If this parameter is set, it represents the number
- of seconds that must pass before an authentication token cookie is
- automatically reissued as the result of a request which requires
- authentication. The duration is measured as the number of seconds
- since the last auth_tkt cookie was issued and 'now'. If this value is
- ``0``, a new ticket cookie will be reissued on every request which
- requires authentication.
-
- A good rule of thumb: if you want auto-expired cookies based on
- inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
- ``reissue_time`` value to perhaps a tenth of the ``timeout`` value
- (120 or 2 mins). It's nonsensical to set the ``timeout`` value lower
- than the ``reissue_time`` value, as the ticket will never be reissued
- if so. However, such a configuration is not explicitly prevented.
-
- Optional.
-
- ``max_age``
-
- Default: ``None``. The max age of the auth_tkt cookie, in
- seconds. This differs from ``timeout`` inasmuch as ``timeout``
- represents the lifetime of the ticket contained in the cookie,
- while this value represents the lifetime of the cookie itself.
- When this value is set, the cookie's ``Max-Age`` and
- ``Expires`` settings will be set, allowing the auth_tkt cookie
- to last between browser sessions. It is typically nonsensical
- to set this to a value that is lower than ``timeout`` or
- ``reissue_time``, although it is not explicitly prevented.
- Optional.
-
- ``path``
-
- Default: ``/``. The path for which the auth_tkt cookie is valid.
- May be desirable if the application only serves part of a domain.
- Optional.
-
- ``http_only``
-
- Default: ``False``. Hide cookie from JavaScript by setting the
- HttpOnly flag. Not honored by all browsers.
- Optional.
-
- ``wild_domain``
-
- Default: ``True``. An auth_tkt cookie will be generated for the
- wildcard domain. If your site is hosted as ``example.com`` this
- will make the cookie available for sites underneath ``example.com``
- such as ``www.example.com``.
- Optional.
-
- ``parent_domain``
-
- Default: ``False``. An auth_tkt cookie will be generated for the
- parent domain of the current site. For example if your site is
- hosted under ``www.example.com`` a cookie will be generated for
- ``.example.com``. This can be useful if you have multiple sites
- sharing the same domain. This option supercedes the ``wild_domain``
- option.
- Optional.
-
- ``domain``
-
- Default: ``None``. If provided the auth_tkt cookie will only be
- set for this domain. This option is not compatible with ``wild_domain``
- and ``parent_domain``.
- Optional.
-
- ``hashalg``
-
- Default: ``sha512`` (the literal string).
-
- Any hash algorithm supported by Python's ``hashlib.new()`` function
- can be used as the ``hashalg``.
-
- Cookies generated by different instances of AuthTktAuthenticationPolicy
- using different ``hashalg`` options are not compatible. Switching the
- ``hashalg`` will imply that all existing users with a valid cookie will
- be required to re-login.
-
- Optional.
-
- ``samesite``
-
- Default: ``'Lax'``. The 'samesite' option of the session cookie. Set
- the value to ``None`` to turn off the samesite option.
-
- This option is available as of :app:`Pyramid` 1.10.
- """
-
- parse_ticket = staticmethod(parse_ticket) # for tests
- AuthTicket = AuthTicket # for tests
- BadTicket = BadTicket # for tests
- now = None # for tests
-
- userid_type_decoders = {
- 'int': int,
- 'unicode': lambda x: utf_8_decode(x)[0], # bw compat for old cookies
- 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
- 'b64str': lambda x: b64decode(x),
- }
-
- userid_type_encoders = {
- int: ('int', str),
- str: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
- bytes: ('b64str', lambda x: b64encode(x)),
- }
-
- def __init__(
- self,
- secret,
- cookie_name='auth_tkt',
- secure=False,
- include_ip=False,
- timeout=None,
- reissue_time=None,
- max_age=None,
- http_only=False,
- path="/",
- wild_domain=True,
- hashalg='md5',
- parent_domain=False,
- domain=None,
- samesite='Lax',
- ):
- self.cookie_profile = CookieProfile(
- cookie_name=cookie_name,
- secure=secure,
- max_age=max_age,
- httponly=http_only,
- path=path,
- serializer=SimpleSerializer(),
- samesite=samesite,
- )
-
- self.secret = secret
- self.cookie_name = cookie_name
- self.secure = secure
- self.include_ip = include_ip
- self.timeout = timeout if timeout is None else int(timeout)
- self.reissue_time = (
- reissue_time if reissue_time is None else int(reissue_time)
- )
- self.max_age = max_age if max_age is None else int(max_age)
- self.wild_domain = wild_domain
- self.parent_domain = parent_domain
- self.domain = domain
- self.hashalg = hashalg
-
- def _get_cookies(self, request, value, max_age=None):
- cur_domain = request.domain
-
- domains = []
- if self.domain:
- domains.append(self.domain)
- else:
- if self.parent_domain and cur_domain.count('.') > 1:
- domains.append('.' + cur_domain.split('.', 1)[1])
- else:
- domains.append(None)
- domains.append(cur_domain)
- if self.wild_domain:
- domains.append('.' + cur_domain)
-
- profile = self.cookie_profile(request)
-
- kw = {}
- kw['domains'] = domains
- if max_age is not None:
- kw['max_age'] = max_age
-
- headers = profile.get_headers(value, **kw)
- return headers
-
- def identify(self, request):
- """ Return a dictionary with authentication information, or ``None``
- if no valid auth_tkt is attached to ``request``"""
- environ = request.environ
- cookie = request.cookies.get(self.cookie_name)
-
- if cookie is None:
- return None
-
- if self.include_ip:
- remote_addr = environ['REMOTE_ADDR']
- else:
- remote_addr = '0.0.0.0'
-
- try:
- timestamp, userid, tokens, user_data = self.parse_ticket(
- self.secret, cookie, remote_addr, self.hashalg
- )
- except self.BadTicket:
- return None
-
- now = self.now # service tests
-
- if now is None:
- now = time_mod.time()
-
- if self.timeout and ((timestamp + self.timeout) < now):
- # the auth_tkt data has expired
- return None
-
- userid_typename = 'userid_type:'
- user_data_info = user_data.split('|')
- for datum in filter(None, user_data_info):
- if datum.startswith(userid_typename):
- userid_type = datum[len(userid_typename) :]
- decoder = self.userid_type_decoders.get(userid_type)
- if decoder:
- userid = decoder(userid)
-
- reissue = self.reissue_time is not None
-
- if reissue and not hasattr(request, '_authtkt_reissued'):
- if (now - timestamp) > self.reissue_time:
- # See https://github.com/Pylons/pyramid/issues#issue/108
- tokens = list(filter(None, tokens))
- headers = self.remember(
- request, userid, max_age=self.max_age, tokens=tokens
- )
-
- def reissue_authtkt(request, response):
- if not hasattr(request, '_authtkt_reissue_revoked'):
- for k, v in headers:
- response.headerlist.append((k, v))
-
- request.add_response_callback(reissue_authtkt)
- request._authtkt_reissued = True
-
- environ['REMOTE_USER_TOKENS'] = tokens
- environ['REMOTE_USER_DATA'] = user_data
- environ['AUTH_TYPE'] = 'cookie'
-
- identity = {}
- identity['timestamp'] = timestamp
- identity['userid'] = userid
- identity['tokens'] = tokens
- identity['userdata'] = user_data
- return identity
-
- def forget(self, request):
- """ Return a set of expires Set-Cookie headers, which will destroy
- any existing auth_tkt cookie when attached to a response"""
- request._authtkt_reissue_revoked = True
- return self._get_cookies(request, None)
-
- def remember(self, request, userid, max_age=None, tokens=()):
- """ Return a set of Set-Cookie headers; when set into a response,
- these headers will represent a valid authentication ticket.
-
- ``max_age``
- The max age of the auth_tkt cookie, in seconds. When this value is
- set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
- allowing the auth_tkt cookie to last between browser sessions. If
- this value is ``None``, the ``max_age`` value provided to the
- helper itself will be used as the ``max_age`` value. Default:
- ``None``.
-
- ``tokens``
- A sequence of strings that will be placed into the auth_tkt tokens
- field. Each string in the sequence must be of the Python ``str``
- type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
- Tokens are available in the returned identity when an auth_tkt is
- found in the request and unpacked. Default: ``()``.
- """
- max_age = self.max_age if max_age is None else int(max_age)
-
- environ = request.environ
-
- if self.include_ip:
- remote_addr = environ['REMOTE_ADDR']
- else:
- remote_addr = '0.0.0.0'
-
- user_data = ''
-
- encoding_data = self.userid_type_encoders.get(type(userid))
-
- if encoding_data:
- encoding, encoder = encoding_data
- else:
- warnings.warn(
- "userid is of type {}, and is not supported by the "
- "AuthTktAuthenticationPolicy. Explicitly converting to string "
- "and storing as base64. Subsequent requests will receive a "
- "string as the userid, it will not be decoded back to the "
- "type provided.".format(type(userid)),
- RuntimeWarning,
- )
- encoding, encoder = self.userid_type_encoders.get(str)
- userid = str(userid)
-
- userid = encoder(userid)
- user_data = 'userid_type:%s' % encoding
-
- new_tokens = []
- for token in tokens:
- if isinstance(token, str):
- try:
- token = ascii_(token)
- except UnicodeEncodeError:
- raise ValueError("Invalid token %r" % (token,))
- if not (isinstance(token, str) and VALID_TOKEN.match(token)):
- raise ValueError("Invalid token %r" % (token,))
- new_tokens.append(token)
- tokens = tuple(new_tokens)
-
- if hasattr(request, '_authtkt_reissued'):
- request._authtkt_reissue_revoked = True
-
- ticket = self.AuthTicket(
- self.secret,
- userid,
- remote_addr,
- tokens=tokens,
- user_data=user_data,
- cookie_name=self.cookie_name,
- secure=self.secure,
- hashalg=self.hashalg,
- )
-
- cookie_value = ticket.cookie_value()
- return self._get_cookies(request, cookie_value, max_age)