diff options
| author | Daniel Schadt <kingdread@gmx.de> | 2023-05-15 20:19:48 +0200 | 
|---|---|---|
| committer | Daniel Schadt <kingdread@gmx.de> | 2023-05-15 20:19:48 +0200 | 
| commit | e6e0271278765eed288e9f7da27ffaa423d64d53 (patch) | |
| tree | 8bd212655d2d0daf81e00158cca625dddfb330d7 | |
| parent | 844a652e47b3a9d5fd44eee8ec0d6a49b9bde91c (diff) | |
| parent | c635dcc671810f46cf4c2783da7d11b6cd0b97ec (diff) | |
| download | fietsboek-e6e0271278765eed288e9f7da27ffaa423d64d53.tar.gz fietsboek-e6e0271278765eed288e9f7da27ffaa423d64d53.tar.bz2 fietsboek-e6e0271278765eed288e9f7da27ffaa423d64d53.zip  | |
Merge branch 'elevation-jumper'
| -rw-r--r-- | doc/user/images/fixed_elevation_jump.png | bin | 0 -> 61739 bytes | |||
| -rw-r--r-- | doc/user/images/wrong_elevation_jump.png | bin | 0 -> 39593 bytes | |||
| -rw-r--r-- | doc/user/transformers.rst | 29 | ||||
| -rw-r--r-- | fietsboek/transformers/__init__.py | 100 | ||||
| -rw-r--r-- | fietsboek/transformers/elevation.py | 147 | ||||
| -rw-r--r-- | tests/assets/Elevation_Jump.gpx.gz | bin | 0 -> 4417 bytes | |||
| -rw-r--r-- | tests/playwright/test_transformers.py | 29 | 
7 files changed, 209 insertions, 96 deletions
diff --git a/doc/user/images/fixed_elevation_jump.png b/doc/user/images/fixed_elevation_jump.png Binary files differnew file mode 100644 index 0000000..4d1334a --- /dev/null +++ b/doc/user/images/fixed_elevation_jump.png diff --git a/doc/user/images/wrong_elevation_jump.png b/doc/user/images/wrong_elevation_jump.png Binary files differnew file mode 100644 index 0000000..03d454d --- /dev/null +++ b/doc/user/images/wrong_elevation_jump.png diff --git a/doc/user/transformers.rst b/doc/user/transformers.rst index d053052..7ce4bc7 100644 --- a/doc/user/transformers.rst +++ b/doc/user/transformers.rst @@ -19,6 +19,11 @@ In other applications, transformers are sometimes called "filters". That term  however has many different meanings (like the filters on the "Browse" page),  and as such, Fietsboek calls them transformers. +Keep in mind that the transformers provide a "quick and convenient" way to +apply a predefined set of changes to a track. If you need to do fine-grained +edits to a GPX file, you need to use a different tool and edit the file before +uploading it to Fietsboek. +  Fix Null Elevation  ------------------ @@ -44,6 +49,30 @@ point.  To fix those points, the transformer will find the first correct point, and  copy its elevation to the wrong points. +Fix Elevation Jumps +------------------- + +The *fix elevation jumps* transformer eliminates big elevation jumps in the +middle of a track. This is useful to deal with "stitched" GPX files, where the +elevation is consistent within a single track, but the absolute value might not +be correct (e.g. if the device recalibrates): + +.. image:: images/wrong_elevation_jump.png +   :width: 600 +   :alt: The elevation profile having a big jump in the middle. + +In this track, the device was re-calibrated mid-track. The transformer will +adjust the elevation values: + +.. image:: images/fixed_elevation_jump.png +   :width: 600 +   :alt: The same elevation profile with the jump removed. + +The detection of jumps work similarly to the *fix null elevation* transformer, +with the difference that it works in the middle of tracks. It will consider the +earlier points as anchor, and then adjust the later points such that the first +point after the jump has the same elevation as the last point before the jump. +  Remove Breaks  ------------- diff --git a/fietsboek/transformers/__init__.py b/fietsboek/transformers/__init__.py index 330e699..b1a0245 100644 --- a/fietsboek/transformers/__init__.py +++ b/fietsboek/transformers/__init__.py @@ -10,11 +10,10 @@ function to load and apply transformers.  """  from abc import ABC, abstractmethod -from collections.abc import Callable, Iterable, Mapping -from itertools import islice +from collections.abc import Mapping  from typing import Literal, NamedTuple, TypeVar -from gpxpy.gpx import GPX, GPXTrackPoint +from gpxpy.gpx import GPX  from pydantic import BaseModel  from pyramid.i18n import TranslationString  from pyramid.request import Request @@ -128,99 +127,6 @@ class Transformer(ABC):          """ -class FixNullElevation(Transformer): -    """A transformer that fixes points with zero elevation.""" - -    @classmethod -    def identifier(cls) -> str: -        return "fix-null-elevation" - -    @classmethod -    def name(cls) -> TranslationString: -        return _("transformers.fix-null-elevation.title") - -    @classmethod -    def description(cls) -> TranslationString: -        return _("transformers.fix-null-elevation.description") - -    @classmethod -    def parameter_model(cls) -> type[Parameters]: -        return Parameters - -    @property -    def parameters(self) -> Parameters: -        return Parameters() - -    @parameters.setter -    def parameters(self, value): -        pass - -    def execute(self, gpx: GPX): -        def all_points(): -            return gpx.walk(only_points=True) - -        def rev_points(): -            # We cannot use reversed(gpx.walk(...)) since that is not a -            # generator, so we do it manually. -            return ( -                point -                for track in reversed(gpx.tracks) -                for segment in reversed(track.segments) -                for point in reversed(segment.points) -            ) - -        # First, from the front -        self.fixup(all_points) -        # Then, from the back -        self.fixup(rev_points) - -    @classmethod -    def fixup(cls, points: Callable[[], Iterable[GPXTrackPoint]]): -        """Fixes the given GPX points. - -        This iterates over the points and checks for the first point that has a -        non-zero elevation, and a slope that doesn't exceed 100%. All previous -        points will have their elevation adjusted to match this first "good -        point". - -        :param points: A function that generates the iterable of points. -        """ -        max_slope = 1.0 - -        bad_until = 0 -        final_elevation = 0.0 -        for i, (point, next_point) in enumerate(zip(points(), islice(points(), 1, None))): -            if ( -                point.elevation is not None -                and point.elevation != 0.0 -                and cls.slope(point, next_point) < max_slope -            ): -                bad_until = i -                final_elevation = point.elevation -                break - -        for point in islice(points(), None, bad_until): -            point.elevation = final_elevation - -    @staticmethod -    def slope(point_a: GPXTrackPoint, point_b: GPXTrackPoint) -> float: -        """Returns the slope between two GPX points. - -        This is defined as delta_h / euclid_distance. - -        :param point_a: First point. -        :param point_b: Second point. -        :return: The slope, as percentage. -        """ -        if point_a.elevation is None or point_b.elevation is None: -            return 0.0 -        delta_h = abs(point_a.elevation - point_b.elevation) -        dist = point_a.distance_2d(point_b) -        if dist == 0.0 or dist is None: -            return 0.0 -        return delta_h / dist - -  def list_transformers() -> list[type[Transformer]]:      """Returns a list of all available transformers. @@ -228,9 +134,11 @@ def list_transformers() -> list[type[Transformer]]:      """      # pylint: disable=import-outside-toplevel,cyclic-import      from .breaks import RemoveBreaks +    from .elevation import FixElevationJumps, FixNullElevation      return [          FixNullElevation, +        FixElevationJumps,          RemoveBreaks,      ] diff --git a/fietsboek/transformers/elevation.py b/fietsboek/transformers/elevation.py new file mode 100644 index 0000000..0af5161 --- /dev/null +++ b/fietsboek/transformers/elevation.py @@ -0,0 +1,147 @@ +"""Transformers that deal with elevation changes in the track.""" +from collections.abc import Callable, Iterable +from itertools import islice, zip_longest + +from gpxpy.gpx import GPX, GPXTrackPoint +from pyramid.i18n import TranslationString + +from . import Parameters, Transformer + +_ = TranslationString + +MAX_ORGANIC_SLOPE: float = 1.0 + + +def slope(point_a: GPXTrackPoint, point_b: GPXTrackPoint) -> float: +    """Returns the slope between two GPX points. + +    This is defined as delta_h / euclid_distance. + +    :param point_a: First point. +    :param point_b: Second point. +    :return: The slope, as percentage. +    """ +    if point_a.elevation is None or point_b.elevation is None: +        return 0.0 +    delta_h = abs(point_a.elevation - point_b.elevation) +    dist = point_a.distance_2d(point_b) +    if dist == 0.0 or dist is None: +        return 0.0 +    return delta_h / dist + + +class FixNullElevation(Transformer): +    """A transformer that fixes points with zero elevation.""" + +    @classmethod +    def identifier(cls) -> str: +        return "fix-null-elevation" + +    @classmethod +    def name(cls) -> TranslationString: +        return _("transformers.fix-null-elevation.title") + +    @classmethod +    def description(cls) -> TranslationString: +        return _("transformers.fix-null-elevation.description") + +    @classmethod +    def parameter_model(cls) -> type[Parameters]: +        return Parameters + +    @property +    def parameters(self) -> Parameters: +        return Parameters() + +    @parameters.setter +    def parameters(self, value): +        pass + +    def execute(self, gpx: GPX): +        def all_points(): +            return gpx.walk(only_points=True) + +        def rev_points(): +            # We cannot use reversed(gpx.walk(...)) since that is not a +            # generator, so we do it manually. +            return ( +                point +                for track in reversed(gpx.tracks) +                for segment in reversed(track.segments) +                for point in reversed(segment.points) +            ) + +        # First, from the front +        self.fixup(all_points) +        # Then, from the back +        self.fixup(rev_points) + +    @classmethod +    def fixup(cls, points: Callable[[], Iterable[GPXTrackPoint]]): +        """Fixes the given GPX points. + +        This iterates over the points and checks for the first point that has a +        non-zero elevation, and a slope that doesn't exceed 100%. All previous +        points will have their elevation adjusted to match this first "good +        point". + +        :param points: A function that generates the iterable of points. +        """ +        bad_until = 0 +        final_elevation = 0.0 +        for i, (point, next_point) in enumerate(zip(points(), islice(points(), 1, None))): +            if ( +                point.elevation is not None +                and point.elevation != 0.0 +                and slope(point, next_point) < MAX_ORGANIC_SLOPE +            ): +                bad_until = i +                final_elevation = point.elevation +                break + +        for point in islice(points(), None, bad_until): +            point.elevation = final_elevation + + +class FixElevationJumps(Transformer): +    """A transformer that fixes big jumps in the elevation.""" + +    @classmethod +    def identifier(cls) -> str: +        return "fix-elevation-jumps" + +    @classmethod +    def name(cls) -> TranslationString: +        return _("transformers.fix-elevation-jumps") + +    @classmethod +    def description(cls) -> TranslationString: +        return _("transformers.fix-elevation-jumps.description") + +    @classmethod +    def parameter_model(cls) -> type[Parameters]: +        return Parameters + +    @property +    def parameters(self) -> Parameters: +        return Parameters() + +    @parameters.setter +    def parameters(self, value): +        pass + +    def execute(self, gpx: GPX): +        current_adjustment = 0.0 + +        points = gpx.walk(only_points=True) +        next_points = gpx.walk(only_points=True) + +        for current_point, next_point in zip_longest(points, islice(next_points, 1, None)): +            point_adjustment = current_adjustment +            if next_point and slope(current_point, next_point) > MAX_ORGANIC_SLOPE: +                current_adjustment += current_point.elevation - next_point.elevation +                print(f"{current_adjustment=}") +            current_point.elevation += point_adjustment + + +__all__ = ["FixNullElevation", "FixElevationJumps"] diff --git a/tests/assets/Elevation_Jump.gpx.gz b/tests/assets/Elevation_Jump.gpx.gz Binary files differnew file mode 100644 index 0000000..836ddb1 --- /dev/null +++ b/tests/assets/Elevation_Jump.gpx.gz diff --git a/tests/playwright/test_transformers.py b/tests/playwright/test_transformers.py index d4f07e1..fc89afb 100644 --- a/tests/playwright/test_transformers.py +++ b/tests/playwright/test_transformers.py @@ -1,3 +1,4 @@ +import gpxpy  from playwright.sync_api import Page, expect  from sqlalchemy import select @@ -141,3 +142,31 @@ def test_transformer_steep_slope_edited(page: Page, playwright_helper, tmp_path,      track = dbaccess.execute(select(models.Track).filter_by(id=track_id)).scalar_one()      assert track.cache.uphill < 2 + + +def test_transformer_elevation_jump_enabled(page: Page, playwright_helper, tmp_path, data_manager): +    playwright_helper.login() + +    page.goto("/") +    page.get_by_text("Upload").click() + +    extract_and_upload(page, "Elevation_Jump.gpx.gz", tmp_path) + +    page.locator("#transformer-heading-2 .accordion-button").click() +    page.locator("#transformer-2.collapse.show").wait_for() +    page.locator("#transformer-enabled-2").check() + +    page.locator(".btn", has_text="Upload").click() + +    page.locator(".alert", has_text="Upload successful").wait_for() + +    new_track_id = int(page.url.rsplit("/", 1)[1]) +    data = data_manager.open(new_track_id) + +    gpx = gpxpy.parse(data.decompress_gpx()) +    points = iter(gpx.walk(only_points=True)) +    next(points) +    for prev_point, point in zip(gpx.walk(only_points=True), points): +        # The given GPX has a jump of 94 between two consecutive points, so +        # here we assert that that jump is gone. +        assert abs(prev_point.elevation - point.elevation) < 10.0  | 
