about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Daniel Schadt <kingdread@gmx.de>, 2025-12-29 13:09:58 +0100
committer: Daniel Schadt <kingdread@gmx.de>, 2025-12-29 13:09:58 +0100
commit: 9bea2637e5a10c52e49299b22598297b35412373 (patch)
tree: c08eca6c3693f338c2223d76cc84624160552f70
parent: a200d26e79b40836de3e424f95d03eefcb2115f0 (diff)
download:
fietsboek-9bea2637e5a10c52e49299b22598297b35412373.tar.gz
fietsboek-9bea2637e5a10c52e49299b22598297b35412373.tar.bz2
fietsboek-9bea2637e5a10c52e49299b22598297b35412373.zip
initial filesystem transactions [fs-transactions]
This should solve issues that arise when exceptions occur during upload. Hooks into the transaction/pyramid_tm machinery.
-rw-r--r-- fietsboek/__init__.py 10
-rw-r--r-- fietsboek/actions.py 46
-rw-r--r-- fietsboek/data.py 155
-rw-r--r-- fietsboek/fstrans.py 231
-rw-r--r-- fietsboek/views/upload.py 9
5 files changed, 308 insertions, 143 deletions
diff --git a/fietsboek/__init__.py b/fietsboek/__init__.py
index 2ac41e3..b533e97 100644
--- a/fietsboek/__init__.py
+++ b/fietsboek/__init__.py
@@ -31,6 +31,7 @@ from pyramid.response import Response
from pyramid.session import SignedCookieSessionFactory
from . import config as mod_config
+from . import fstrans
from . import jinja2 as mod_jinja2
from . import transformers
from .data import DataManager
@@ -92,7 +93,10 @@ def maintenance_mode(
"""
def tween(request: Request) -> Response:
- maintenance = request.data_manager.maintenance_mode()
+ # We don't want to mess around with transactioned data mangers here,
+ # so we create a new one without transaction
+ data_manager = DataManager(Path(request.config.data_dir))
+ maintenance = data_manager.maintenance_mode()
if maintenance is None:
return handler(request)
@@ -155,7 +159,9 @@ def main(global_config, **settings):
check_db_engine(parsed_config.sqlalchemy_url)
def data_manager(request):
- return DataManager(Path(request.config.data_dir))
+ data_dir = Path(request.config.data_dir)
+ lock_file = data_dir / "lock"
+ return DataManager(data_dir, txn=fstrans.begin(lock_file, request.tm))
def redis_(request):
return redis.from_url(request.config.redis_url)
diff --git a/fietsboek/actions.py b/fietsboek/actions.py
index 2d23d97..f9f36ac 100644
--- a/fietsboek/actions.py
+++ b/fietsboek/actions.py
@@ -93,29 +93,29 @@ def add_track(
# Save the GPX data
LOGGER.debug("Creating a new data folder for %d", track.id)
assert track.id is not None
- with data_manager.initialize(track.id) as manager:
- LOGGER.debug("Saving backup to %s", manager.backup_path())
- manager.compress_backup(gpx_data)
-
- for transformer in transformers:
- LOGGER.debug("Running %s with %r", transformer, transformer.parameters)
- transformer.execute(path)
- track.transformers = [
- [tfm.identifier(), tfm.parameters.model_dump()] for tfm in transformers
- ]
-
- track.fast_set_path(path)
-
- # Best time to build the cache is right after the upload, but *after* the
- # transformers have been applied!
- track.ensure_cache(path)
- dbsession.add(track.cache)
-
- LOGGER.debug("Building preview image for %s", track.id)
- preview_image = trackmap.render(path, layer, tile_requester)
- image_io = io.BytesIO()
- preview_image.save(image_io, "PNG")
- manager.set_preview(image_io.getvalue())
+ manager = data_manager.initialize(track.id)
+ LOGGER.debug("Saving backup to %s", manager.backup_path())
+ manager.compress_backup(gpx_data)
+
+ for transformer in transformers:
+ LOGGER.debug("Running %s with %r", transformer, transformer.parameters)
+ transformer.execute(path)
+ track.transformers = [
+ [tfm.identifier(), tfm.parameters.model_dump()] for tfm in transformers
+ ]
+
+ track.fast_set_path(path)
+
+ # Best time to build the cache is right after the upload, but *after* the
+ # transformers have been applied!
+ track.ensure_cache(path)
+ dbsession.add(track.cache)
+
+ LOGGER.debug("Building preview image for %s", track.id)
+ preview_image = trackmap.render(path, layer, tile_requester)
+ image_io = io.BytesIO()
+ preview_image.save(image_io, "PNG")
+ manager.set_preview(image_io.getvalue())
return track
diff --git a/fietsboek/data.py b/fietsboek/data.py
index 3fcd922..c75a451 100644
--- a/fietsboek/data.py
+++ b/fietsboek/data.py
@@ -19,6 +19,7 @@ import brotli
from filelock import FileLock
from . import util
+from .fstrans import Transaction
LOGGER = logging.getLogger(__name__)
@@ -50,8 +51,9 @@ class DataManager:
:ivar data_dir: Path to the data folder.
"""
- def __init__(self, data_dir: Path):
+ def __init__(self, data_dir: Path, *, txn: Transaction | None = None):
self.data_dir: Path = data_dir
+ self.txn = txn
def _track_data_dir(self, track_id):
return self.data_dir / "tracks" / str(track_id)
@@ -81,8 +83,11 @@ class DataManager:
:return: The manager that can be used to manage this track's data.
"""
path = self._track_data_dir(track_id)
- path.mkdir(parents=True)
- return TrackDataDir(track_id, path, journal=True, is_fresh=True)
+ if self.txn:
+ self.txn.make_dir(path)
+ else:
+ path.mkdir(parents=True)
+ return TrackDataDir(track_id, path, txn=self.txn)
def initialize_user(self, user_id: int) -> "UserDataDir":
"""Creates the data directory for a user.
@@ -92,8 +97,11 @@ class DataManager:
:return: The manager that can be used to manage this user's data.
"""
path = self._user_data_dir(user_id)
- path.mkdir(parents=True)
- return UserDataDir(user_id, path)
+ if self.txn:
+ self.txn.make_dir(path)
+ else:
+ path.mkdir(parents=True)
+ return UserDataDir(user_id, path, txn=self.txn)
def purge(self, track_id: int):
"""Forcefully purges all data from the given track.
@@ -101,9 +109,9 @@ class DataManager:
This function logs errors but raises no exception, as such it can
always be used to clean up after a track.
"""
- TrackDataDir(track_id, self._track_data_dir(track_id)).purge()
+ TrackDataDir(track_id, self._track_data_dir(track_id), txn=self.txn).purge()
- def open(self, track_id: int) -> "TrackDataDir":
+ def open(self, track_id: int, *, force: bool = False) -> "TrackDataDir":
"""Opens a track's data directory.
:raises FileNotFoundError: If the track directory does not exist.
@@ -111,9 +119,9 @@ class DataManager:
:return: The manager that can be used to manage this track's data.
"""
path = self._track_data_dir(track_id)
- if not path.is_dir():
+ if not force and not path.is_dir():
raise FileNotFoundError(f"The path {path} is not a directory") from None
- return TrackDataDir(track_id, path)
+ return TrackDataDir(track_id, path, txn=self.txn)
def open_user(self, user_id: int) -> "UserDataDir":
"""Opens a user's data directory.
@@ -125,7 +133,7 @@ class DataManager:
path = self._user_data_dir(user_id)
if not path.is_dir():
raise FileNotFoundError(f"The path {path} is not a directory") from None
- return UserDataDir(user_id, path)
+ return UserDataDir(user_id, path, txn=self.txn)
def size(self) -> int:
"""Returns the size of all data.
@@ -164,86 +172,10 @@ class TrackDataDir:
semantics, use ``journal = False``.
"""
- def __init__(self, track_id: int, path: Path, *, journal: bool = False, is_fresh: bool = False):
+ def __init__(self, track_id: int, path: Path, *, txn: Transaction | None = None):
self.track_id: int = track_id
self.path: Path = path
- self.journal: Optional[list] = [] if journal else None
- self.is_fresh = is_fresh
-
- def __enter__(self) -> "TrackDataDir":
- if self.journal is None:
- self.journal = []
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
- if exc_type is None and exc_val is None and exc_tb is None:
- self.commit()
- else:
- self.rollback()
- return False
-
- def rollback(self):
- """Rolls back the journal, e.g. in case of error.
-
- :raises ValueError: If the data directory was opened without the
- journal, this raises :exc:`ValueError`.
- """
- LOGGER.debug("Rolling back state of %s", self.path)
-
- if self.journal is None:
- raise ValueError("Rollback on a non-journalling data directory")
-
- if self.is_fresh:
- # Shortcut if the directory is fresh, simply remove everything
- self.journal = None
- self.purge()
- return
-
- for action, *rest in reversed(self.journal):
- if action == "purge":
- (new_name,) = rest
- shutil.move(new_name, self.path)
- elif action == "add_image":
- (image_path,) = rest
- image_path.unlink()
- elif action == "delete_image":
- path, data = rest
- path.write_bytes(data)
- elif action == "set_preview":
- old_data = rest
- if old_data is None:
- self.preview_path().unlink()
- else:
- self.preview_path().write_bytes(old_data)
-
- self.journal = None
-
- def commit(self):
- """Commits all changed and deletes the journal.
-
- Note that this function will do nothing if the journal is disabled,
- meaning it can always be called.
- """
- LOGGER.debug("Committing journal for %s", self.path)
-
- if self.journal is None:
- return
-
- for action, *rest in reversed(self.journal):
- if action == "purge":
- (new_name,) = rest
- shutil.rmtree(new_name, ignore_errors=False, onerror=self._log_deletion_error)
- elif action == "add_image":
- # Nothing to do here, the image is already saved
- pass
- elif action == "delete_image":
- # Again, nothing to do here, we simply discard the in-memory image data
- pass
- elif action == "set_preview":
- # Still nothing to do here
- pass
-
- self.journal = None
+ self.txn = txn
def lock(self) -> FileLock:
"""Returns a FileLock that can be used to lock access to the track's
@@ -263,13 +195,11 @@ class TrackDataDir:
This function logs errors but raises no exception, as such it can
always be used to clean up after a track.
"""
- if self.journal is None:
+ if self.txn:
+ self.txn.purge(self.path)
+ else:
if self.path.is_dir():
shutil.rmtree(self.path, ignore_errors=False, onerror=self._log_deletion_error)
- else:
- new_name = self.path.with_name("trash-" + self.path.name)
- shutil.move(self.path, new_name)
- self.journal.append(("purge", new_name))
def size(self) -> int:
"""Returns the size of the data that this track entails.
@@ -289,7 +219,10 @@ class TrackDataDir:
quality but slowest compression speed.
"""
compressed = brotli.compress(data, quality=quality)
- self.backup_path().write_bytes(compressed)
+ if self.txn:
+ self.txn.write_bytes(self.backup_path(), compressed)
+ else:
+ self.backup_path().write_bytes(compressed)
def decompress_backup(self) -> bytes:
"""Returns the backup bytes decompressed.
@@ -337,15 +270,18 @@ class TrackDataDir:
:return: The ID of the saved image.
"""
image_dir = self.path / "images"
- image_dir.mkdir(parents=True, exist_ok=True)
+ if self.txn:
+ self.txn.make_dir(image_dir, exist_ok=True)
+ else:
+ image_dir.mkdir(parents=True, exist_ok=True)
filename = generate_filename(filename)
path = image_dir / filename
- with open(path, "wb") as fobj:
- shutil.copyfileobj(image, fobj)
-
- if self.journal is not None:
- self.journal.append(("add_image", path))
+ if self.txn:
+ self.txn.write_bytes(path, image.read())
+ else:
+ with open(path, "wb") as fobj:
+ shutil.copyfileobj(image, fobj)
return filename
@@ -361,10 +297,10 @@ class TrackDataDir:
return
path = self.image_path(image_id)
- if self.journal is not None:
- self.journal.append(("delete_image", path, path.read_bytes()))
-
- path.unlink()
+ if self.txn:
+ self.txn.unlink(path)
+ else:
+ path.unlink()
def preview_path(self) -> Path:
"""Gets the path to the "preview image".
@@ -378,13 +314,10 @@ class TrackDataDir:
:param data: The data of the preview image.
"""
- if self.journal is not None:
- try:
- previous_preview = self.preview_path().read_bytes()
- except FileNotFoundError:
- previous_preview = None
- self.journal.append(("set_preview", previous_preview))
- self.preview_path().write_bytes(data)
+ if self.txn:
+ self.txn.write_bytes(self.preview_path(), data)
+ else:
+ self.preview_path().write_bytes(data)
class UserDataDir:
diff --git a/fietsboek/fstrans.py b/fietsboek/fstrans.py
new file mode 100644
index 0000000..8c73f94
--- /dev/null
+++ b/fietsboek/fstrans.py
@@ -0,0 +1,231 @@
+"""Filesystem transactions."""
+
+import enum
+import logging
+import shutil
+import transaction
+import uuid
+from filelock import FileLock
+from pathlib import Path
+
+LOGGER = logging.getLogger(__name__)
+
+
+def _log_deletion_error(_, path, exc_info):
+ LOGGER.warning("Failed to remove %s", path, exc_info=exc_info)
+
+
+class TransactionalError(Exception):
+ pass
+
+
+class State(enum.Enum):
+ OPEN = enum.auto()
+ COMMITTING = enum.auto()
+ COMMITTED = enum.auto()
+ TAINTED = enum.auto()
+
+
+class Action:
+ def commit_1(self):
+ pass
+
+ def commit_2(self):
+ pass
+
+ def undo(self):
+ pass
+
+
+class WriteBytes(Action):
+ def __init__(self, path: Path, data: bytes):
+ self.path = path
+ self.data = data
+ self.old = None
+
+ def __repr__(self):
+ return f"<WriteBytes {len(self.data)} to {self.path}>"
+
+ def commit_1(self):
+ try:
+ self.old = self.path.read_bytes()
+ except FileNotFoundError:
+ self.old = None
+ self.path.write_bytes(self.data)
+
+ def undo(self):
+ if self.old is None:
+ self.path.unlink()
+ else:
+ self.path.write_bytes(self.old)
+
+
+class Unlink(Action):
+ def __init__(self, path: Path):
+ self.path = path
+ self.old = None
+
+ def __repr__(self):
+ return f"<Unlink {self.path}>"
+
+ def commit_1(self):
+ self.old = self.path.read_bytes()
+ self.path.unlink()
+
+ def undo(self):
+ self.path.write_bytes(self.old)
+
+
+class MakeDir(Action):
+ def __init__(self, path: Path, exist_ok: bool = False):
+ self.path = path
+ self.exist_ok = exist_ok
+ self.existed = None
+
+ def __repr__(self):
+ return f"<MakeDir {self.path}>"
+
+ def commit_1(self):
+ self.existed = self.path.is_dir()
+ self.path.mkdir(exist_ok=self.exist_ok)
+
+ def undo(self):
+ if not self.existed:
+ self.path.rmdir()
+
+
+class RemoveDir(Action):
+ def __init__(self, path: Path):
+ self.path = path
+
+ def __repr__(self):
+ return f"<RemoveDir {self.path}>"
+
+ def commit_1(self):
+ self.path.rmdir()
+
+ def undo(self):
+ self.path.mkdir()
+
+
+class Purge(Action):
+ def __init__(self, path: Path):
+ self.path = path
+
+ def __repr__(self):
+ return f"<Purge {self.path}>"
+
+ def commit_2(self):
+ shutil.rmtree(self.path, ignore_errors=False, onerror=_log_deletion_error)
+
+
+class Transaction:
+ def __init__(self, lock_path: Path):
+ self.actions: list[Action] = []
+ self.actions_done: list[Action] = []
+ self.state = State.OPEN
+ self.lock_path = lock_path
+ self.id = uuid.uuid4().hex
+
+ def tpc_begin(self, _trans):
+ pass
+
+ def tpc_vote(self, _trans):
+ if not self.actions:
+ return
+
+ with FileLock(self.lock_path):
+ self.commit_1()
+
+ def tpc_finish(self, _trans):
+ if not self.actions:
+ return
+
+ with FileLock(self.lock_path):
+ self.commit_2()
+
+ def tpc_abort(self, _trans):
+ if not self.actions_done:
+ return
+
+ with FileLock(self.lock_path):
+ self.undo()
+
+ def sortKey(self):
+ return f"filesystem:{self.id}"
+
+ def undo(self):
+ for action in reversed(self.actions_done):
+ LOGGER.debug("Undoing %s", action)
+ try:
+ action.undo()
+ except Exception as exc_inner:
+ LOGGER.debug("Exception ignored: %s", exc_inner)
+
+ def commit(self, _trans):
+ pass
+
+ def commit_1(self):
+ if self.state != State.OPEN:
+ raise TransactionalError(f"Transaction is not open but {self.state}")
+
+ self.state = State.COMMITTING
+
+ try:
+ for action in self.actions:
+ LOGGER.debug("Executing 1st phase %s", action)
+ action.commit_1()
+ self.actions_done.append(action)
+ except Exception as exc:
+ LOGGER.debug("Exception while committing")
+ self.state = State.TAINTED
+ raise exc
+
+ def commit_2(self):
+ if self.state != State.COMMITTING:
+ raise TransactionalError(f"Transaction is not committing but {self.state}")
+
+ self.state = State.TAINTED
+ for action in self.actions:
+ LOGGER.debug("Executing 2nd phase %s", action)
+ action.commit_2()
+ self.state = State.COMMITTED
+
+ def abort(self, _trans=None):
+ self.actions.clear()
+ self.state = State.TAINTED
+
+ def write_bytes(self, path: Path, data: bytes):
+ if self.state != State.OPEN:
+ raise TransactionalError(f"Transaction is not open but {self.state}")
+ self.actions.append(WriteBytes(path, data))
+
+ def unlink(self, path: Path):
+ if self.state != State.OPEN:
+ raise TransactionalError(f"Transaction is not open but {self.state}")
+ self.actions.append(Unlink(path))
+
+ def make_dir(self, path: Path, *, exist_ok: bool = False):
+ if self.state != State.OPEN:
+ raise TransactionalError(f"Transaction is not open but {self.state}")
+ self.actions.append(MakeDir(path, exist_ok=exist_ok))
+
+ def remove_dir(self, path: Path):
+ if self.state != State.OPEN:
+ raise TransactionalError(f"Transaction is not open but {self.state}")
+ self.actions.append(RemoveDir(path))
+
+ def purge(self, path: Path):
+ if self.state != State.OPEN:
+ raise TransactionalError(f"Transaction is not open but {self.state}")
+ self.actions.append(Purge(path))
+
+
+def begin(lock_path: Path, tm=None) -> Transaction:
+ """Begin and register a new transaction."""
+ trans = Transaction(lock_path)
+ if tm:
+ tm.get().join(trans)
+ else:
+ transaction.manager.get().join(trans)
+ return trans
diff --git a/fietsboek/views/upload.py b/fietsboek/views/upload.py
index 5c86b29..7be5a42 100644
--- a/fietsboek/views/upload.py
+++ b/fietsboek/views/upload.py
@@ -170,14 +170,9 @@ def do_finish_upload(request):
request.dbsession.delete(upload)
# Don't forget to add the images
+ manager = request.data_manager.open(track.id, force=True)
LOGGER.debug("Starting to edit images for the upload")
- try:
- actions.edit_images(request, track)
- except Exception:
- # We just created the folder, so we'll be fine deleting it
- LOGGER.info("Deleting partially created folder for track %d", track.id)
- request.data_manager.open(track.id).purge()
- raise
+ actions.edit_images(request, track, manager=manager)
request.session.flash(request.localizer.translate(_("flash.upload_success")))