diff options
Diffstat (limited to 'fietsboek/convert.py')
| -rw-r--r-- | fietsboek/convert.py | 126 |
1 files changed, 112 insertions, 14 deletions
diff --git a/fietsboek/convert.py b/fietsboek/convert.py index d3bfb22..5d7b43e 100644 --- a/fietsboek/convert.py +++ b/fietsboek/convert.py @@ -1,11 +1,27 @@ """Conversion functions to convert between various recording formats.""" +import datetime + import fitparse -from gpxpy.gpx import GPX, GPXTrack, GPXTrackPoint, GPXTrackSegment +import gpxpy + +from . import geo, util +from .models import Track, Waypoint FIT_RECORD_FIELDS = ["position_lat", "position_long", "altitude", "timestamp"] +class ConversionError(Exception): + """Error that occurred when loading a track from a file.""" + + +class UnknownFormat(ConversionError): + """The format of the source file could not be identified.""" + + def __str__(self): + return type(self).__doc__ + + def semicircles_to_deg(circles: int) -> float: """Convert semicircles coordinate to degree coordinate. @@ -15,8 +31,8 @@ def semicircles_to_deg(circles: int) -> float: return circles * (180 / 2**31) -def from_fit(data: bytes) -> GPX: - """Reads a .fit as GPX data. +def from_fit(data: bytes) -> Track: + """Reads a .fit as track data. This uses the fitparse_ library under the hood. @@ -24,32 +40,111 @@ def from_fit(data: bytes) -> GPX: :param data: The input bytes. :return: The converted structure. + :raises ConversionError: If conversion failed. """ fitfile = fitparse.FitFile(data) + start_time = None points = [] for record in fitfile.get_messages("record"): values = record.get_values() try: if any(values[field] is None for field in FIT_RECORD_FIELDS): continue - point = GPXTrackPoint( + time = values["timestamp"] + if start_time is None: + start_time = time + point = geo.Point( latitude=semicircles_to_deg(values["position_lat"]), longitude=semicircles_to_deg(values["position_long"]), elevation=values["altitude"], - time=values["timestamp"], + time_offset=(time - start_time).total_seconds(), ) except KeyError: pass else: points.append(point) - track = GPXTrack() - track.segments = [GPXTrackSegment(points)] - gpx = GPX() - gpx.tracks = [track] - return gpx + path = geo.Path(points) + track = Track() + track.set_path(path) + track.date = start_time + return track -def smart_convert(data: bytes) -> bytes: +def from_gpx(data: bytes) -> Track: + """Reads a .gpx as track data. + + This uses the gpxpy_ library under the hood. + + .. _gpxpy: https://github.com/tkrajina/gpxpy + + :param data: The input bytes. + :return: The converted structure. + :raises ConversionError: If conversion failed. + """ + # pylint: disable=too-many-locals + gpx = gpxpy.parse(data) + points = [] + start_time = None + + for track in gpx.tracks: + for segment in track.segments: + for point in segment.points: + if start_time is None: + start_time = point.time + + if point.time is not None and start_time is not None: + time_offset = (point.time - start_time).total_seconds() + else: + time_offset = 0 + points.append( + geo.Point( + longitude=point.longitude, + latitude=point.latitude, + elevation=point.elevation or 0.0, + time_offset=time_offset, + ) + ) + + timezone = util.guess_gpx_timezone(gpx) + date = gpx.time or gpx.get_time_bounds().start_time or datetime.datetime.now() + date = date.astimezone(timezone) + track_name = gpx.name + track_desc = gpx.description + for track in gpx.tracks: + if not track_name and track.name: + track_name = track.name + if not track_desc and track.description: + track_desc = track.description + + path = geo.Path(points) + track = Track() + track.set_path(path) + track.title = track_name + track.description = track_desc + track.date = date + + for waypoint in gpx.waypoints: + desc = None + # GPX waypoints can have both description and comment. It seems like + # comment is what is usually used (GPXViewer only shows the comment), + # so we'll prioritize that. + if waypoint.comment: + desc = waypoint.comment + if not desc and waypoint.description: + desc = waypoint.description + wpt = Waypoint( + longitude=waypoint.longitude, + latitude=waypoint.latitude, + elevation=waypoint.elevation, + name=waypoint.name, + description=desc, + ) + track.waypoints.append(wpt) + + return track + + +def smart_convert(data: bytes) -> Track: """Tries to be smart in converting the input bytes. This function automatically applies the correct conversion if possible. @@ -59,10 +154,13 @@ def smart_convert(data: bytes) -> bytes: :param data: The input bytes. :return: The converted content. + :raises ConversionError: When conversion fails. """ if len(data) > 11 and data[9:12] == b"FIT": - return from_fit(data).to_xml().encode("utf-8") - return data + return from_fit(data) + if data.startswith(b"<?xml") and b"<gpx" in data[:200]: + return from_gpx(data) + raise UnknownFormat() -__all__ = ["from_fit", "smart_convert"] +__all__ = ["ConversionError", "from_fit", "from_gpx", "smart_convert"] |
