diff options
-rw-r--r-- | fietsboek/alembic/versions/20230914_4566843039d6.py | 37 | ||||
-rw-r--r-- | fietsboek/models/user.py | 37 |
2 files changed, 63 insertions, 11 deletions
diff --git a/fietsboek/alembic/versions/20230914_4566843039d6.py b/fietsboek/alembic/versions/20230914_4566843039d6.py new file mode 100644 index 0000000..28dcbda --- /dev/null +++ b/fietsboek/alembic/versions/20230914_4566843039d6.py @@ -0,0 +1,37 @@ +"""Add session_secret column. + +Revision ID: 4566843039d6 +Revises: 8f4e4eae5eb2 +Create Date: 2023-09-14 19:33:51.646943 + +""" +import secrets + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '4566843039d6' +down_revision = '8f4e4eae5eb2' +branch_labels = None +depends_on = None + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('users', sa.Column('session_secret', sa.LargeBinary(length=32), nullable=True)) + bind = op.get_bind() + user_ids = bind.execute(sa.text("SELECT id FROM users;")).scalars() + for user_id in user_ids: + bind.execute( + sa.text("UPDATE users SET session_secret=:secret WHERE id=:user_id;"), + { + "secret": secrets.token_bytes(32), + "user_id": user_id, + }, + ) + # ### end Alembic commands ### + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('users', 'session_secret') + # ### end Alembic commands ### diff --git a/fietsboek/models/user.py b/fietsboek/models/user.py index 432c61d..7d327b7 100644 --- a/fietsboek/models/user.py +++ b/fietsboek/models/user.py @@ -59,6 +59,8 @@ SCRYPT_PARAMETERS = { "p": 1, } SALT_LENGTH = 32 +SESSION_SECRET_LENGTH = 32 +"""Length of the secret bytes for the user's session.""" TOKEN_LIFETIME = datetime.timedelta(hours=24) """Maximum validity time of a token.""" @@ -90,6 +92,8 @@ class User(Base): :vartype salt: bytes :ivar email: Email address of the user. :vartype email: str + :ivar session_secret: Secret bytes for the session fingerprint. + :vartype session_secret: bytes :ivar is_admin: Flag determining whether this user has admin access. :vartype is_admin: bool :ivar is_verified: Flag determining whether this user has been verified. @@ -112,6 +116,7 @@ class User(Base): password = Column(LargeBinary) salt = Column(LargeBinary) email = Column(Text) + session_secret = Column(LargeBinary(SESSION_SECRET_LENGTH)) is_admin = Column(Boolean, default=False) is_verified = Column(Boolean, default=False) @@ -187,6 +192,17 @@ class User(Base): ] return acl + def _fingerprint(self) -> str: + # We're not interested in hiding the data by all means, we just want a + # good check to see whether the email of the represented user actually + # matches with the database entry. Therefore, we use a short SHAKE hash. + email = self.email or "" + shaker = hashlib.shake_128() + shaker.update(email.encode("utf-8")) + shaker.update(self.password or b"") + shaker.update(self.session_secret or b"") + return shaker.hexdigest(FINGERPRINT_SHAKE_BYTES) + def authenticated_user_id(self) -> str: """Returns a string suitable to re-identify this user, e.g. in a cookie or session. @@ -196,12 +212,7 @@ class User(Base): :return: The string used to re-identify this user. """ - # We're not interested in hiding the email by all means, we just want a - # good check to see whether the email of the represented user actually - # matches with the database entry. Therefore, we use a short SHAKE hash. - email = self.email or "" - shaker = hashlib.shake_128(email.encode("utf-8")) - fingerprint = shaker.hexdigest(FINGERPRINT_SHAKE_BYTES) + fingerprint = self._fingerprint() return f"{self.id}-{fingerprint}" @classmethod @@ -229,11 +240,7 @@ class User(Base): user = session.execute(query).scalar_one() except NoResultFound: return None - email = user.email or "" - user_fingerprint = hashlib.shake_128(email.encode("utf-8")).hexdigest( - FINGERPRINT_SHAKE_BYTES - ) - if user_fingerprint != fingerprint: + if user._fingerprint() != fingerprint: return None return user @@ -270,6 +277,14 @@ class User(Base): except InvalidKey: raise PasswordMismatch from None + def roll_session_secret(self): + """Rolls a new session secret for the user. + + This function automatically generates the right amount of random bytes + from a secure source. + """ + self.session_secret = secrets.token_bytes(SESSION_SECRET_LENGTH) + def principals(self): """Returns all principals that this user fulfills. |