diff options
| -rw-r--r-- | fietsboek/actions.py | 36 | ||||
| -rw-r--r-- | fietsboek/convert.py | 86 | ||||
| -rw-r--r-- | fietsboek/data.py | 84 | ||||
| -rw-r--r-- | fietsboek/geo.py | 21 | ||||
| -rw-r--r-- | fietsboek/views/upload.py | 27 |
5 files changed, 100 insertions, 154 deletions
diff --git a/fietsboek/actions.py b/fietsboek/actions.py index 3f14308..9c1142b 100644 --- a/fietsboek/actions.py +++ b/fietsboek/actions.py @@ -19,7 +19,7 @@ from pyramid.request import Request from sqlalchemy import select from sqlalchemy.orm.session import Session -from . import email, models, trackmap +from . import convert, email, geo, models, trackmap from . import transformers as mod_transformers from . import util from .config import TileLayerConfig @@ -74,16 +74,15 @@ def add_track( """ # pylint: disable=too-many-positional-arguments,too-many-locals,too-many-arguments LOGGER.debug("Inserting new track...") - track = models.Track( - owner=owner, - title=title, - visibility=visibility, - type=track_type, - description=description, - badges=badges, - link_secret=util.random_link_secret(), - tagged_people=tagged_people, - ) + track = convert.smart_convert(gpx_data) + track.owner = owner + track.title = title + track.visibility = visibility + track.type = track_type + track.description = description + track.badges = badges + track.link_secret = util.random_link_secret() + track.tagged_people = tagged_people track.date = date track.sync_tags(tags) dbsession.add(track) @@ -93,11 +92,10 @@ def add_track( LOGGER.debug("Creating a new data folder for %d", track.id) assert track.id is not None with data_manager.initialize(track.id) as manager: - LOGGER.debug("Saving GPX to %s", manager.gpx_path()) - manager.compress_gpx(gpx_data) - manager.backup() + LOGGER.debug("Saving backup to %s", manager.backup_path()) + manager.compress_backup(gpx_data) - gpx = gpxpy.parse(gpx_data) + gpx = gpxpy.parse(track.gpx_xml()) for transformer in transformers: LOGGER.debug("Running %s with %r", transformer, transformer.parameters) transformer.execute(gpx) @@ -116,14 +114,6 @@ def add_track( preview_image.save(image_io, "PNG") manager.set_preview(image_io.getvalue()) - manager.engrave_metadata( - title=track.title, - description=track.description, - author_name=track.owner.name, - time=track.date, - gpx=gpx, - ) - return track diff --git a/fietsboek/convert.py b/fietsboek/convert.py index d3bfb22..2e8b5db 100644 --- a/fietsboek/convert.py +++ b/fietsboek/convert.py @@ -1,7 +1,12 @@ """Conversion functions to convert between various recording formats.""" +import datetime +from typing import Optional import fitparse -from gpxpy.gpx import GPX, GPXTrack, GPXTrackPoint, GPXTrackSegment +import gpxpy + +from . import geo, util +from .models import Track FIT_RECORD_FIELDS = ["position_lat", "position_long", "altitude", "timestamp"] @@ -15,8 +20,8 @@ def semicircles_to_deg(circles: int) -> float: return circles * (180 / 2**31) -def from_fit(data: bytes) -> GPX: - """Reads a .fit as GPX data. +def from_fit(data: bytes) -> Track: + """Reads a .fit as track data. This uses the fitparse_ library under the hood. @@ -26,30 +31,81 @@ def from_fit(data: bytes) -> GPX: :return: The converted structure. """ fitfile = fitparse.FitFile(data) + start_time = None points = [] for record in fitfile.get_messages("record"): values = record.get_values() try: if any(values[field] is None for field in FIT_RECORD_FIELDS): continue - point = GPXTrackPoint( + time = values["timestamp"] + if start_time is None: + start_time = time + point = geo.Point( latitude=semicircles_to_deg(values["position_lat"]), longitude=semicircles_to_deg(values["position_long"]), elevation=values["altitude"], - time=values["timestamp"], + time_offset=time - start_time, ) except KeyError: pass else: points.append(point) - track = GPXTrack() - track.segments = [GPXTrackSegment(points)] - gpx = GPX() - gpx.tracks = [track] - return gpx + path = geo.Path(points) + track = Track() + track.set_path(path) + return track + + +def from_gpx(data: bytes) -> Track: + """Reads a .gpx as track data. + + This uses the gpxpy_ library under the hood. + .. _gpxpy: https://github.com/tkrajina/gpxpy -def smart_convert(data: bytes) -> bytes: + :param data: The input bytes. + :return: The converted structure. + """ + gpx = gpxpy.parse(data) + points = [] + start_time = None + + for track in gpx.tracks: + for segment in track.segments: + for point in segment.points: + if start_time is None: + start_time = point.time + + time_offset = (point.time - start_time).total_seconds() + points.append(geo.Point( + longitude=point.longitude, + latitude=point.latitude, + elevation=point.elevation, + time_offset=time_offset, + )) + + timezone = util.guess_gpx_timezone(gpx) + date = gpx.time or gpx.get_time_bounds().start_time or datetime.datetime.now() + date = date.astimezone(timezone) + track_name = gpx.name + track_desc = gpx.description + for track in gpx.tracks: + if not track_name and track.name: + track_name = track.name + if not track_desc and track.description: + track_desc = track.description + + path = geo.Path(points) + track = Track() + track.set_path(path) + track.title = track_name + track.description = track_desc + track.date = date + return track + + +def smart_convert(data: bytes) -> Optional[Track]: """Tries to be smart in converting the input bytes. This function automatically applies the correct conversion if possible. @@ -61,8 +117,10 @@ def smart_convert(data: bytes) -> bytes: :return: The converted content. """ if len(data) > 11 and data[9:12] == b"FIT": - return from_fit(data).to_xml().encode("utf-8") - return data + return from_fit(data) + if data.startswith(b"<?xml") and b"<gpx" in data[:200]: + return from_gpx(data) + return None -__all__ = ["from_fit", "smart_convert"] +__all__ = ["from_fit", "from_gpx", "smart_convert"] diff --git a/fietsboek/data.py b/fietsboek/data.py index 9d5a133..c7fa7f4 100644 --- a/fietsboek/data.py +++ b/fietsboek/data.py @@ -199,12 +199,6 @@ class TrackDataDir: if action == "purge": (new_name,) = rest shutil.move(new_name, self.path) - elif action == "compress_gpx": - (old_data,) = rest - if old_data is None: - self.gpx_path().unlink() - else: - self.gpx_path().write_bytes(old_data) elif action == "add_image": (image_path,) = rest image_path.unlink() @@ -235,9 +229,6 @@ class TrackDataDir: if action == "purge": (new_name,) = rest shutil.rmtree(new_name, ignore_errors=False, onerror=self._log_deletion_error) - elif action == "compress_gpx": - # Nothing to do here, the new data is already on the disk - pass elif action == "add_image": # Nothing to do here, the image is already saved pass @@ -283,86 +274,25 @@ class TrackDataDir: """ return util.recursive_size(self.path) - def gpx_path(self) -> Path: - """Returns the path of the GPX file. - - This file contains the (brotli) compressed GPX data. - - :return: The path where the GPX is supposed to be. - """ - return self.path / "track.gpx.br" - - def compress_gpx(self, data: bytes, quality: int = 4): - """Set the GPX content to the compressed form of data. + def compress_backup(self, data: bytes, quality: int = 4): + """Set the content of the backup to the compressed form of data. - If you want to write compressed data directly, use :meth:`gpx_path` to + If you want to write compressed data directly, use :meth:`backup_path` to get the path of the GPX file. :param data: The GPX data (uncompressed). :param quality: Compression quality, from 0 to 11 - 11 is highest quality but slowest compression speed. """ - if self.journal is not None: - # First, we check if we already saved an old state of the GPX data - for action, *_ in self.journal: - if action == "compress_gpx": - break - else: - # We did not save a state yet - old_data = None if not self.gpx_path().is_file() else self.gpx_path().read_bytes() - self.journal.append(("compress_gpx", old_data)) - compressed = brotli.compress(data, quality=quality) - self.gpx_path().write_bytes(compressed) + self.backup_path().write_bytes(compressed) - def decompress_gpx(self) -> bytes: - """Returns the GPX bytes decompressed. + def decompress_backup(self) -> bytes: + """Returns the backup bytes decompressed. :return: The saved GPX file, decompressed. """ - return brotli.decompress(self.gpx_path().read_bytes()) - - def engrave_metadata( - self, - title: Optional[str], - description: Optional[str], - author_name: Optional[str], - time: Optional[datetime.datetime], - *, - gpx: Optional[gpxpy.gpx.GPX] = None, - ): - """Engrave the given metadata into the GPX file. - - Note that this will overwrite all existing metadata in the given - fields. - - If ``None`` is given, it will erase that specific part of the metadata. - - :param title: The title of the track. - :param description: The description of the track. - :param creator: Name of the track's creator. - :param time: Time of the track. - :param gpx: The pre-parsed GPX track, to save time if it is already parsed. - """ - # pylint: disable=too-many-arguments - if gpx is None: - gpx = gpxpy.parse(self.decompress_gpx()) - # First we delete the existing metadata - for track in gpx.tracks: - track.name = None - track.description = None - - # Now we add the new metadata - gpx.author_name = author_name - gpx.name = title - gpx.description = description - gpx.time = time - - self.compress_gpx(util.encode_gpx(gpx)) - - def backup(self): - """Create a backup of the GPX file.""" - shutil.copy(self.gpx_path(), self.backup_path()) + return brotli.decompress(self.backup_path().read_bytes()) def backup_path(self) -> Path: """Path of the GPX backup file. diff --git a/fietsboek/geo.py b/fietsboek/geo.py index c665705..348a4b9 100644 --- a/fietsboek/geo.py +++ b/fietsboek/geo.py @@ -81,27 +81,6 @@ class Point: class Path: - @classmethod - def from_gpx(cls, gpx: gpxpy.gpx.GPX) -> "Path": - points = [] - start_time = None - - for track in gpx.tracks: - for segment in track.segments: - for point in segment.points: - if start_time is None: - start_time = point.time - - time_offset = (point.time - start_time).total_seconds() - points.append(Point( - longitude=point.longitude, - latitude=point.latitude, - elevation=point.elevation, - time_offset=time_offset, - )) - - return cls(points) - def __init__(self, points: list[Point]): self.points = points diff --git a/fietsboek/views/upload.py b/fietsboek/views/upload.py index 3eb1099..84eb95f 100644 --- a/fietsboek/views/upload.py +++ b/fietsboek/views/upload.py @@ -54,14 +54,12 @@ def do_upload(request): request.session.flash(request.localizer.translate(_("flash.no_file_selected"))) return HTTPFound(request.route_url("upload")) - gpx = convert.smart_convert(gpx) - # Before we do anything, we check if we can parse the file. # gpxpy might throw different exceptions, so we simply catch `Exception` # here - if we can't parse it, we don't care too much why at this point. # pylint: disable=broad-except try: - gpxpy.parse(gpx) + track = convert.smart_convert(gpx) except Exception as exc: request.session.flash(request.localizer.translate(_("flash.invalid_file"))) LOGGER.info("Could not parse gpx: %s", exc) @@ -73,7 +71,7 @@ def do_upload(request): owner=request.identity, uploaded_at=now, ) - upload.gpx_data = gpx + upload.gpx_data = track.gpx_xml() request.dbsession.add(upload) request.dbsession.flush() @@ -111,28 +109,19 @@ def finish_upload(request): upload = request.context badges = request.dbsession.execute(select(models.Badge)).scalars() badges = [(False, badge) for badge in badges] - gpx = gpxpy.parse(upload.gpx_data) - timezone = util.guess_gpx_timezone(gpx) - date = gpx.time or gpx.get_time_bounds().start_time or datetime.datetime.now() - date = date.astimezone(timezone) - tz_offset = timezone.utcoffset(date) + track = convert.smart_convert(upload.gpx_data) + timezone = track.date.tzinfo + tz_offset = timezone.utcoffset(track.date) tz_offset = 0 if tz_offset is None else tz_offset.total_seconds() - track_name = "" - track_desc = "" - for track in gpx.tracks: - if not track_name and track.name: - track_name = track.name - if not track_desc and track.description: - track_desc = track.description return { "preview_id": upload.id, - "upload_title": gpx.name or track_name, - "upload_date": date, + "upload_title": track.title, + "upload_date": track.date, "upload_date_tz": int(tz_offset // 60), "upload_visibility": Visibility.PRIVATE, "upload_type": TrackType.ORGANIC, - "upload_description": gpx.description or track_desc, + "upload_description": track.description, "upload_tags": set(), "upload_tagged_people": [], "badges": badges, |
