aboutsummaryrefslogtreecommitdiff
path: root/fietsboek/convert.py
blob: 7815ef4dc3ed5cca37ce93f9d50ef53200afdbb6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Conversion functions to convert between various recording formats."""
import datetime
from typing import Optional

import fitparse
import gpxpy

from . import geo, util
from .models import Track, Waypoint

# Fields that a FIT "record" message must carry (present and not ``None``)
# for ``from_fit`` to turn the message into a track point.
FIT_RECORD_FIELDS = ["position_lat", "position_long", "altitude", "timestamp"]


def semicircles_to_deg(circles: int) -> float:
    """Translate a coordinate given in semicircles into degrees.

    The scale is linear: ``2**31`` semicircles correspond to 180 degrees.

    :param circles: The coordinate value in semicircles.
    :return: The same coordinate expressed in degrees.
    """
    degrees_per_semicircle = 180 / 2**31
    return circles * degrees_per_semicircle


def from_fit(data: bytes) -> Track:
    """Build a track from the contents of a .fit file.

    Parsing is delegated to the fitparse_ library. Only ``record`` messages
    are looked at, and a record contributes a point only if every field named
    in ``FIT_RECORD_FIELDS`` is present and not ``None``; any other record is
    skipped silently.

    The timestamp of the first usable record becomes the date of the track
    (it stays ``None`` if there is no usable record), and the time offset of
    every point is the number of seconds elapsed since that timestamp.

    .. _fitparse: https://pypi.org/project/fitparse/

    :param data: The input bytes.
    :return: The converted structure.
    """
    parsed = fitparse.FitFile(data)
    first_timestamp = None
    collected = []
    for message in parsed.get_messages("record"):
        fields = message.get_values()
        try:
            # Indexing (rather than .get) is intentional: a missing field
            # raises KeyError, which drops the record just like a None value.
            if any(fields[name] is None for name in FIT_RECORD_FIELDS):
                continue
            stamp = fields["timestamp"]
            if first_timestamp is None:
                first_timestamp = stamp
            fix = geo.Point(
                latitude=semicircles_to_deg(fields["position_lat"]),
                longitude=semicircles_to_deg(fields["position_long"]),
                elevation=fields["altitude"],
                time_offset=(stamp - first_timestamp).total_seconds(),
            )
        except KeyError:
            continue
        collected.append(fix)

    route = geo.Path(collected)
    result = Track()
    result.set_path(route)
    result.date = first_timestamp
    return result


def from_gpx(data: bytes) -> Track:
    """Build a track from the contents of a .gpx file.

    Parsing is delegated to the gpxpy_ library. The points of all segments of
    all tracks are joined into a single path. The time offset of a point is
    the number of seconds since the first point that has a timestamp; points
    without a timestamp get an offset of 0.

    The date of the track is the time of the GPX file, falling back to the
    start of its time bounds and finally to the current time, converted to
    the timezone guessed by ``util.guess_gpx_timezone``. Title and
    description are taken from the GPX file itself, falling back to the first
    contained track that provides one. Waypoints of the GPX file are attached
    to the returned track.

    .. _gpxpy: https://github.com/tkrajina/gpxpy

    :param data: The input bytes.
    :return: The converted structure.
    """
    parsed = gpxpy.parse(data)
    first_stamp = None
    collected = []

    every_point = (
        pt
        for trk in parsed.tracks
        for seg in trk.segments
        for pt in seg.points
    )
    for pt in every_point:
        stamp = pt.time
        # Keep looking for a reference time until a point provides one.
        if first_stamp is None:
            first_stamp = stamp

        if stamp is None or first_stamp is None:
            elapsed = 0
        else:
            elapsed = (stamp - first_stamp).total_seconds()
        collected.append(geo.Point(
            longitude=pt.longitude,
            latitude=pt.latitude,
            elevation=pt.elevation,
            time_offset=elapsed,
        ))

    zone = util.guess_gpx_timezone(parsed)
    recorded = parsed.time or parsed.get_time_bounds().start_time or datetime.datetime.now()
    recorded = recorded.astimezone(zone)

    # File-level metadata wins; otherwise use the first track that has a value.
    title = parsed.name
    if not title:
        title = next((trk.name for trk in parsed.tracks if trk.name), title)
    summary = parsed.description
    if not summary:
        summary = next(
            (trk.description for trk in parsed.tracks if trk.description),
            summary,
        )

    route = geo.Path(collected)
    result = Track()
    result.set_path(route)
    result.title = title
    result.description = summary
    result.date = recorded

    for marker in parsed.waypoints:
        # GPX waypoints can have both description and comment. It seems like
        # comment is what is usually used (GPXViewer only shows the comment),
        # so the comment takes priority over the description.
        note = marker.comment or marker.description or None
        result.waypoints.append(Waypoint(
            longitude=marker.longitude,
            latitude=marker.latitude,
            elevation=marker.elevation,
            name=marker.name,
            description=note,
        ))

    return result


def smart_convert(data: bytes) -> Optional[Track]:
    """Guess the format of the input bytes and convert them to a track.

    The format is detected from the leading bytes only:

    * Input with the ASCII marker ``FIT`` at byte offsets 9 to 11 is handed
      to :func:`from_fit`.
    * Input that starts with ``<?xml`` and contains ``<gpx`` within its first
      200 bytes is handed to :func:`from_gpx`.

    :param data: The input bytes.
    :return: The converted track, or ``None`` if the format is not recognized.
    """
    is_fit = len(data) >= 12 and data[9:12] == b"FIT"
    if is_fit:
        return from_fit(data)

    is_gpx = data.startswith(b"<?xml") and b"<gpx" in data[:200]
    if is_gpx:
        return from_gpx(data)

    return None


# Names exported by a star-import of this module.
__all__ = ["from_fit", "from_gpx", "smart_convert"]