1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
"""Various utility functions for testing."""
import datetime
import gzip
import io
from pathlib import Path
from typing import NamedTuple
from playwright.sync_api import Page
from sqlalchemy import Engine
from sqlalchemy.orm import Session
from fietsboek import convert, models, util
from fietsboek.data import DataManager
class PopulationIds(NamedTuple):
"""Collection of database IDs that :func:`populate` returns."""
jon: int
davos: int
winterfell: int
riverrun: int
def load_test_asset(filename: str) -> bytes:
"""Load a test asset.
Unlike :func:`load_gpx_asset`, this function does not do gzip decompression.
:param filkename: Name of the asset to load.
:return: The content of the file as bytes.
"""
asset_dir = Path(__file__).parent / "assets"
return (asset_dir / filename).read_bytes()
def load_gpx_asset(filename: str) -> bytes:
"""Load a GPX test asset.
This looks in the tests/assets/ folder, reads and unzips the file and
returns its contents.
:param filename: Name of the asset to load.
:return: The content of the asset as bytes.
"""
asset_dir = Path(__file__).parent / 'assets'
test_file = asset_dir / filename
with gzip.open(test_file, 'rb') as fobj:
return fobj.read()
def extract_and_upload(page: Page, filename: str, tmp_path: Path):
"""Extracts the given test asset, fills in the upload form and presses
upload.
:param page: The playwright page on which to execute the actions.
:param filename: The filename.
:param tmp_path: The temporary path (as given by pytest).
"""
gpx_data = load_gpx_asset(filename)
gpx_path = tmp_path / "Upload.gpx"
with open(gpx_path, "wb") as gpx_fobj:
gpx_fobj.write(gpx_data)
page.get_by_label("GPX file").set_input_files(gpx_path)
page.locator(".bi-upload").click()
def populate(dbengine: Engine, data_manager: DataManager) -> PopulationIds:
"""Populates the database and data directory with some test data.
This adds:
Jon (jon.snow@nw.org, admin, password: ygritte)
Trip around Winterfell (2 images)
Road to Riverrun
Davos (davos@seaworth.com, password: 123456)
:return: An object carrying the database IDs of the added objects.
"""
user_ids = []
with Session(dbengine) as session:
user = models.User(name="Jon", email="jon.snow@nw.org", is_verified=True, is_admin=True)
user.set_password("ygritte")
user.roll_session_secret()
session.add(user)
session.flush()
user_ids.append(user.id)
user = models.User(name="Davos", email="davos@seaworth.com", is_verified=True)
user.set_password("123456")
user.roll_session_secret()
session.add(user)
session.flush()
user_ids.append(user.id)
session.commit()
for user_id in user_ids:
data_manager.initialize_user(user_id)
track_ids = []
# First track for Jon:
gpx_data = load_gpx_asset("Teasi_1.gpx.gz")
track = convert.smart_convert(gpx_data)
path = track.path()
track.points = []
track.owner_id = user_ids[0]
track.title = "Trip around Winterfell"
track.visibility = models.track.Visibility.PUBLIC
track.type = models.track.TrackType.ORGANIC
track.description = "I took my sister for a quick trip around Winterfell"
track.badges = []
track.link_secret = util.random_link_secret()
track.tagged_people = []
track.date = datetime.datetime(1984, 1, 2, 10, 11, tzinfo=datetime.UTC)
track.transformers = []
track.sync_tags({"westeros"})
with Session(dbengine) as session:
session.add(track)
session.flush()
assert track.id is not None
track_ids.append(track.id)
manager = data_manager.initialize(track.id)
manager.compress_backup(gpx_data)
track.fast_set_path(path)
track.ensure_cache(path)
session.add(track.cache)
image_name = manager.add_image(io.BytesIO(load_test_asset("picture01.jpg")), "PIC001.jpg")
image_meta = models.ImageMetadata(track=track, image_name=image_name)
image_meta.description = "Beautiful sight out of the Hunter's Gate"
session.add(image_meta)
image_name = manager.add_image(io.BytesIO(load_test_asset("picture02.jpg")), "PIC002.jpg")
image_meta = models.ImageMetadata(track=track, image_name=image_name)
image_meta.description = "Our steel wire horses ready to gallop"
session.add(image_meta)
session.commit()
# Second track for Jon:
gpx_data = load_gpx_asset("MyTourbook_1.gpx.gz")
track = convert.smart_convert(gpx_data)
path = track.path()
track.points = []
track.owner_id = user_ids[0]
track.title = "Road to Riverrun"
track.visibility = models.track.Visibility.PUBLIC
track.type = models.track.TrackType.ORGANIC
track.description = "Got a wedding to attend!"
track.badges = []
track.link_secret = util.random_link_secret()
track.tagged_people = []
track.date = datetime.datetime(1985, 8, 5, 16, 41, tzinfo=datetime.UTC)
track.transformers = []
track.sync_tags({"westeros"})
with Session(dbengine) as session:
session.add(track)
session.flush()
assert track.id is not None
track_ids.append(track.id)
manager = data_manager.initialize(track.id)
manager.compress_backup(gpx_data)
track.fast_set_path(path)
track.ensure_cache(path)
session.add(track.cache)
session.commit()
return PopulationIds(
jon=user_ids[0],
davos=user_ids[1],
winterfell=track_ids[0],
riverrun=track_ids[1],
)
|