diff options
| -rw-r--r-- | fietsboek/models/user.py | 59 | ||||
| -rw-r--r-- | fietsboek/security.py | 6 | ||||
| -rw-r--r-- | fietsboek/views/default.py | 5 | ||||
| -rw-r--r-- | tests/playwright/conftest.py | 4 | 
4 files changed, 64 insertions, 10 deletions
diff --git a/fietsboek/models/user.py b/fietsboek/models/user.py index 96a31c0..9dd2298 100644 --- a/fietsboek/models/user.py +++ b/fietsboek/models/user.py @@ -1,10 +1,11 @@  """User models for fietsboek."""  import datetime  import enum +import hashlib  import secrets  import uuid  from functools import reduce -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional  from cryptography.exceptions import InvalidKey  from cryptography.hazmat.primitives.kdf.scrypt import Scrypt @@ -28,7 +29,8 @@ from sqlalchemy import (      select,      union,  ) -from sqlalchemy.orm import Mapped, relationship, with_parent +from sqlalchemy.exc import NoResultFound +from sqlalchemy.orm import Mapped, Session, relationship, with_parent  from sqlalchemy.orm.attributes import flag_dirty  from sqlalchemy.orm.session import object_session @@ -61,6 +63,9 @@ SALT_LENGTH = 32  TOKEN_LIFETIME = datetime.timedelta(hours=24)  """Maximum validity time of a token.""" +FINGERPRINT_SHAKE_BYTES = 10 +"""Number of bytes to include in the SHAKE digest of the fingerprint.""" +  friends_assoc = Table(      "friends_assoc", @@ -179,6 +184,56 @@ class User(Base):          ]          return acl +    def authenticated_user_id(self) -> str: +        """Returns a string suitable to re-identify this user, e.g. in a cookie +        or session. + +        The returned ID contains a "fingerprint" to ensure that the +        re-identified user matches. + +        :return: The string used to re-identify this user. +        """ +        # We're not interested in hiding the email by all means, we just want a +        # good check to see whether the email of the represented user actually +        # matches with the database entry. Therefore, we use a short SHAKE hash. +        email = self.email or "" +        shaker = hashlib.shake_128(email.encode("utf-8")) +        fingerprint = shaker.hexdigest(FINGERPRINT_SHAKE_BYTES) +        return f"{self.id}-{fingerprint}" + +    @classmethod +    def get_by_authenticated_user_id(cls, session: Session, user_id: str) -> Optional["User"]: +        """Retrieves a user by their authenticated user ID (as returned by +        :meth:`authenticated_user_id`). + +        This method returns ``None`` on any of the following conditions: + +        * The user ID is malformed. +        * The user with the given ID was not found. +        * The fingerprint of the ``user_id`` and the database user do not match. + +        :param session: The database session which to retrieve the user from. +        :param user_id: The user ID. +        :return: The user, if found. +        """ +        try: +            numeric_id, fingerprint = user_id.split("-") +        except ValueError: +            # Malformed ID +            return None +        query = select(cls).filter_by(id=numeric_id) +        try: +            user = session.execute(query).scalar_one() +        except NoResultFound: +            return None +        email = user.email or "" +        user_fingerprint = hashlib.shake_128(email.encode("utf-8")).hexdigest( +            FINGERPRINT_SHAKE_BYTES +        ) +        if user_fingerprint != fingerprint: +            return None +        return user +      def set_password(self, new_password):          """Sets a new password for the user. diff --git a/fietsboek/security.py b/fietsboek/security.py index 1dd2390..13ce369 100644 --- a/fietsboek/security.py +++ b/fietsboek/security.py @@ -4,7 +4,6 @@ from pyramid.authorization import ACLHelper, Authenticated, Everyone  from pyramid.interfaces import ISecurityPolicy  from pyramid.security import Allowed, Denied  from pyramid.traversal import DefaultRootFactory -from sqlalchemy import select  from zope.interface import implementer  from . import models @@ -36,15 +35,14 @@ class SecurityPolicy:          if userid is None:              return None -        query = select(models.User).filter_by(id=int(userid)) -        return request.dbsession.execute(query).scalar_one_or_none() +        return models.User.get_by_authenticated_user_id(request.dbsession, userid)      def authenticated_userid(self, request):          """See :meth:`pyramid.interfaces.ISecurityPolicy.authenticated_userid`"""          identity = self.identity(request)          if identity is None:              return None -        return str(identity.id) +        return identity.authenticated_user_id()      def permits(self, request, context, permission):          """See :meth:`pyramid.interfaces.ISecurityPolicy.permits`""" diff --git a/fietsboek/views/default.py b/fietsboek/views/default.py index f9942c3..75a9522 100644 --- a/fietsboek/views/default.py +++ b/fietsboek/views/default.py @@ -120,13 +120,14 @@ def do_login(request: Request) -> Response:          return HTTPFound(request.route_url("login"))      request.session.flash(request.localizer.translate(_("flash.logged_in"))) -    headers = remember(request, str(user.id)) +    user_id = user.authenticated_user_id() +    headers = remember(request, user_id)      if request.params.get("remember-me") == "on":          # We don't want this logic to be the default in          # SecurityPolicy.remember, so we manually fake it here:          policy = request.registry.getUtility(ISecurityPolicy) -        headers += policy.remember_cookie(request, str(user.id)) +        headers += policy.remember_cookie(request, user_id)      response = HTTPFound("/", headers=headers)      return response diff --git a/tests/playwright/conftest.py b/tests/playwright/conftest.py index f57aca7..b17d457 100644 --- a/tests/playwright/conftest.py +++ b/tests/playwright/conftest.py @@ -81,7 +81,7 @@ class Helper:              user.set_password("password")              self.dbaccess.add(user)              self.dbaccess.commit() -            self.dbaccess.refresh(user, ["id"]) +            self.dbaccess.refresh(user, ["id", "email"])              self.dbaccess.expunge(user)              self._johnny = user              return user @@ -93,7 +93,7 @@ class Helper:          config = Config.construct(session_key=self.app_settings["session_key"])          secret = config.derive_secret("auth-cookie")          helper = AuthTktCookieHelper(secret) -        headers = helper.remember(DummyRequest(), str(user.id)) +        headers = helper.remember(DummyRequest(), user.authenticated_user_id())          for _, header_val in headers:              cookie = header_val.split(";")[0]              name, value = cookie.split("=", 1)  | 
