diff options
| -rw-r--r-- | fietsboek/actions.py | 157 | ||||
| -rw-r--r-- | fietsboek/models/track.py | 4 | ||||
| -rw-r--r-- | fietsboek/views/edit.py | 69 | ||||
| -rw-r--r-- | fietsboek/views/upload.py | 38 | ||||
| -rw-r--r-- | tests/playwright/conftest.py | 23 | 
5 files changed, 183 insertions, 108 deletions
diff --git a/fietsboek/actions.py b/fietsboek/actions.py new file mode 100644 index 0000000..dff512f --- /dev/null +++ b/fietsboek/actions.py @@ -0,0 +1,157 @@ +"""High-level actions. + +This module implements the basic logic for high level intents such as "add a +track", "delete a track", ... It combines the low-level APIs of the ORM and the +data manager, and provides functions that can be used by the views, the API and +the test functions. +""" +import datetime +import logging +import re +from typing import List + +from pyramid.request import Request +from sqlalchemy import select +from sqlalchemy.orm.session import Session + +from fietsboek import models, util +from fietsboek.models.track import Visibility, TrackType +from fietsboek.data import DataManager + + +LOGGER = logging.getLogger(__name__) + + +def add_track( +    dbsession: Session, +    data_manager: DataManager, +    owner: models.User, +    title: str, +    date: datetime.datetime, +    visibility: Visibility, +    track_type: TrackType, +    description: str, +    badges: List[models.Badge], +    tagged_people: List[models.User], +    tags: List[str], +    gpx_data: bytes, +) -> models.Track: +    """Adds a track to the database. + +    Note that this function does not do any authorization checking, and as +    such, expects the caller to ensure that everything is in order. + +    Most of the parameters correspond to the attributes of +    :class:`~fietsboek.models.track.Track` objects. + +    :param dbsession: The database session. +    :param data_manager: The data manager. +    :param owner: The owner of the track. +    :param title: Title of the track. +    :param date: Date of the track, should be timezone-aware. +    :param visibility: Track visibility. +    :param track_type: Type of the track. +    :param description: Track description. +    :param badges: Badges to attach to the track. +    :param tagged_people: List of people to tag. +    :param tags: List of text tags for the track. +    :param gpx_data: Actual GPX data (uncompressed, straight from the source). +    :return: The track object that has been inserted into the database. Useful +        for its ``id`` attribute. +    """ +    # pylint: disable=too-many-arguments +    LOGGER.debug("Inserting new track...") +    track = models.Track( +        owner=owner, +        title=title, +        visibility=visibility, +        type=track_type, +        description=description, +        badges=badges, +        link_secret=util.random_link_secret(), +        tagged_people=tagged_people, +    ) +    track.date = date +    track.sync_tags(tags) +    dbsession.add(track) +    dbsession.flush() + +    # Best time to build the cache is right after the upload +    track.ensure_cache(gpx_data) +    dbsession.add(track.cache) + +    # Save the GPX data +    LOGGER.debug("Creating a new data folder for %d", track.id) +    manager = data_manager.initialize(track.id) +    LOGGER.debug("Saving GPX to %s", manager.gpx_path()) +    manager.compress_gpx(gpx_data) +    manager.backup() + +    manager.engrave_metadata( +        title=track.title, +        description=track.description, +        author_name=track.owner.name, +        time=track.date, +    ) + +    return track + + +def edit_images(request: Request, track: models.Track): +    """Edit the images according to the given request. + +    This deletes and adds images and image descriptions as needed, based on the +    ``image[...]`` and ``image-description[...]`` fields. + +    :param request: The request. +    :param track: The track to edit. +    """ +    LOGGER.debug("Editing images for %d", track.id) +    manager = request.data_manager.open(track.id) + +    # Delete requested images +    for image in request.params.getall("delete-image[]"): +        manager.delete_image(image) +        image_meta = request.dbsession.execute( +            select(models.ImageMetadata).filter_by(track_id=track.id, image_name=image) +        ).scalar_one_or_none() +        LOGGER.debug("Deleted image %s %s (metadata: %s)", track.id, image, image_meta) +        if image_meta: +            request.dbsession.delete(image_meta) + +    # Add new images +    set_descriptions = set() +    for param_name, image in request.params.items(): +        match = re.match("image\\[(\\d+)\\]$", param_name) +        if not match: +            continue +        # Sent for the multi input +        if image == b"": +            continue + +        upload_id = match.group(1) +        image_name = manager.add_image(image.file, image.filename) +        image_meta = models.ImageMetadata(track=track, image_name=image_name) +        image_meta.description = request.params.get(f"image-description[{upload_id}]", "") +        request.dbsession.add(image_meta) +        LOGGER.debug("Uploaded image %s %s", track.id, image_name) +        set_descriptions.add(upload_id) + +    images = manager.images() +    # Set image descriptions +    for param_name, description in request.params.items(): +        match = re.match("image-description\\[(.+)\\]$", param_name) +        if not match: +            continue +        image_id = match.group(1) +        # Descriptions that we already set while adding new images can be +        # ignored +        if image_id in set_descriptions: +            continue +        # Did someone give us a wrong ID?! +        if image_id not in images: +            LOGGER.info("Got a ghost image description for track %s: %s", track.id, image_id) +            continue +        image_meta = models.ImageMetadata.get_or_create(request.dbsession, track, image_id) +        image_meta.description = description +        request.dbsession.add(image_meta) diff --git a/fietsboek/models/track.py b/fietsboek/models/track.py index cb4979f..93e37e7 100644 --- a/fietsboek/models/track.py +++ b/fietsboek/models/track.py @@ -15,7 +15,7 @@ import enum  import gzip  import datetime  import logging -from typing import Optional, List, Set, TYPE_CHECKING +from typing import Optional, List, Set, TYPE_CHECKING, Union  from itertools import chain @@ -328,7 +328,7 @@ class Track(Base):          result = ACLHelper().permits(self, principals, "track.view")          return isinstance(result, ACLAllowed) -    def ensure_cache(self, gpx_data: str): +    def ensure_cache(self, gpx_data: Union[str, bytes]):          """Ensure that a cached version of this track's metadata exists.          :param gpx_data: GPX data (uncompressed) from which to build the cache. diff --git a/fietsboek/views/edit.py b/fietsboek/views/edit.py index 6d9cfc6..e6a6796 100644 --- a/fietsboek/views/edit.py +++ b/fietsboek/views/edit.py @@ -1,5 +1,4 @@  """Views for editing a track.""" -import re  import logging  import datetime  from collections import namedtuple @@ -9,7 +8,7 @@ from pyramid.httpexceptions import HTTPFound, HTTPBadRequest  from sqlalchemy import select -from .. import models, util +from .. import models, util, actions  from ..data import TrackDataDir  from ..models.track import Visibility, TrackType @@ -97,7 +96,7 @@ def do_edit(request):          tags = request.params.getall("tag[]")          track.sync_tags(tags) -        edit_images(request, request.context) +        actions.edit_images(request, request.context)          data.engrave_metadata(              title=track.title,              description=track.description, @@ -106,67 +105,3 @@ def do_edit(request):          )      return HTTPFound(request.route_url("details", track_id=track.id)) - - -def edit_images(request, track): -    """Edit the images from the given request. - -    This deletes and adds images and image descriptions as needed, based on the -    image[...] and image-description[...] fields. - -    :param request: The request: -    :type request: pyramid.request.Request -    :param track: The track to edit. -    :type track: fietsboek.models.track.Track -    """ - -    try: -        manager = request.data_manager.open(track.id) -    except FileNotFoundError: -        manager = request.data_manager.initialize(track.id) -    # Delete requested images -    for image in request.params.getall("delete-image[]"): -        manager.delete_image(image) -        image_meta = request.dbsession.execute( -            select(models.ImageMetadata).filter_by(track_id=track.id, image_name=image) -        ).scalar_one_or_none() -        LOGGER.debug("Deleted image %s %s (metadata: %s)", track.id, image, image_meta) -        if image_meta: -            request.dbsession.delete(image_meta) - -    # Add new images -    set_descriptions = set() -    for param_name, image in request.params.items(): -        match = re.match("image\\[(\\d+)\\]$", param_name) -        if not match: -            continue -        # Sent for the multi input -        if image == b"": -            continue - -        upload_id = match.group(1) -        image_name = manager.add_image(image.file, image.filename) -        image_meta = models.ImageMetadata(track=track, image_name=image_name) -        image_meta.description = request.params.get(f"image-description[{upload_id}]", "") -        request.dbsession.add(image_meta) -        LOGGER.debug("Uploaded image %s %s", track.id, image_name) -        set_descriptions.add(upload_id) - -    images = manager.images() -    # Set image descriptions -    for param_name, description in request.params.items(): -        match = re.match("image-description\\[(.+)\\]$", param_name) -        if not match: -            continue -        image_id = match.group(1) -        # Descriptions that we already set while adding new images can be -        # ignored -        if image_id in set_descriptions: -            continue -        # Did someone give us a wrong ID?! -        if image_id not in images: -            LOGGER.info("Got a ghost image description for track %s: %s", track.id, image_id) -            continue -        image_meta = models.ImageMetadata.get_or_create(request.dbsession, track, image_id) -        image_meta.description = description -        request.dbsession.add(image_meta) diff --git a/fietsboek/views/upload.py b/fietsboek/views/upload.py index 7f7820f..98aad3b 100644 --- a/fietsboek/views/upload.py +++ b/fietsboek/views/upload.py @@ -11,8 +11,7 @@ from sqlalchemy import select  import gpxpy -from . import edit -from .. import models, util +from .. import models, util, actions  from ..models.track import Visibility, TrackType @@ -160,44 +159,25 @@ def do_finish_upload(request):      date = datetime.datetime.fromisoformat(request.params["date"])      date = date.replace(tzinfo=datetime.timezone(tz_offset)) -    track = models.Track( +    track = actions.add_track( +        request.dbsession, +        request.data_manager,          owner=request.identity,          title=request.params["title"],          visibility=Visibility[request.params["visibility"]], -        type=TrackType[request.params["type"]], +        track_type=TrackType[request.params["type"]],          description=request.params["description"],          badges=badges, -        link_secret=util.random_link_secret(),          tagged_people=tagged_people, +        date=date, +        tags=request.params.getall("tag[]"), +        gpx_data=upload.gpx_data,      ) -    track.date = date -    tags = request.params.getall("tag[]") -    track.sync_tags(tags) -    request.dbsession.add(track)      request.dbsession.delete(upload) -    request.dbsession.flush() - -    # Best time to build the cache is right after the upload -    track.ensure_cache(upload.gpx_data) -    request.dbsession.add(track.cache) - -    # Save the GPX data -    LOGGER.debug("Creating a new data folder for %d", track.id) -    manager = request.data_manager.initialize(track.id) -    LOGGER.debug("Saving GPX to %s", manager.gpx_path()) -    manager.compress_gpx(upload.gpx_data) -    manager.backup()      # Don't forget to add the images      LOGGER.debug("Starting to edit images for the upload") -    edit.edit_images(request, track) - -    manager.engrave_metadata( -        title=track.title, -        description=track.description, -        author_name=track.owner.name, -        time=track.date, -    ) +    actions.edit_images(request, track)      request.session.flash(request.localizer.translate(_("flash.upload_success"))) diff --git a/tests/playwright/conftest.py b/tests/playwright/conftest.py index 21edca8..18b7ad0 100644 --- a/tests/playwright/conftest.py +++ b/tests/playwright/conftest.py @@ -8,7 +8,7 @@ from pyramid.authentication import AuthTktCookieHelper  from pyramid.testing import DummyRequest  from testutils import load_gpx_asset -from fietsboek import models, util +from fietsboek import models, util, actions  from fietsboek.models.track import Visibility, TrackType  from fietsboek.config import Config @@ -114,20 +114,23 @@ class Helper:              user = self.john_doe()          with self.dbaccess:              user = self.dbaccess.merge(user) -            track = models.Track( +            track = actions.add_track( +                self.dbaccess, +                self.data_manager, +                owner=user,                  title="Another awesome track",                  visibility=Visibility.PRIVATE,                  description="Another description", -                type=TrackType.ORGANIC, -                link_secret=util.random_link_secret(), +                track_type=TrackType.ORGANIC, +                date=datetime.datetime(2022, 12, 21, 17, 5, tzinfo=datetime.timezone.utc), +                tags=[], +                badges=[], +                tagged_people=[], +                gpx_data=load_gpx_asset(track_name),              ) -            track.date = datetime.datetime(2022, 12, 21, 17, 5, tzinfo=datetime.timezone.utc) -            user.tracks.append(track) -            self.dbaccess.flush() -            self.dbaccess.refresh(track, ["id"]) -            self.dbaccess.expunge(track) -            self.data_manager.initialize(track.id).compress_gpx(load_gpx_asset(track_name))              self.dbaccess.commit() +            self.dbaccess.refresh(track, ["id", "link_secret"]) +            self.dbaccess.expunge(track)              return track  | 
