diff options
-rw-r--r-- | fietsboek/transformers/__init__.py | 7 | ||||
-rw-r--r-- | fietsboek/transformers/elevation.py | 81 |
2 files changed, 65 insertions, 23 deletions
diff --git a/fietsboek/transformers/__init__.py b/fietsboek/transformers/__init__.py index 7df700c..b1a0245 100644 --- a/fietsboek/transformers/__init__.py +++ b/fietsboek/transformers/__init__.py @@ -10,10 +10,10 @@ function to load and apply transformers. """ from abc import ABC, abstractmethod -from collections.abc import Callable, Iterable, Mapping +from collections.abc import Mapping from typing import Literal, NamedTuple, TypeVar -from gpxpy.gpx import GPX, GPXTrackPoint +from gpxpy.gpx import GPX from pydantic import BaseModel from pyramid.i18n import TranslationString from pyramid.request import Request @@ -134,10 +134,11 @@ def list_transformers() -> list[type[Transformer]]: """ # pylint: disable=import-outside-toplevel,cyclic-import from .breaks import RemoveBreaks - from .elevation import FixNullElevation + from .elevation import FixElevationJumps, FixNullElevation return [ FixNullElevation, + FixElevationJumps, RemoveBreaks, ] diff --git a/fietsboek/transformers/elevation.py b/fietsboek/transformers/elevation.py index 0e6f3b0..0af5161 100644 --- a/fietsboek/transformers/elevation.py +++ b/fietsboek/transformers/elevation.py @@ -1,6 +1,6 @@ """Transformers that deal with elevation changes in the track.""" from collections.abc import Callable, Iterable -from itertools import islice +from itertools import islice, zip_longest from gpxpy.gpx import GPX, GPXTrackPoint from pyramid.i18n import TranslationString @@ -9,6 +9,26 @@ from . import Parameters, Transformer _ = TranslationString +MAX_ORGANIC_SLOPE: float = 1.0 + + +def slope(point_a: GPXTrackPoint, point_b: GPXTrackPoint) -> float: + """Returns the slope between two GPX points. + + This is defined as delta_h / euclid_distance. + + :param point_a: First point. + :param point_b: Second point. + :return: The slope, as percentage. + """ + if point_a.elevation is None or point_b.elevation is None: + return 0.0 + delta_h = abs(point_a.elevation - point_b.elevation) + dist = point_a.distance_2d(point_b) + if dist == 0.0 or dist is None: + return 0.0 + return delta_h / dist + class FixNullElevation(Transformer): """A transformer that fixes points with zero elevation.""" @@ -67,15 +87,13 @@ class FixNullElevation(Transformer): :param points: A function that generates the iterable of points. """ - max_slope = 1.0 - bad_until = 0 final_elevation = 0.0 for i, (point, next_point) in enumerate(zip(points(), islice(points(), 1, None))): if ( point.elevation is not None and point.elevation != 0.0 - and cls.slope(point, next_point) < max_slope + and slope(point, next_point) < MAX_ORGANIC_SLOPE ): bad_until = i final_elevation = point.elevation @@ -84,23 +102,46 @@ class FixNullElevation(Transformer): for point in islice(points(), None, bad_until): point.elevation = final_elevation - @staticmethod - def slope(point_a: GPXTrackPoint, point_b: GPXTrackPoint) -> float: - """Returns the slope between two GPX points. - This is defined as delta_h / euclid_distance. +class FixElevationJumps(Transformer): + """A transformer that fixes big jumps in the elevation.""" - :param point_a: First point. - :param point_b: Second point. - :return: The slope, as percentage. - """ - if point_a.elevation is None or point_b.elevation is None: - return 0.0 - delta_h = abs(point_a.elevation - point_b.elevation) - dist = point_a.distance_2d(point_b) - if dist == 0.0 or dist is None: - return 0.0 - return delta_h / dist + @classmethod + def identifier(cls) -> str: + return "fix-elevation-jumps" + + @classmethod + def name(cls) -> TranslationString: + return _("transformers.fix-elevation-jumps") + + @classmethod + def description(cls) -> TranslationString: + return _("transformers.fix-elevation-jumps.description") + + @classmethod + def parameter_model(cls) -> type[Parameters]: + return Parameters + + @property + def parameters(self) -> Parameters: + return Parameters() + + @parameters.setter + def parameters(self, value): + pass + + def execute(self, gpx: GPX): + current_adjustment = 0.0 + + points = gpx.walk(only_points=True) + next_points = gpx.walk(only_points=True) + + for current_point, next_point in zip_longest(points, islice(next_points, 1, None)): + point_adjustment = current_adjustment + if next_point and slope(current_point, next_point) > MAX_ORGANIC_SLOPE: + current_adjustment += current_point.elevation - next_point.elevation + print(f"{current_adjustment=}") + current_point.elevation += point_adjustment -__all__ = ["FixNullElevation"] +__all__ = ["FixNullElevation", "FixElevationJumps"] |