From 9bea2637e5a10c52e49299b22598297b35412373 Mon Sep 17 00:00:00 2001 From: Daniel Schadt Date: Mon, 29 Dec 2025 13:09:58 +0100 Subject: initial filesystem transactions This should solve issues that arise when exceptions occur during upload. Hooks into the transaction/pyramid_tm machinery. --- fietsboek/__init__.py | 10 +- fietsboek/actions.py | 46 ++++----- fietsboek/data.py | 155 +++++++++---------------------- fietsboek/fstrans.py | 231 ++++++++++++++++++++++++++++++++++++++++++++++ fietsboek/views/upload.py | 9 +- 5 files changed, 308 insertions(+), 143 deletions(-) create mode 100644 fietsboek/fstrans.py diff --git a/fietsboek/__init__.py b/fietsboek/__init__.py index 2ac41e3..b533e97 100644 --- a/fietsboek/__init__.py +++ b/fietsboek/__init__.py @@ -31,6 +31,7 @@ from pyramid.response import Response from pyramid.session import SignedCookieSessionFactory from . import config as mod_config +from . import fstrans from . import jinja2 as mod_jinja2 from . import transformers from .data import DataManager @@ -92,7 +93,10 @@ def maintenance_mode( """ def tween(request: Request) -> Response: - maintenance = request.data_manager.maintenance_mode() + # We don't want to mess around with transactioned data mangers here, + # so we create a new one without transaction + data_manager = DataManager(Path(request.config.data_dir)) + maintenance = data_manager.maintenance_mode() if maintenance is None: return handler(request) @@ -155,7 +159,9 @@ def main(global_config, **settings): check_db_engine(parsed_config.sqlalchemy_url) def data_manager(request): - return DataManager(Path(request.config.data_dir)) + data_dir = Path(request.config.data_dir) + lock_file = data_dir / "lock" + return DataManager(data_dir, txn=fstrans.begin(lock_file, request.tm)) def redis_(request): return redis.from_url(request.config.redis_url) diff --git a/fietsboek/actions.py b/fietsboek/actions.py index 2d23d97..f9f36ac 100644 --- a/fietsboek/actions.py +++ b/fietsboek/actions.py @@ -93,29 +93,29 @@ def add_track( # Save the GPX data LOGGER.debug("Creating a new data folder for %d", track.id) assert track.id is not None - with data_manager.initialize(track.id) as manager: - LOGGER.debug("Saving backup to %s", manager.backup_path()) - manager.compress_backup(gpx_data) - - for transformer in transformers: - LOGGER.debug("Running %s with %r", transformer, transformer.parameters) - transformer.execute(path) - track.transformers = [ - [tfm.identifier(), tfm.parameters.model_dump()] for tfm in transformers - ] - - track.fast_set_path(path) - - # Best time to build the cache is right after the upload, but *after* the - # transformers have been applied! - track.ensure_cache(path) - dbsession.add(track.cache) - - LOGGER.debug("Building preview image for %s", track.id) - preview_image = trackmap.render(path, layer, tile_requester) - image_io = io.BytesIO() - preview_image.save(image_io, "PNG") - manager.set_preview(image_io.getvalue()) + manager = data_manager.initialize(track.id) + LOGGER.debug("Saving backup to %s", manager.backup_path()) + manager.compress_backup(gpx_data) + + for transformer in transformers: + LOGGER.debug("Running %s with %r", transformer, transformer.parameters) + transformer.execute(path) + track.transformers = [ + [tfm.identifier(), tfm.parameters.model_dump()] for tfm in transformers + ] + + track.fast_set_path(path) + + # Best time to build the cache is right after the upload, but *after* the + # transformers have been applied! + track.ensure_cache(path) + dbsession.add(track.cache) + + LOGGER.debug("Building preview image for %s", track.id) + preview_image = trackmap.render(path, layer, tile_requester) + image_io = io.BytesIO() + preview_image.save(image_io, "PNG") + manager.set_preview(image_io.getvalue()) return track diff --git a/fietsboek/data.py b/fietsboek/data.py index 3fcd922..c75a451 100644 --- a/fietsboek/data.py +++ b/fietsboek/data.py @@ -19,6 +19,7 @@ import brotli from filelock import FileLock from . import util +from .fstrans import Transaction LOGGER = logging.getLogger(__name__) @@ -50,8 +51,9 @@ class DataManager: :ivar data_dir: Path to the data folder. """ - def __init__(self, data_dir: Path): + def __init__(self, data_dir: Path, *, txn: Transaction | None = None): self.data_dir: Path = data_dir + self.txn = txn def _track_data_dir(self, track_id): return self.data_dir / "tracks" / str(track_id) @@ -81,8 +83,11 @@ class DataManager: :return: The manager that can be used to manage this track's data. """ path = self._track_data_dir(track_id) - path.mkdir(parents=True) - return TrackDataDir(track_id, path, journal=True, is_fresh=True) + if self.txn: + self.txn.make_dir(path) + else: + path.mkdir(parents=True) + return TrackDataDir(track_id, path, txn=self.txn) def initialize_user(self, user_id: int) -> "UserDataDir": """Creates the data directory for a user. @@ -92,8 +97,11 @@ class DataManager: :return: The manager that can be used to manage this user's data. """ path = self._user_data_dir(user_id) - path.mkdir(parents=True) - return UserDataDir(user_id, path) + if self.txn: + self.txn.make_dir(path) + else: + path.mkdir(parents=True) + return UserDataDir(user_id, path, txn=self.txn) def purge(self, track_id: int): """Forcefully purges all data from the given track. @@ -101,9 +109,9 @@ class DataManager: This function logs errors but raises no exception, as such it can always be used to clean up after a track. """ - TrackDataDir(track_id, self._track_data_dir(track_id)).purge() + TrackDataDir(track_id, self._track_data_dir(track_id), txn=self.txn).purge() - def open(self, track_id: int) -> "TrackDataDir": + def open(self, track_id: int, *, force: bool = False) -> "TrackDataDir": """Opens a track's data directory. :raises FileNotFoundError: If the track directory does not exist. @@ -111,9 +119,9 @@ class DataManager: :return: The manager that can be used to manage this track's data. """ path = self._track_data_dir(track_id) - if not path.is_dir(): + if not force and not path.is_dir(): raise FileNotFoundError(f"The path {path} is not a directory") from None - return TrackDataDir(track_id, path) + return TrackDataDir(track_id, path, txn=self.txn) def open_user(self, user_id: int) -> "UserDataDir": """Opens a user's data directory. @@ -125,7 +133,7 @@ class DataManager: path = self._user_data_dir(user_id) if not path.is_dir(): raise FileNotFoundError(f"The path {path} is not a directory") from None - return UserDataDir(user_id, path) + return UserDataDir(user_id, path, txn=self.txn) def size(self) -> int: """Returns the size of all data. @@ -164,86 +172,10 @@ class TrackDataDir: semantics, use ``journal = False``. """ - def __init__(self, track_id: int, path: Path, *, journal: bool = False, is_fresh: bool = False): + def __init__(self, track_id: int, path: Path, *, txn: Transaction | None = None): self.track_id: int = track_id self.path: Path = path - self.journal: Optional[list] = [] if journal else None - self.is_fresh = is_fresh - - def __enter__(self) -> "TrackDataDir": - if self.journal is None: - self.journal = [] - return self - - def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]: - if exc_type is None and exc_val is None and exc_tb is None: - self.commit() - else: - self.rollback() - return False - - def rollback(self): - """Rolls back the journal, e.g. in case of error. - - :raises ValueError: If the data directory was opened without the - journal, this raises :exc:`ValueError`. - """ - LOGGER.debug("Rolling back state of %s", self.path) - - if self.journal is None: - raise ValueError("Rollback on a non-journalling data directory") - - if self.is_fresh: - # Shortcut if the directory is fresh, simply remove everything - self.journal = None - self.purge() - return - - for action, *rest in reversed(self.journal): - if action == "purge": - (new_name,) = rest - shutil.move(new_name, self.path) - elif action == "add_image": - (image_path,) = rest - image_path.unlink() - elif action == "delete_image": - path, data = rest - path.write_bytes(data) - elif action == "set_preview": - old_data = rest - if old_data is None: - self.preview_path().unlink() - else: - self.preview_path().write_bytes(old_data) - - self.journal = None - - def commit(self): - """Commits all changed and deletes the journal. - - Note that this function will do nothing if the journal is disabled, - meaning it can always be called. - """ - LOGGER.debug("Committing journal for %s", self.path) - - if self.journal is None: - return - - for action, *rest in reversed(self.journal): - if action == "purge": - (new_name,) = rest - shutil.rmtree(new_name, ignore_errors=False, onerror=self._log_deletion_error) - elif action == "add_image": - # Nothing to do here, the image is already saved - pass - elif action == "delete_image": - # Again, nothing to do here, we simply discard the in-memory image data - pass - elif action == "set_preview": - # Still nothing to do here - pass - - self.journal = None + self.txn = txn def lock(self) -> FileLock: """Returns a FileLock that can be used to lock access to the track's @@ -263,13 +195,11 @@ class TrackDataDir: This function logs errors but raises no exception, as such it can always be used to clean up after a track. """ - if self.journal is None: + if self.txn: + self.txn.purge(self.path) + else: if self.path.is_dir(): shutil.rmtree(self.path, ignore_errors=False, onerror=self._log_deletion_error) - else: - new_name = self.path.with_name("trash-" + self.path.name) - shutil.move(self.path, new_name) - self.journal.append(("purge", new_name)) def size(self) -> int: """Returns the size of the data that this track entails. @@ -289,7 +219,10 @@ class TrackDataDir: quality but slowest compression speed. """ compressed = brotli.compress(data, quality=quality) - self.backup_path().write_bytes(compressed) + if self.txn: + self.txn.write_bytes(self.backup_path(), compressed) + else: + self.backup_path().write_bytes(compressed) def decompress_backup(self) -> bytes: """Returns the backup bytes decompressed. @@ -337,15 +270,18 @@ class TrackDataDir: :return: The ID of the saved image. """ image_dir = self.path / "images" - image_dir.mkdir(parents=True, exist_ok=True) + if self.txn: + self.txn.make_dir(image_dir, exist_ok=True) + else: + image_dir.mkdir(parents=True, exist_ok=True) filename = generate_filename(filename) path = image_dir / filename - with open(path, "wb") as fobj: - shutil.copyfileobj(image, fobj) - - if self.journal is not None: - self.journal.append(("add_image", path)) + if self.txn: + self.txn.write_bytes(path, image.read()) + else: + with open(path, "wb") as fobj: + shutil.copyfileobj(image, fobj) return filename @@ -361,10 +297,10 @@ class TrackDataDir: return path = self.image_path(image_id) - if self.journal is not None: - self.journal.append(("delete_image", path, path.read_bytes())) - - path.unlink() + if self.txn: + self.txn.unlink(path) + else: + path.unlink() def preview_path(self) -> Path: """Gets the path to the "preview image". @@ -378,13 +314,10 @@ class TrackDataDir: :param data: The data of the preview image. """ - if self.journal is not None: - try: - previous_preview = self.preview_path().read_bytes() - except FileNotFoundError: - previous_preview = None - self.journal.append(("set_preview", previous_preview)) - self.preview_path().write_bytes(data) + if self.txn: + self.txn.write_bytes(self.preview_path(), data) + else: + self.preview_path().write_bytes(data) class UserDataDir: diff --git a/fietsboek/fstrans.py b/fietsboek/fstrans.py new file mode 100644 index 0000000..8c73f94 --- /dev/null +++ b/fietsboek/fstrans.py @@ -0,0 +1,231 @@ +"""Filesystem transactions.""" + +import enum +import logging +import shutil +import transaction +import uuid +from filelock import FileLock +from pathlib import Path + +LOGGER = logging.getLogger(__name__) + + +def _log_deletion_error(_, path, exc_info): + LOGGER.warning("Failed to remove %s", path, exc_info=exc_info) + + +class TransactionalError(Exception): + pass + + +class State(enum.Enum): + OPEN = enum.auto() + COMMITTING = enum.auto() + COMMITTED = enum.auto() + TAINTED = enum.auto() + + +class Action: + def commit_1(self): + pass + + def commit_2(self): + pass + + def undo(self): + pass + + +class WriteBytes(Action): + def __init__(self, path: Path, data: bytes): + self.path = path + self.data = data + self.old = None + + def __repr__(self): + return f"" + + def commit_1(self): + try: + self.old = self.path.read_bytes() + except FileNotFoundError: + self.old = None + self.path.write_bytes(self.data) + + def undo(self): + if self.old is None: + self.path.unlink() + else: + self.path.write_bytes(self.old) + + +class Unlink(Action): + def __init__(self, path: Path): + self.path = path + self.old = None + + def __repr__(self): + return f"" + + def commit_1(self): + self.old = self.path.read_bytes() + self.path.unlink() + + def undo(self): + self.path.write_bytes(self.old) + + +class MakeDir(Action): + def __init__(self, path: Path, exist_ok: bool = False): + self.path = path + self.exist_ok = exist_ok + self.existed = None + + def __repr__(self): + return f"" + + def commit_1(self): + self.existed = self.path.is_dir() + self.path.mkdir(exist_ok=self.exist_ok) + + def undo(self): + if not self.existed: + self.path.rmdir() + + +class RemoveDir(Action): + def __init__(self, path: Path): + self.path = path + + def __repr__(self): + return f"" + + def commit_1(self): + self.path.rmdir() + + def undo(self): + self.path.mkdir() + + +class Purge(Action): + def __init__(self, path: Path): + self.path = path + + def __repr__(self): + return f"" + + def commit_2(self): + shutil.rmtree(self.path, ignore_errors=False, onerror=_log_deletion_error) + + +class Transaction: + def __init__(self, lock_path: Path): + self.actions: list[Action] = [] + self.actions_done: list[Action] = [] + self.state = State.OPEN + self.lock_path = lock_path + self.id = uuid.uuid4().hex + + def tpc_begin(self, _trans): + pass + + def tpc_vote(self, _trans): + if not self.actions: + return + + with FileLock(self.lock_path): + self.commit_1() + + def tpc_finish(self, _trans): + if not self.actions: + return + + with FileLock(self.lock_path): + self.commit_2() + + def tpc_abort(self, _trans): + if not self.actions_done: + return + + with FileLock(self.lock_path): + self.undo() + + def sortKey(self): + return f"filesystem:{self.id}" + + def undo(self): + for action in reversed(self.actions_done): + LOGGER.debug("Undoing %s", action) + try: + action.undo() + except Exception as exc_inner: + LOGGER.debug("Exception ignored: %s", exc_inner) + + def commit(self, _trans): + pass + + def commit_1(self): + if self.state != State.OPEN: + raise TransactionalError(f"Transaction is not open but {self.state}") + + self.state = State.COMMITTING + + try: + for action in self.actions: + LOGGER.debug("Executing 1st phase %s", action) + action.commit_1() + self.actions_done.append(action) + except Exception as exc: + LOGGER.debug("Exception while committing") + self.state = State.TAINTED + raise exc + + def commit_2(self): + if self.state != State.COMMITTING: + raise TransactionalError(f"Transaction is not committing but {self.state}") + + self.state = State.TAINTED + for action in self.actions: + LOGGER.debug("Executing 2nd phase %s", action) + action.commit_2() + self.state = State.COMMITTED + + def abort(self, _trans=None): + self.actions.clear() + self.state = State.TAINTED + + def write_bytes(self, path: Path, data: bytes): + if self.state != State.OPEN: + raise TransactionalError(f"Transaction is not open but {self.state}") + self.actions.append(WriteBytes(path, data)) + + def unlink(self, path: Path): + if self.state != State.OPEN: + raise TransactionalError(f"Transaction is not open but {self.state}") + self.actions.append(Unlink(path)) + + def make_dir(self, path: Path, *, exist_ok: bool = False): + if self.state != State.OPEN: + raise TransactionalError(f"Transaction is not open but {self.state}") + self.actions.append(MakeDir(path, exist_ok=exist_ok)) + + def remove_dir(self, path: Path): + if self.state != State.OPEN: + raise TransactionalError(f"Transaction is not open but {self.state}") + self.actions.append(RemoveDir(path)) + + def purge(self, path: Path): + if self.state != State.OPEN: + raise TransactionalError(f"Transaction is not open but {self.state}") + self.actions.append(Purge(path)) + + +def begin(lock_path: Path, tm=None) -> Transaction: + """Begin and register a new transaction.""" + trans = Transaction(lock_path) + if tm: + tm.get().join(trans) + else: + transaction.manager.get().join(trans) + return trans diff --git a/fietsboek/views/upload.py b/fietsboek/views/upload.py index 5c86b29..7be5a42 100644 --- a/fietsboek/views/upload.py +++ b/fietsboek/views/upload.py @@ -170,14 +170,9 @@ def do_finish_upload(request): request.dbsession.delete(upload) # Don't forget to add the images + manager = request.data_manager.open(track.id, force=True) LOGGER.debug("Starting to edit images for the upload") - try: - actions.edit_images(request, track) - except Exception: - # We just created the folder, so we'll be fine deleting it - LOGGER.info("Deleting partially created folder for track %d", track.id) - request.data_manager.open(track.id).purge() - raise + actions.edit_images(request, track, manager=manager) request.session.flash(request.localizer.translate(_("flash.upload_success"))) -- cgit v1.2.3