From 20ddfb7a1d39b55a6d8044a083d2d151bddec78d Mon Sep 17 00:00:00 2001 From: Daniel Schadt Date: Wed, 9 Aug 2023 21:01:06 +0200 Subject: include email in user fingerprint --- fietsboek/models/user.py | 59 ++++++++++++++++++++++++++++++++++++++++++-- fietsboek/security.py | 6 ++--- fietsboek/views/default.py | 5 ++-- tests/playwright/conftest.py | 4 +-- 4 files changed, 64 insertions(+), 10 deletions(-) diff --git a/fietsboek/models/user.py b/fietsboek/models/user.py index 96a31c0..9dd2298 100644 --- a/fietsboek/models/user.py +++ b/fietsboek/models/user.py @@ -1,10 +1,11 @@ """User models for fietsboek.""" import datetime import enum +import hashlib import secrets import uuid from functools import reduce -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from cryptography.exceptions import InvalidKey from cryptography.hazmat.primitives.kdf.scrypt import Scrypt @@ -28,7 +29,8 @@ from sqlalchemy import ( select, union, ) -from sqlalchemy.orm import Mapped, relationship, with_parent +from sqlalchemy.exc import NoResultFound +from sqlalchemy.orm import Mapped, Session, relationship, with_parent from sqlalchemy.orm.attributes import flag_dirty from sqlalchemy.orm.session import object_session @@ -61,6 +63,9 @@ SALT_LENGTH = 32 TOKEN_LIFETIME = datetime.timedelta(hours=24) """Maximum validity time of a token.""" +FINGERPRINT_SHAKE_BYTES = 10 +"""Number of bytes to include in the SHAKE digest of the fingerprint.""" + friends_assoc = Table( "friends_assoc", @@ -179,6 +184,56 @@ class User(Base): ] return acl + def authenticated_user_id(self) -> str: + """Returns a string suitable to re-identify this user, e.g. in a cookie + or session. + + The returned ID contains a "fingerprint" to ensure that the + re-identified user matches. + + :return: The string used to re-identify this user. + """ + # We're not interested in hiding the email by all means, we just want a + # good check to see whether the email of the represented user actually + # matches with the database entry. Therefore, we use a short SHAKE hash. + email = self.email or "" + shaker = hashlib.shake_128(email.encode("utf-8")) + fingerprint = shaker.hexdigest(FINGERPRINT_SHAKE_BYTES) + return f"{self.id}-{fingerprint}" + + @classmethod + def get_by_authenticated_user_id(cls, session: Session, user_id: str) -> Optional["User"]: + """Retrieves a user by their authenticated user ID (as returned by + :meth:`authenticated_user_id`). + + This method returns ``None`` on any of the following conditions: + + * The user ID is malformed. + * The user with the given ID was not found. + * The fingerprint of the ``user_id`` and the database user do not match. + + :param session: The database session which to retrieve the user from. + :param user_id: The user ID. + :return: The user, if found. + """ + try: + numeric_id, fingerprint = user_id.split("-") + except ValueError: + # Malformed ID + return None + query = select(cls).filter_by(id=numeric_id) + try: + user = session.execute(query).scalar_one() + except NoResultFound: + return None + email = user.email or "" + user_fingerprint = hashlib.shake_128(email.encode("utf-8")).hexdigest( + FINGERPRINT_SHAKE_BYTES + ) + if user_fingerprint != fingerprint: + return None + return user + def set_password(self, new_password): """Sets a new password for the user. diff --git a/fietsboek/security.py b/fietsboek/security.py index 1dd2390..13ce369 100644 --- a/fietsboek/security.py +++ b/fietsboek/security.py @@ -4,7 +4,6 @@ from pyramid.authorization import ACLHelper, Authenticated, Everyone from pyramid.interfaces import ISecurityPolicy from pyramid.security import Allowed, Denied from pyramid.traversal import DefaultRootFactory -from sqlalchemy import select from zope.interface import implementer from . import models @@ -36,15 +35,14 @@ class SecurityPolicy: if userid is None: return None - query = select(models.User).filter_by(id=int(userid)) - return request.dbsession.execute(query).scalar_one_or_none() + return models.User.get_by_authenticated_user_id(request.dbsession, userid) def authenticated_userid(self, request): """See :meth:`pyramid.interfaces.ISecurityPolicy.authenticated_userid`""" identity = self.identity(request) if identity is None: return None - return str(identity.id) + return identity.authenticated_user_id() def permits(self, request, context, permission): """See :meth:`pyramid.interfaces.ISecurityPolicy.permits`""" diff --git a/fietsboek/views/default.py b/fietsboek/views/default.py index f9942c3..75a9522 100644 --- a/fietsboek/views/default.py +++ b/fietsboek/views/default.py @@ -120,13 +120,14 @@ def do_login(request: Request) -> Response: return HTTPFound(request.route_url("login")) request.session.flash(request.localizer.translate(_("flash.logged_in"))) - headers = remember(request, str(user.id)) + user_id = user.authenticated_user_id() + headers = remember(request, user_id) if request.params.get("remember-me") == "on": # We don't want this logic to be the default in # SecurityPolicy.remember, so we manually fake it here: policy = request.registry.getUtility(ISecurityPolicy) - headers += policy.remember_cookie(request, str(user.id)) + headers += policy.remember_cookie(request, user_id) response = HTTPFound("/", headers=headers) return response diff --git a/tests/playwright/conftest.py b/tests/playwright/conftest.py index f57aca7..b17d457 100644 --- a/tests/playwright/conftest.py +++ b/tests/playwright/conftest.py @@ -81,7 +81,7 @@ class Helper: user.set_password("password") self.dbaccess.add(user) self.dbaccess.commit() - self.dbaccess.refresh(user, ["id"]) + self.dbaccess.refresh(user, ["id", "email"]) self.dbaccess.expunge(user) self._johnny = user return user @@ -93,7 +93,7 @@ class Helper: config = Config.construct(session_key=self.app_settings["session_key"]) secret = config.derive_secret("auth-cookie") helper = AuthTktCookieHelper(secret) - headers = helper.remember(DummyRequest(), str(user.id)) + headers = helper.remember(DummyRequest(), user.authenticated_user_id()) for _, header_val in headers: cookie = header_val.split(";")[0] name, value = cookie.split("=", 1) -- cgit v1.2.3