aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--fietsboek/data.py2
-rw-r--r--fietsboek/fstrans.py197
2 files changed, 188 insertions, 11 deletions
diff --git a/fietsboek/data.py b/fietsboek/data.py
index c75a451..70e52dc 100644
--- a/fietsboek/data.py
+++ b/fietsboek/data.py
@@ -13,7 +13,7 @@ import shutil
import string
import uuid
from pathlib import Path
-from typing import BinaryIO, Literal, Optional
+from typing import BinaryIO, Optional
import brotli
from filelock import FileLock
diff --git a/fietsboek/fstrans.py b/fietsboek/fstrans.py
index 8c73f94..c68ca48 100644
--- a/fietsboek/fstrans.py
+++ b/fietsboek/fstrans.py
@@ -1,13 +1,76 @@
-"""Filesystem transactions."""
+"""Filesystem transactions.
+
+Motivation
+##########
+
+Fietsboek does a lot of filesystem stuff, such as saving images and track
+backups. Like database actions, we want to ensure that these actions happen
+"atomically" -- if an error occurs during one action, we want to undo the
+previous ones. Similarly, if an error occurs after things have been sent to the
+database/filesystem, we want to ensure that we "clean up" (see `issue 98`_).
+
+.. _issue 98: https://gitlab.com/dunj3/fietsboek/-/issues/98
+
+By having "transactionized" file system actions, we can ensure that we do not
+not have such issues:
+
+* Actions are reversible in case changes need to be rolled back.
+* Actions are done all-or-nothing.
+* Transactions are combined with other transactions (such as SQLAlchemy); if
+ one fails, the others will be rolled back.
+
+Implementation
+##############
+
+The main mechanism is :class:`Transaction`. It provides the commit/abort
+interface as required by the transaction_ module, and user-facing methods to
+enqueue filesystem modifications.
+
+.. _transaction: https://github.com/zopefoundation/transaction
+
+A transaction is started by :func:`begin`, which also joins the transaction
+to the other transactions managed by ``transaction``.
+
+A transaction records :class:`Action`, which represent modifications to be done
+to the filesystem. Each action knows how it should be executed. Additionally,
+each action records how to undo it -- the :class:`WriteBytes` action, for
+example, records the previous file state so it can be restored.
+
+This implementation has some drawbacks:
+
+* Modifications are kept in-memory, which might be an issue for larger files.
+* Extra time is needed to record previous states (read a file before it is
+ overwritten).
+* Changes are not visible to other programs until they are committed.
+
+But the advantage is the ease of implementation: Cancelling a transaction just
+involves clearing the in-memory buffer, and there are no additional temporary
+files needed.
+
+To further ensure that the filesystem is in a consistent state, the
+transactions use a lock file to get exclusive access. Currently, this lock
+spans the complete data directory. In the future, more fine-grained locking can
+be implemented.
+
+Usage
+#####
+
+The transaction is automatically used by the
+:class:`fietsboek.data.DataManager`.
+
+Interface
+#########
+"""
import enum
import logging
import shutil
-import transaction
import uuid
-from filelock import FileLock
from pathlib import Path
+import transaction
+from filelock import FileLock
+
LOGGER = logging.getLogger(__name__)
@@ -16,28 +79,52 @@ def _log_deletion_error(_, path, exc_info):
class TransactionalError(Exception):
- pass
+ """An exception that occurs when committing filesystem transactions."""
class State(enum.Enum):
+ """State of the transaction."""
+
OPEN = enum.auto()
+ """Transaction is open for further actions."""
+
COMMITTING = enum.auto()
+ """Transaction is in the process of being committed."""
+
COMMITTED = enum.auto()
+ """Transaction has been committed."""
+
TAINTED = enum.auto()
+ """Transaction is tainted."""
class Action:
+ """Base class for any actions that can be applied to the filesystem."""
+
def commit_1(self):
- pass
+ """Commit this action (phase 1).
+
+ This corresponds to ``tpc_vote``, and may raise an exception if
+ committing should be cancelled.
+ """
def commit_2(self):
- pass
+ """Commit this action (phase 2).
+
+ This corresponds to ``tpc_finish``, and should not raise an exception.
+ """
def undo(self):
- pass
+ """Undo this action.
+
+ This is called if commiting fails, to undo any changes that were
+ already committed.
+ """
class WriteBytes(Action):
+ """Write bytes to the given file."""
+
def __init__(self, path: Path, data: bytes):
self.path = path
self.data = data
@@ -61,6 +148,8 @@ class WriteBytes(Action):
class Unlink(Action):
+ """Remove the given file."""
+
def __init__(self, path: Path):
self.path = path
self.old = None
@@ -77,6 +166,8 @@ class Unlink(Action):
class MakeDir(Action):
+ """Create the given directory."""
+
def __init__(self, path: Path, exist_ok: bool = False):
self.path = path
self.exist_ok = exist_ok
@@ -95,6 +186,8 @@ class MakeDir(Action):
class RemoveDir(Action):
+ """Remove the given (empty) directory."""
+
def __init__(self, path: Path):
self.path = path
@@ -109,6 +202,8 @@ class RemoveDir(Action):
class Purge(Action):
+ """Purge (recursively remove) the given directory."""
+
def __init__(self, path: Path):
self.path = path
@@ -116,10 +211,13 @@ class Purge(Action):
return f"<Purge {self.path}>"
def commit_2(self):
+ # pylint: disable=deprecated-argument
shutil.rmtree(self.path, ignore_errors=False, onerror=_log_deletion_error)
class Transaction:
+ """A transaction, recording pending filesystem changes."""
+
def __init__(self, lock_path: Path):
self.actions: list[Action] = []
self.actions_done: list[Action] = []
@@ -128,9 +226,19 @@ class Transaction:
self.id = uuid.uuid4().hex
def tpc_begin(self, _trans):
- pass
+ """Begin the transaction.
+
+ This is required by the two-phase commit protocol of ``transaction``.
+ """
def tpc_vote(self, _trans):
+ """Commit (phase 1) the pending transaction.
+
+ This is required by the two-phase commit protocol of ``transaction``.
+
+ This method may raise exceptions to signal that the transaction (and
+ all linked transactions) should be aborted.
+ """
if not self.actions:
return
@@ -138,6 +246,12 @@ class Transaction:
self.commit_1()
def tpc_finish(self, _trans):
+ """Commit (phase 2) the pending transaction.
+
+ This is required by the two-phase commit protocol of ``transaction``.
+
+ This method should not raise an exception.
+ """
if not self.actions:
return
@@ -145,16 +259,25 @@ class Transaction:
self.commit_2()
def tpc_abort(self, _trans):
+ """Abort the transaction, undoing all previously done changes.
+
+ This is required by the two-phase commit protocol of ``transaction``.
+ """
if not self.actions_done:
return
with FileLock(self.lock_path):
self.undo()
+ # Needs to conform to transaction API:
+ # pylint: disable=invalid-name
def sortKey(self):
+ """Returns the sort key to sort this transaction in relation to others.
+ """
return f"filesystem:{self.id}"
def undo(self):
+ """Undo all actions that have already been applied."""
for action in reversed(self.actions_done):
LOGGER.debug("Undoing %s", action)
try:
@@ -163,9 +286,16 @@ class Transaction:
LOGGER.debug("Exception ignored: %s", exc_inner)
def commit(self, _trans):
- pass
+ """Commit this transaction.
+
+ This is required by the interface of ``transaction``.
+ """
def commit_1(self):
+ """Start the first phase of committing.
+
+ This method is called automatically by the transaction manager.
+ """
if self.state != State.OPEN:
raise TransactionalError(f"Transaction is not open but {self.state}")
@@ -182,6 +312,10 @@ class Transaction:
raise exc
def commit_2(self):
+ """Start the second phase of committing.
+
+ This method is called automatically by the transaction manager.
+ """
if self.state != State.COMMITTING:
raise TransactionalError(f"Transaction is not committing but {self.state}")
@@ -192,37 +326,80 @@ class Transaction:
self.state = State.COMMITTED
def abort(self, _trans=None):
+ """Abort this transaction."""
self.actions.clear()
self.state = State.TAINTED
def write_bytes(self, path: Path, data: bytes):
+ """Write the given bytes to the given path.
+
+ This is a transactioned version of :meth:`Path.write_bytes`.
+
+ :param path: Path where to write the bytes to.
+ :param data: The data to write.
+ """
if self.state != State.OPEN:
raise TransactionalError(f"Transaction is not open but {self.state}")
self.actions.append(WriteBytes(path, data))
def unlink(self, path: Path):
+ """Unlinks (removes) the given file.
+
+ This is a transactioned version of :math:`Path.unlink`.
+
+ :param path: The path to the file to unlink.
+ """
if self.state != State.OPEN:
raise TransactionalError(f"Transaction is not open but {self.state}")
self.actions.append(Unlink(path))
def make_dir(self, path: Path, *, exist_ok: bool = False):
+ """Creates the directory.
+
+ This is a transactioned version of :meth:`Path.mkdir`.
+
+ :param path: The directory to create.
+ :param exist_ok: If ``True``, no error will be raised if the directory
+ already exists.
+ """
if self.state != State.OPEN:
raise TransactionalError(f"Transaction is not open but {self.state}")
self.actions.append(MakeDir(path, exist_ok=exist_ok))
def remove_dir(self, path: Path):
+ """Removes the (empty) directory.
+
+ This is a transactioned version of :meth:`Path.rmdir`.
+
+ :param path: The directory to remove.
+ """
if self.state != State.OPEN:
raise TransactionalError(f"Transaction is not open but {self.state}")
self.actions.append(RemoveDir(path))
def purge(self, path: Path):
+ """Completely remove (recursively) the given path.
+
+ This uses :func:`shutil.rmtree` to delete the path.
+
+ Unlike other actions, this cannot be undone!
+
+ :param path: The directory to remove.
+ """
if self.state != State.OPEN:
raise TransactionalError(f"Transaction is not open but {self.state}")
self.actions.append(Purge(path))
def begin(lock_path: Path, tm=None) -> Transaction:
- """Begin and register a new transaction."""
+ """Begin a new transaction and register it.
+
+ :param lock_path: The path to the lock file to use in order to synchronize
+ the transaction.
+ :param tm: The transaction manager from ``transaction`` in which to join
+ this transaction.
+ :return: The :class:`Transaction`.
+ """
trans = Transaction(lock_path)
if tm:
tm.get().join(trans)