aboutsummaryrefslogtreecommitdiff
path: root/fietsboek/convert.py
diff options
context:
space:
mode:
Diffstat (limited to 'fietsboek/convert.py')
-rw-r--r--fietsboek/convert.py126
1 files changed, 112 insertions, 14 deletions
diff --git a/fietsboek/convert.py b/fietsboek/convert.py
index d3bfb22..5d7b43e 100644
--- a/fietsboek/convert.py
+++ b/fietsboek/convert.py
@@ -1,11 +1,27 @@
"""Conversion functions to convert between various recording formats."""
+import datetime
+
import fitparse
-from gpxpy.gpx import GPX, GPXTrack, GPXTrackPoint, GPXTrackSegment
+import gpxpy
+
+from . import geo, util
+from .models import Track, Waypoint
FIT_RECORD_FIELDS = ["position_lat", "position_long", "altitude", "timestamp"]
+class ConversionError(Exception):
+ """Error that occurred when loading a track from a file."""
+
+
+class UnknownFormat(ConversionError):
+ """The format of the source file could not be identified."""
+
+ def __str__(self):
+ return type(self).__doc__
+
+
def semicircles_to_deg(circles: int) -> float:
"""Convert semicircles coordinate to degree coordinate.
@@ -15,8 +31,8 @@ def semicircles_to_deg(circles: int) -> float:
return circles * (180 / 2**31)
-def from_fit(data: bytes) -> GPX:
- """Reads a .fit as GPX data.
+def from_fit(data: bytes) -> Track:
+ """Reads a .fit as track data.
This uses the fitparse_ library under the hood.
@@ -24,32 +40,111 @@ def from_fit(data: bytes) -> GPX:
:param data: The input bytes.
:return: The converted structure.
+ :raises ConversionError: If conversion failed.
"""
fitfile = fitparse.FitFile(data)
+ start_time = None
points = []
for record in fitfile.get_messages("record"):
values = record.get_values()
try:
if any(values[field] is None for field in FIT_RECORD_FIELDS):
continue
- point = GPXTrackPoint(
+ time = values["timestamp"]
+ if start_time is None:
+ start_time = time
+ point = geo.Point(
latitude=semicircles_to_deg(values["position_lat"]),
longitude=semicircles_to_deg(values["position_long"]),
elevation=values["altitude"],
- time=values["timestamp"],
+ time_offset=(time - start_time).total_seconds(),
)
except KeyError:
pass
else:
points.append(point)
- track = GPXTrack()
- track.segments = [GPXTrackSegment(points)]
- gpx = GPX()
- gpx.tracks = [track]
- return gpx
+ path = geo.Path(points)
+ track = Track()
+ track.set_path(path)
+ track.date = start_time
+ return track
-def smart_convert(data: bytes) -> bytes:
+def from_gpx(data: bytes) -> Track:
+ """Reads a .gpx as track data.
+
+ This uses the gpxpy_ library under the hood.
+
+ .. _gpxpy: https://github.com/tkrajina/gpxpy
+
+ :param data: The input bytes.
+ :return: The converted structure.
+ :raises ConversionError: If conversion failed.
+ """
+ # pylint: disable=too-many-locals
+ gpx = gpxpy.parse(data)
+ points = []
+ start_time = None
+
+ for track in gpx.tracks:
+ for segment in track.segments:
+ for point in segment.points:
+ if start_time is None:
+ start_time = point.time
+
+ if point.time is not None and start_time is not None:
+ time_offset = (point.time - start_time).total_seconds()
+ else:
+ time_offset = 0
+ points.append(
+ geo.Point(
+ longitude=point.longitude,
+ latitude=point.latitude,
+ elevation=point.elevation or 0.0,
+ time_offset=time_offset,
+ )
+ )
+
+ timezone = util.guess_gpx_timezone(gpx)
+ date = gpx.time or gpx.get_time_bounds().start_time or datetime.datetime.now()
+ date = date.astimezone(timezone)
+ track_name = gpx.name
+ track_desc = gpx.description
+ for track in gpx.tracks:
+ if not track_name and track.name:
+ track_name = track.name
+ if not track_desc and track.description:
+ track_desc = track.description
+
+ path = geo.Path(points)
+ track = Track()
+ track.set_path(path)
+ track.title = track_name
+ track.description = track_desc
+ track.date = date
+
+ for waypoint in gpx.waypoints:
+ desc = None
+ # GPX waypoints can have both description and comment. It seems like
+ # comment is what is usually used (GPXViewer only shows the comment),
+ # so we'll prioritize that.
+ if waypoint.comment:
+ desc = waypoint.comment
+ if not desc and waypoint.description:
+ desc = waypoint.description
+ wpt = Waypoint(
+ longitude=waypoint.longitude,
+ latitude=waypoint.latitude,
+ elevation=waypoint.elevation,
+ name=waypoint.name,
+ description=desc,
+ )
+ track.waypoints.append(wpt)
+
+ return track
+
+
+def smart_convert(data: bytes) -> Track:
"""Tries to be smart in converting the input bytes.
This function automatically applies the correct conversion if possible.
@@ -59,10 +154,13 @@ def smart_convert(data: bytes) -> bytes:
:param data: The input bytes.
:return: The converted content.
+ :raises ConversionError: When conversion fails.
"""
if len(data) > 11 and data[9:12] == b"FIT":
- return from_fit(data).to_xml().encode("utf-8")
- return data
+ return from_fit(data)
+ if data.startswith(b"<?xml") and b"<gpx" in data[:200]:
+ return from_gpx(data)
+ raise UnknownFormat()
-__all__ = ["from_fit", "smart_convert"]
+__all__ = ["ConversionError", "from_fit", "from_gpx", "smart_convert"]