diff options
-rw-r--r-- | doc/user/images/fixed_elevation_jump.png | bin | 0 -> 61739 bytes | |||
-rw-r--r-- | doc/user/images/wrong_elevation_jump.png | bin | 0 -> 39593 bytes | |||
-rw-r--r-- | doc/user/transformers.rst | 29 | ||||
-rw-r--r-- | fietsboek/transformers/__init__.py | 100 | ||||
-rw-r--r-- | fietsboek/transformers/elevation.py | 147 | ||||
-rw-r--r-- | tests/assets/Elevation_Jump.gpx.gz | bin | 0 -> 4417 bytes | |||
-rw-r--r-- | tests/playwright/test_transformers.py | 29 |
7 files changed, 209 insertions, 96 deletions
diff --git a/doc/user/images/fixed_elevation_jump.png b/doc/user/images/fixed_elevation_jump.png Binary files differnew file mode 100644 index 0000000..4d1334a --- /dev/null +++ b/doc/user/images/fixed_elevation_jump.png diff --git a/doc/user/images/wrong_elevation_jump.png b/doc/user/images/wrong_elevation_jump.png Binary files differnew file mode 100644 index 0000000..03d454d --- /dev/null +++ b/doc/user/images/wrong_elevation_jump.png diff --git a/doc/user/transformers.rst b/doc/user/transformers.rst index d053052..7ce4bc7 100644 --- a/doc/user/transformers.rst +++ b/doc/user/transformers.rst @@ -19,6 +19,11 @@ In other applications, transformers are sometimes called "filters". That term however has many different meanings (like the filters on the "Browse" page), and as such, Fietsboek calls them transformers. +Keep in mind that the transformers provide a "quick and convenient" way to +apply a predefined set of changes to a track. If you need to do fine-grained +edits to a GPX file, you need to use a different tool and edit the file before +uploading it to Fietsboek. + Fix Null Elevation ------------------ @@ -44,6 +49,30 @@ point. To fix those points, the transformer will find the first correct point, and copy its elevation to the wrong points. +Fix Elevation Jumps +------------------- + +The *fix elevation jumps* transformer eliminates big elevation jumps in the +middle of a track. This is useful to deal with "stitched" GPX files, where the +elevation is consistent within a single track, but the absolute value might not +be correct (e.g. if the device recalibrates): + +.. image:: images/wrong_elevation_jump.png + :width: 600 + :alt: The elevation profile having a big jump in the middle. + +In this track, the device was re-calibrated mid-track. The transformer will +adjust the elevation values: + +.. image:: images/fixed_elevation_jump.png + :width: 600 + :alt: The same elevation profile with the jump removed. + +The detection of jumps work similarly to the *fix null elevation* transformer, +with the difference that it works in the middle of tracks. It will consider the +earlier points as anchor, and then adjust the later points such that the first +point after the jump has the same elevation as the last point before the jump. + Remove Breaks ------------- diff --git a/fietsboek/transformers/__init__.py b/fietsboek/transformers/__init__.py index 330e699..b1a0245 100644 --- a/fietsboek/transformers/__init__.py +++ b/fietsboek/transformers/__init__.py @@ -10,11 +10,10 @@ function to load and apply transformers. """ from abc import ABC, abstractmethod -from collections.abc import Callable, Iterable, Mapping -from itertools import islice +from collections.abc import Mapping from typing import Literal, NamedTuple, TypeVar -from gpxpy.gpx import GPX, GPXTrackPoint +from gpxpy.gpx import GPX from pydantic import BaseModel from pyramid.i18n import TranslationString from pyramid.request import Request @@ -128,99 +127,6 @@ class Transformer(ABC): """ -class FixNullElevation(Transformer): - """A transformer that fixes points with zero elevation.""" - - @classmethod - def identifier(cls) -> str: - return "fix-null-elevation" - - @classmethod - def name(cls) -> TranslationString: - return _("transformers.fix-null-elevation.title") - - @classmethod - def description(cls) -> TranslationString: - return _("transformers.fix-null-elevation.description") - - @classmethod - def parameter_model(cls) -> type[Parameters]: - return Parameters - - @property - def parameters(self) -> Parameters: - return Parameters() - - @parameters.setter - def parameters(self, value): - pass - - def execute(self, gpx: GPX): - def all_points(): - return gpx.walk(only_points=True) - - def rev_points(): - # We cannot use reversed(gpx.walk(...)) since that is not a - # generator, so we do it manually. - return ( - point - for track in reversed(gpx.tracks) - for segment in reversed(track.segments) - for point in reversed(segment.points) - ) - - # First, from the front - self.fixup(all_points) - # Then, from the back - self.fixup(rev_points) - - @classmethod - def fixup(cls, points: Callable[[], Iterable[GPXTrackPoint]]): - """Fixes the given GPX points. - - This iterates over the points and checks for the first point that has a - non-zero elevation, and a slope that doesn't exceed 100%. All previous - points will have their elevation adjusted to match this first "good - point". - - :param points: A function that generates the iterable of points. - """ - max_slope = 1.0 - - bad_until = 0 - final_elevation = 0.0 - for i, (point, next_point) in enumerate(zip(points(), islice(points(), 1, None))): - if ( - point.elevation is not None - and point.elevation != 0.0 - and cls.slope(point, next_point) < max_slope - ): - bad_until = i - final_elevation = point.elevation - break - - for point in islice(points(), None, bad_until): - point.elevation = final_elevation - - @staticmethod - def slope(point_a: GPXTrackPoint, point_b: GPXTrackPoint) -> float: - """Returns the slope between two GPX points. - - This is defined as delta_h / euclid_distance. - - :param point_a: First point. - :param point_b: Second point. - :return: The slope, as percentage. - """ - if point_a.elevation is None or point_b.elevation is None: - return 0.0 - delta_h = abs(point_a.elevation - point_b.elevation) - dist = point_a.distance_2d(point_b) - if dist == 0.0 or dist is None: - return 0.0 - return delta_h / dist - - def list_transformers() -> list[type[Transformer]]: """Returns a list of all available transformers. @@ -228,9 +134,11 @@ def list_transformers() -> list[type[Transformer]]: """ # pylint: disable=import-outside-toplevel,cyclic-import from .breaks import RemoveBreaks + from .elevation import FixElevationJumps, FixNullElevation return [ FixNullElevation, + FixElevationJumps, RemoveBreaks, ] diff --git a/fietsboek/transformers/elevation.py b/fietsboek/transformers/elevation.py new file mode 100644 index 0000000..0af5161 --- /dev/null +++ b/fietsboek/transformers/elevation.py @@ -0,0 +1,147 @@ +"""Transformers that deal with elevation changes in the track.""" +from collections.abc import Callable, Iterable +from itertools import islice, zip_longest + +from gpxpy.gpx import GPX, GPXTrackPoint +from pyramid.i18n import TranslationString + +from . import Parameters, Transformer + +_ = TranslationString + +MAX_ORGANIC_SLOPE: float = 1.0 + + +def slope(point_a: GPXTrackPoint, point_b: GPXTrackPoint) -> float: + """Returns the slope between two GPX points. + + This is defined as delta_h / euclid_distance. + + :param point_a: First point. + :param point_b: Second point. + :return: The slope, as percentage. + """ + if point_a.elevation is None or point_b.elevation is None: + return 0.0 + delta_h = abs(point_a.elevation - point_b.elevation) + dist = point_a.distance_2d(point_b) + if dist == 0.0 or dist is None: + return 0.0 + return delta_h / dist + + +class FixNullElevation(Transformer): + """A transformer that fixes points with zero elevation.""" + + @classmethod + def identifier(cls) -> str: + return "fix-null-elevation" + + @classmethod + def name(cls) -> TranslationString: + return _("transformers.fix-null-elevation.title") + + @classmethod + def description(cls) -> TranslationString: + return _("transformers.fix-null-elevation.description") + + @classmethod + def parameter_model(cls) -> type[Parameters]: + return Parameters + + @property + def parameters(self) -> Parameters: + return Parameters() + + @parameters.setter + def parameters(self, value): + pass + + def execute(self, gpx: GPX): + def all_points(): + return gpx.walk(only_points=True) + + def rev_points(): + # We cannot use reversed(gpx.walk(...)) since that is not a + # generator, so we do it manually. + return ( + point + for track in reversed(gpx.tracks) + for segment in reversed(track.segments) + for point in reversed(segment.points) + ) + + # First, from the front + self.fixup(all_points) + # Then, from the back + self.fixup(rev_points) + + @classmethod + def fixup(cls, points: Callable[[], Iterable[GPXTrackPoint]]): + """Fixes the given GPX points. + + This iterates over the points and checks for the first point that has a + non-zero elevation, and a slope that doesn't exceed 100%. All previous + points will have their elevation adjusted to match this first "good + point". + + :param points: A function that generates the iterable of points. + """ + bad_until = 0 + final_elevation = 0.0 + for i, (point, next_point) in enumerate(zip(points(), islice(points(), 1, None))): + if ( + point.elevation is not None + and point.elevation != 0.0 + and slope(point, next_point) < MAX_ORGANIC_SLOPE + ): + bad_until = i + final_elevation = point.elevation + break + + for point in islice(points(), None, bad_until): + point.elevation = final_elevation + + +class FixElevationJumps(Transformer): + """A transformer that fixes big jumps in the elevation.""" + + @classmethod + def identifier(cls) -> str: + return "fix-elevation-jumps" + + @classmethod + def name(cls) -> TranslationString: + return _("transformers.fix-elevation-jumps") + + @classmethod + def description(cls) -> TranslationString: + return _("transformers.fix-elevation-jumps.description") + + @classmethod + def parameter_model(cls) -> type[Parameters]: + return Parameters + + @property + def parameters(self) -> Parameters: + return Parameters() + + @parameters.setter + def parameters(self, value): + pass + + def execute(self, gpx: GPX): + current_adjustment = 0.0 + + points = gpx.walk(only_points=True) + next_points = gpx.walk(only_points=True) + + for current_point, next_point in zip_longest(points, islice(next_points, 1, None)): + point_adjustment = current_adjustment + if next_point and slope(current_point, next_point) > MAX_ORGANIC_SLOPE: + current_adjustment += current_point.elevation - next_point.elevation + print(f"{current_adjustment=}") + current_point.elevation += point_adjustment + + +__all__ = ["FixNullElevation", "FixElevationJumps"] diff --git a/tests/assets/Elevation_Jump.gpx.gz b/tests/assets/Elevation_Jump.gpx.gz Binary files differnew file mode 100644 index 0000000..836ddb1 --- /dev/null +++ b/tests/assets/Elevation_Jump.gpx.gz diff --git a/tests/playwright/test_transformers.py b/tests/playwright/test_transformers.py index d4f07e1..fc89afb 100644 --- a/tests/playwright/test_transformers.py +++ b/tests/playwright/test_transformers.py @@ -1,3 +1,4 @@ +import gpxpy from playwright.sync_api import Page, expect from sqlalchemy import select @@ -141,3 +142,31 @@ def test_transformer_steep_slope_edited(page: Page, playwright_helper, tmp_path, track = dbaccess.execute(select(models.Track).filter_by(id=track_id)).scalar_one() assert track.cache.uphill < 2 + + +def test_transformer_elevation_jump_enabled(page: Page, playwright_helper, tmp_path, data_manager): + playwright_helper.login() + + page.goto("/") + page.get_by_text("Upload").click() + + extract_and_upload(page, "Elevation_Jump.gpx.gz", tmp_path) + + page.locator("#transformer-heading-2 .accordion-button").click() + page.locator("#transformer-2.collapse.show").wait_for() + page.locator("#transformer-enabled-2").check() + + page.locator(".btn", has_text="Upload").click() + + page.locator(".alert", has_text="Upload successful").wait_for() + + new_track_id = int(page.url.rsplit("/", 1)[1]) + data = data_manager.open(new_track_id) + + gpx = gpxpy.parse(data.decompress_gpx()) + points = iter(gpx.walk(only_points=True)) + next(points) + for prev_point, point in zip(gpx.walk(only_points=True), points): + # The given GPX has a jump of 94 between two consecutive points, so + # here we assert that that jump is gone. + assert abs(prev_point.elevation - point.elevation) < 10.0 |