diff options
-rw-r--r-- | fietsboek/transformers/__init__.py | 4 | ||||
-rw-r--r-- | fietsboek/transformers/breaks.py | 123 |
2 files changed, 127 insertions, 0 deletions
diff --git a/fietsboek/transformers/__init__.py b/fietsboek/transformers/__init__.py index 82f2a48..330e699 100644 --- a/fietsboek/transformers/__init__.py +++ b/fietsboek/transformers/__init__.py @@ -226,8 +226,12 @@ def list_transformers() -> list[type[Transformer]]: :return: A list of transformers. """ + # pylint: disable=import-outside-toplevel,cyclic-import + from .breaks import RemoveBreaks + return [ FixNullElevation, + RemoveBreaks, ] diff --git a/fietsboek/transformers/breaks.py b/fietsboek/transformers/breaks.py new file mode 100644 index 0000000..1c56414 --- /dev/null +++ b/fietsboek/transformers/breaks.py @@ -0,0 +1,123 @@ +"""Transformers that deal with breaks in the track.""" +import datetime + +from gpxpy.gpx import GPX, GPXTrack +from pyramid.i18n import TranslationString + +from . import Parameters, Transformer + +_ = TranslationString + +# 1km/h is the default limit of gpxpy. We convert this into m/s to make our +# live easier. +STOPPED_SPEED_LIMIT: float = 1 * 1000 / (60 * 60) +"""Speed limit (in m/s) at which the track is considered to be stopped.""" + +# We don't want to remove "breaks" at traffic lights or similar, so 5 minutes +# should be a good default. +MIN_BREAK_TO_REMOVE: datetime.timedelta = datetime.timedelta(minutes=5) +"""Minimum length of the break to trigger the removal.""" + + +class RemoveBreaks(Transformer): + """A transformer that fixes points with zero elevation.""" + + @classmethod + def identifier(cls) -> str: + return "remove-breaks" + + @classmethod + def name(cls) -> TranslationString: + return _("transformers.remove-breaks.title") + + @classmethod + def description(cls) -> TranslationString: + return _("transformers.remove-breaks.description") + + @classmethod + def parameter_model(cls) -> type[Parameters]: + return Parameters + + @property + def parameters(self) -> Parameters: + return Parameters() + + @parameters.setter + def parameters(self, value): + pass + + def execute(self, gpx: GPX): + for track in gpx.tracks: + self._clean(track) + + def _clean(self, track: GPXTrack): + if not track.get_points_no(): + return + + i = 0 + while i < track.get_points_no(): + segment_idx, point_idx = index(track, i) + point = track.segments[segment_idx].points[point_idx] + + # We check if the following points constitute a break, and if yes, + # how many of them + count = 0 + current_length = 0.0 + last_point = point + while True: + try: + j_segment, j_point = index(track, i + count + 1) + except IndexError: + break + current_point = track.segments[j_segment].points[j_point] + current_length += last_point.distance_3d(current_point) or 0.0 + last_point = current_point + + delta_t = datetime.timedelta(seconds=point.time_difference(last_point) or 0.0) + if not delta_t or current_length / delta_t.total_seconds() > STOPPED_SPEED_LIMIT: + break + count += 1 + + # No break, carry on! + if not count: + i += 1 + continue + + # At this point, check if the break is long enough to be removed + delta_t = datetime.timedelta(seconds=point.time_difference(last_point) or 0.0) + if delta_t < MIN_BREAK_TO_REMOVE: + i += 1 + continue + + # Here, we have a proper break to remove + # Delete the points belonging to the break ... + for _ in range(count): + j_segment, j_point = index(track, i + 1) + del track.segments[j_segment].points[j_point] + + # ... and shift the time of the following points + j = i + 1 + while j < track.get_points_no(): + j_segment, j_point = index(track, j) + track.segments[j_segment].points[j_point].adjust_time(-delta_t) + j += 1 + + +def index(track: GPXTrack, idx: int) -> tuple[int, int]: + """Takes a one-dimensional index (the point index) and returns an index + into the segment/segment points. + + :raises IndexError: When the given index is out of bounds. + :param track: The track for which to get the index. + :param idx: The "1D" index. + :return: A tuple with the segment index, and the index of the point within + the segment. + """ + for segment_idx, segment in enumerate(track.segments): + if idx < len(segment.points): + return (segment_idx, idx) + idx -= len(segment.points) + raise IndexError + + +__all__ = ["RemoveBreaks"] |