diff options
| -rw-r--r-- | fietsboek/data.py | 2 | ||||
| -rw-r--r-- | fietsboek/fstrans.py | 197 |
2 files changed, 188 insertions, 11 deletions
diff --git a/fietsboek/data.py b/fietsboek/data.py index c75a451..70e52dc 100644 --- a/fietsboek/data.py +++ b/fietsboek/data.py @@ -13,7 +13,7 @@ import shutil import string import uuid from pathlib import Path -from typing import BinaryIO, Literal, Optional +from typing import BinaryIO, Optional import brotli from filelock import FileLock diff --git a/fietsboek/fstrans.py b/fietsboek/fstrans.py index 8c73f94..c68ca48 100644 --- a/fietsboek/fstrans.py +++ b/fietsboek/fstrans.py @@ -1,13 +1,76 @@ -"""Filesystem transactions.""" +"""Filesystem transactions. + +Motivation +########## + +Fietsboek does a lot of filesystem stuff, such as saving images and track +backups. Like database actions, we want to ensure that these actions happen +"atomically" -- if an error occurs during one action, we want to undo the +previous ones. Similarly, if an error occurs after things have been sent to the +database/filesystem, we want to ensure that we "clean up" (see `issue 98`_). + +.. _issue 98: https://gitlab.com/dunj3/fietsboek/-/issues/98 + +By having "transactionized" file system actions, we can ensure that we do not +have such issues: + +* Actions are reversible in case changes need to be rolled back. +* Actions are done all-or-nothing. +* Transactions are combined with other transactions (such as SQLAlchemy); if + one fails, the others will be rolled back. + +Implementation +############## + +The main mechanism is :class:`Transaction`. It provides the commit/abort +interface as required by the transaction_ module, and user-facing methods to +enqueue filesystem modifications. + +.. _transaction: https://github.com/zopefoundation/transaction + +A transaction is started by :func:`begin`, which also joins the transaction +to the other transactions managed by ``transaction``. + +A transaction records :class:`Action`, which represent modifications to be done +to the filesystem. Each action knows how it should be executed. 
Additionally, +each action records how to undo it -- the :class:`WriteBytes` action, for +example, records the previous file state so it can be restored. + +This implementation has some drawbacks: + +* Modifications are kept in-memory, which might be an issue for larger files. +* Extra time is needed to record previous states (read a file before it is + overwritten). +* Changes are not visible to other programs until they are committed. + +But the advantage is the ease of implementation: Cancelling a transaction just +involves clearing the in-memory buffer, and there are no additional temporary +files needed. + +To further ensure that the filesystem is in a consistent state, the +transactions use a lock file to get exclusive access. Currently, this lock +spans the complete data directory. In the future, more fine-grained locking can +be implemented. + +Usage +##### + +The transaction is automatically used by the +:class:`fietsboek.data.DataManager`. + +Interface +######### +""" import enum import logging import shutil -import transaction import uuid -from filelock import FileLock from pathlib import Path +import transaction +from filelock import FileLock + LOGGER = logging.getLogger(__name__) @@ -16,28 +79,52 @@ def _log_deletion_error(_, path, exc_info): class TransactionalError(Exception): - pass + """An exception that occurs when committing filesystem transactions.""" class State(enum.Enum): + """State of the transaction.""" + OPEN = enum.auto() + """Transaction is open for further actions.""" + COMMITTING = enum.auto() + """Transaction is in the process of being committed.""" + COMMITTED = enum.auto() + """Transaction has been committed.""" + TAINTED = enum.auto() + """Transaction is tainted.""" class Action: + """Base class for any actions that can be applied to the filesystem.""" + def commit_1(self): - pass + """Commit this action (phase 1). + + This corresponds to ``tpc_vote``, and may raise an exception if + committing should be cancelled. 
+ """ def commit_2(self): - pass + """Commit this action (phase 2). + + This corresponds to ``tpc_finish``, and should not raise an exception. + """ def undo(self): - pass + """Undo this action. + + This is called if committing fails, to undo any changes that were + already committed. + """ class WriteBytes(Action): + """Write bytes to the given file.""" + def __init__(self, path: Path, data: bytes): self.path = path self.data = data @@ -61,6 +148,8 @@ class WriteBytes(Action): class Unlink(Action): + """Remove the given file.""" + def __init__(self, path: Path): self.path = path self.old = None @@ -77,6 +166,8 @@ class Unlink(Action): class MakeDir(Action): + """Create the given directory.""" + def __init__(self, path: Path, exist_ok: bool = False): self.path = path self.exist_ok = exist_ok @@ -95,6 +186,8 @@ class MakeDir(Action): class RemoveDir(Action): + """Remove the given (empty) directory.""" + def __init__(self, path: Path): self.path = path @@ -109,6 +202,8 @@ class RemoveDir(Action): class Purge(Action): + """Purge (recursively remove) the given directory.""" + def __init__(self, path: Path): self.path = path @@ -116,10 +211,13 @@ class Purge(Action): return f"<Purge {self.path}>" def commit_2(self): + # pylint: disable=deprecated-argument shutil.rmtree(self.path, ignore_errors=False, onerror=_log_deletion_error) class Transaction: + """A transaction, recording pending filesystem changes.""" + def __init__(self, lock_path: Path): self.actions: list[Action] = [] self.actions_done: list[Action] = [] @@ -128,9 +226,19 @@ class Transaction: self.id = uuid.uuid4().hex def tpc_begin(self, _trans): - pass + """Begin the transaction. + + This is required by the two-phase commit protocol of ``transaction``. + """ def tpc_vote(self, _trans): + """Commit (phase 1) the pending transaction. + + This is required by the two-phase commit protocol of ``transaction``. 
+ + This method may raise exceptions to signal that the transaction (and + all linked transactions) should be aborted. + """ if not self.actions: return @@ -138,6 +246,12 @@ class Transaction: self.commit_1() def tpc_finish(self, _trans): + """Commit (phase 2) the pending transaction. + + This is required by the two-phase commit protocol of ``transaction``. + + This method should not raise an exception. + """ if not self.actions: return @@ -145,16 +259,25 @@ class Transaction: self.commit_2() def tpc_abort(self, _trans): + """Abort the transaction, undoing all previously done changes. + + This is required by the two-phase commit protocol of ``transaction``. + """ if not self.actions_done: return with FileLock(self.lock_path): self.undo() + # Needs to conform to transaction API: + # pylint: disable=invalid-name def sortKey(self): + """Returns the sort key to sort this transaction in relation to others. + """ return f"filesystem:{self.id}" def undo(self): + """Undo all actions that have already been applied.""" for action in reversed(self.actions_done): LOGGER.debug("Undoing %s", action) try: @@ -163,9 +286,16 @@ class Transaction: LOGGER.debug("Exception ignored: %s", exc_inner) def commit(self, _trans): - pass + """Commit this transaction. + + This is required by the interface of ``transaction``. + """ def commit_1(self): + """Start the first phase of committing. + + This method is called automatically by the transaction manager. + """ if self.state != State.OPEN: raise TransactionalError(f"Transaction is not open but {self.state}") @@ -182,6 +312,10 @@ class Transaction: raise exc def commit_2(self): + """Start the second phase of committing. + + This method is called automatically by the transaction manager. 
+ """ if self.state != State.COMMITTING: raise TransactionalError(f"Transaction is not committing but {self.state}") @@ -192,37 +326,80 @@ class Transaction: self.state = State.COMMITTED def abort(self, _trans=None): + """Abort this transaction.""" self.actions.clear() self.state = State.TAINTED def write_bytes(self, path: Path, data: bytes): + """Write the given bytes to the given path. + + This is a transactioned version of :meth:`Path.write_bytes`. + + :param path: Path where to write the bytes to. + :param data: The data to write. + """ if self.state != State.OPEN: raise TransactionalError(f"Transaction is not open but {self.state}") self.actions.append(WriteBytes(path, data)) def unlink(self, path: Path): + """Unlinks (removes) the given file. + + This is a transactioned version of :meth:`Path.unlink`. + + :param path: The path to the file to unlink. + """ if self.state != State.OPEN: raise TransactionalError(f"Transaction is not open but {self.state}") self.actions.append(Unlink(path)) def make_dir(self, path: Path, *, exist_ok: bool = False): + """Creates the directory. + + This is a transactioned version of :meth:`Path.mkdir`. + + :param path: The directory to create. + :param exist_ok: If ``True``, no error will be raised if the directory + already exists. + """ if self.state != State.OPEN: raise TransactionalError(f"Transaction is not open but {self.state}") self.actions.append(MakeDir(path, exist_ok=exist_ok)) def remove_dir(self, path: Path): + """Removes the (empty) directory. + + This is a transactioned version of :meth:`Path.rmdir`. + + :param path: The directory to remove. + """ if self.state != State.OPEN: raise TransactionalError(f"Transaction is not open but {self.state}") self.actions.append(RemoveDir(path)) def purge(self, path: Path): + """Completely remove (recursively) the given path. + + This uses :func:`shutil.rmtree` to delete the path. + + Unlike other actions, this cannot be undone! + + :param path: The directory to remove. 
+ """ if self.state != State.OPEN: raise TransactionalError(f"Transaction is not open but {self.state}") self.actions.append(Purge(path)) def begin(lock_path: Path, tm=None) -> Transaction: - """Begin and register a new transaction.""" + """Begin a new transaction and register it. + + :param lock_path: The path to the lock file to use in order to synchronize + the transaction. + :param tm: The transaction manager from ``transaction`` in which to join + this transaction. + :return: The :class:`Transaction`. + """ trans = Transaction(lock_path) if tm: tm.get().join(trans) |
