diff options
36 files changed, 908 insertions, 691 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index cb3c9ef..e500b7a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -17,7 +17,7 @@ cache: before_script: - python --version # For debugging - - pip install tox + - pip install 'tox<4' test: script: @@ -30,8 +30,12 @@ test-pypy: lint: script: - - tox -e pylint,flake8 + - tox -e pylint,flake8,black lint-tests: script: - tox -e pylint-tests + +mypy: + script: + - tox -e mypy diff --git a/.mypy.ini b/.mypy.ini new file mode 100644 index 0000000..ed220e3 --- /dev/null +++ b/.mypy.ini @@ -0,0 +1,17 @@ +[mypy] +follow_imports = silent +check_untyped_defs = True +allow_redefinition = True +exclude = fietsboek/updater/scripts/.+\.py + +[mypy-pyramid.*] +ignore_missing_imports = True + +[mypy-sqlalchemy.*] +ignore_missing_imports = True + +[mypy-webob.*] +ignore_missing_imports = True + +[mypy-zope.*] +ignore_missing_imports = True diff --git a/fietsboek/__init__.py b/fietsboek/__init__.py index 8e82131..e648cfa 100644 --- a/fietsboek/__init__.py +++ b/fietsboek/__init__.py @@ -17,7 +17,7 @@ from .pages import Pages from . import jinja2 as mod_jinja2, config as mod_config -__VERSION__ = importlib_metadata.version('fietsboek') +__VERSION__ = importlib_metadata.version("fietsboek") def locale_negotiator(request): @@ -48,8 +48,7 @@ def locale_negotiator(request): def main(_global_config, **settings): - """ This function returns a Pyramid WSGI application. - """ + """This function returns a Pyramid WSGI application.""" parsed_config = mod_config.parse(settings) def data_manager(request): @@ -73,13 +72,13 @@ def main(_global_config, **settings): def pages(_request): return page_manager - my_session_factory = SignedCookieSessionFactory(parsed_config.derive_secret('sessions')) + my_session_factory = SignedCookieSessionFactory(parsed_config.derive_secret("sessions")) with Configurator(settings=settings) as config: - config.include('pyramid_jinja2') - config.include('.routes') - config.include('.models') + config.include("pyramid_jinja2") + config.include(".routes") + config.include(".models") config.scan() - config.add_translation_dirs('fietsboek:locale/') + config.add_translation_dirs("fietsboek:locale/") config.set_session_factory(my_session_factory) config.set_security_policy(SecurityPolicy()) config.set_csrf_storage_policy(CookieCSRFStoragePolicy()) @@ -91,9 +90,9 @@ def main(_global_config, **settings): config.add_request_method(config_, name="config", reify=True) jinja2_env = config.get_jinja2_environment() - jinja2_env.filters['format_decimal'] = mod_jinja2.filter_format_decimal - jinja2_env.filters['format_datetime'] = mod_jinja2.filter_format_datetime - jinja2_env.filters['local_datetime'] = mod_jinja2.filter_local_datetime - jinja2_env.globals['embed_tile_layers'] = mod_jinja2.global_embed_tile_layers + jinja2_env.filters["format_decimal"] = mod_jinja2.filter_format_decimal + jinja2_env.filters["format_datetime"] = mod_jinja2.filter_format_datetime + jinja2_env.filters["local_datetime"] = mod_jinja2.filter_local_datetime + jinja2_env.globals["embed_tile_layers"] = mod_jinja2.global_embed_tile_layers return config.make_wsgi_app() diff --git a/fietsboek/alembic/env.py b/fietsboek/alembic/env.py index ae79f02..644d98b 100644 --- a/fietsboek/alembic/env.py +++ b/fietsboek/alembic/env.py @@ -25,7 +25,7 @@ def run_migrations_offline(): script output. """ - context.configure(url=settings['sqlalchemy.url']) + context.configure(url=settings["sqlalchemy.url"]) with context.begin_transaction(): context.run_migrations() @@ -37,13 +37,10 @@ def run_migrations_online(): and associate a connection with the context. """ - engine = engine_from_config(settings, prefix='sqlalchemy.') + engine = engine_from_config(settings, prefix="sqlalchemy.") connection = engine.connect() - context.configure( - connection=connection, - target_metadata=target_metadata - ) + context.configure(connection=connection, target_metadata=target_metadata) try: with context.begin_transaction(): diff --git a/fietsboek/config.py b/fietsboek/config.py index 05fd4f6..da31061 100644 --- a/fietsboek/config.py +++ b/fietsboek/config.py @@ -21,7 +21,12 @@ from enum import Enum import pydantic from pydantic import ( - BaseModel, Field, AnyUrl, DirectoryPath, validator, SecretStr, + BaseModel, + Field, + AnyUrl, + DirectoryPath, + validator, + SecretStr, ) from pyramid import settings from termcolor import colored @@ -46,8 +51,14 @@ KNOWN_PYRAMID_SETTINGS = { } KNOWN_TILE_LAYERS = [ - "osm", "osmde", "satellite", "opentopo", "topplusopen", - "opensea", "cycling", "hiking", + "osm", + "osmde", + "satellite", + "opentopo", + "topplusopen", + "opensea", + "cycling", + "hiking", ] @@ -61,15 +72,16 @@ class ValidationError(Exception): self.errors = errors def __str__(self): - lines = [''] + lines = [""] for where, error in self.errors: - lines.append(colored(f'Error in {where}:', 'red')) + lines.append(colored(f"Error in {where}:", "red")) lines.append(str(error)) return "\n".join(lines) class LayerType(Enum): """Enum to distinguish base layers and overlay layers.""" + BASE = "base" OVERLAY = "overlay" @@ -80,6 +92,7 @@ class LayerAccess(Enum): Note that in the future, a finer-grained distinction might be possible. """ + PUBLIC = "public" RESTRICTED = "restricted" @@ -142,7 +155,7 @@ class Config(BaseModel): session_key: str """Session key.""" - available_locales: PyramidList = ["en", "de"] + available_locales: PyramidList = PyramidList(["en", "de"]) """Available locales.""" email_from: str = Field(alias="email.from") @@ -160,8 +173,9 @@ class Config(BaseModel): pages: PyramidList = Field([], alias="fietsboek.pages") """Custom pages.""" - default_tile_layers: PyramidList = Field(KNOWN_TILE_LAYERS, - alias="fietsboek.default_tile_layers") + default_tile_layers: PyramidList = Field( + KNOWN_TILE_LAYERS, alias="fietsboek.default_tile_layers" + ) """The subset of the default tile layers that should be enabled. By default, that's all of them. @@ -173,8 +187,7 @@ class Config(BaseModel): thunderforest_maps: PyramidList = Field([], alias="thunderforest.maps") """List of enabled Thunderforest maps.""" - thunderforest_access: LayerAccess = Field(LayerAccess.RESTRICTED, - alias="thunderforest.access") + thunderforest_access: LayerAccess = Field(LayerAccess.RESTRICTED, alias="thunderforest.access") """Thunderforest access restriction.""" disable_tile_proxy: bool = Field(False, alias="fietsboek.tile_proxy.disable") @@ -196,7 +209,7 @@ class Config(BaseModel): def _known_smtp_url(cls, value): """Ensures that the SMTP URL is valid.""" parsed = urllib.parse.urlparse(value) - if parsed.scheme not in {'debug', 'smtp', 'smtp+ssl', 'smtp+starttls'}: + if parsed.scheme not in {"debug", "smtp", "smtp+ssl", "smtp+starttls"}: raise ValueError(f"Unknown mailing scheme {parsed.scheme}".strip()) return value @@ -240,27 +253,27 @@ def parse(config): continue provider_id = match.group(1) - prefix = f'{value}.' - inner = {k[len(prefix):]: v for (k, v) in config.items() if k.startswith(prefix)} - inner['layer_id'] = provider_id - inner['name'] = value + prefix = f"{value}." + inner = {k[len(prefix) :]: v for (k, v) in config.items() if k.startswith(prefix)} + inner["layer_id"] = provider_id + inner["name"] = value try: layer_config = TileLayerConfig.parse_obj(inner) tile_layers.append(layer_config) except pydantic.ValidationError as validation_error: - errors.append((f'tile layer {provider_id}', validation_error)) + errors.append((f"tile layer {provider_id}", validation_error)) keys.discard(key) for field in TileLayerConfig.__fields__.values(): - keys.discard(f'{prefix}{_field_name(field)}') + keys.discard(f"{prefix}{_field_name(field)}") - config['tile_layers'] = tile_layers + config["tile_layers"] = tile_layers # Now we can parse the main config try: config = Config.parse_obj(config) except pydantic.ValidationError as validation_error: - errors.append(('configuration', validation_error)) + errors.append(("configuration", validation_error)) if errors: raise ValidationError(errors) @@ -276,7 +289,7 @@ def parse(config): def _field_name(field): - alias = getattr(field, 'alias', None) + alias = getattr(field, "alias", None) if alias: return alias return field.name diff --git a/fietsboek/data.py b/fietsboek/data.py index c1cd214..bd4222b 100644 --- a/fietsboek/data.py +++ b/fietsboek/data.py @@ -76,6 +76,7 @@ class DataManager: :param track_id: The ID of the track. :type track_id: int """ + def log_error(_, path, exc_info): LOGGER.warning("Failed to remove %s", path, exc_info=exc_info) @@ -132,7 +133,7 @@ class DataManager: """ # Be sure to not delete anything else than the image file image_id = secure_filename(image_id) - if '/' in image_id or '\\' in image_id: + if "/" in image_id or "\\" in image_id: return path = self.image_path(track_id, image_id) path.unlink() diff --git a/fietsboek/email.py b/fietsboek/email.py index 1ebb740..78b0493 100644 --- a/fietsboek/email.py +++ b/fietsboek/email.py @@ -25,12 +25,12 @@ def prepare_message(sender, addr_to, subject): :rtype: email.message.EmailMessage """ message = EmailMessage() - message['To'] = addr_to - if '<' not in sender and '>' not in sender: - message['From'] = f'Fietsboek <{sender}>' + message["To"] = addr_to + if "<" not in sender and ">" not in sender: + message["From"] = f"Fietsboek <{sender}>" else: - message['From'] = sender - message['Subject'] = subject + message["From"] = sender + message["Subject"] = subject return message @@ -49,15 +49,15 @@ def send_message(server_url, username, password, message): :type message: email.message.EmailMessage """ parsed_url = urlparse(server_url) - if parsed_url.scheme == 'debug': + if parsed_url.scheme == "debug": print(message, file=sys.stderr) return try: - if parsed_url.scheme == 'smtp': + if parsed_url.scheme == "smtp": client = smtplib.SMTP(parsed_url.hostname, parsed_url.port) - elif parsed_url.scheme == 'smtp+ssl': + elif parsed_url.scheme == "smtp+ssl": client = smtplib.SMTP_SSL(parsed_url.hostname, parsed_url.port) - elif parsed_url.scheme == 'smtp+starttls': + elif parsed_url.scheme == "smtp+starttls": client = smtplib.SMTP(parsed_url.hostname, parsed_url.port) client.starttls() if username and password: diff --git a/fietsboek/jinja2.py b/fietsboek/jinja2.py index e7ef522..6e5e7b6 100644 --- a/fietsboek/jinja2.py +++ b/fietsboek/jinja2.py @@ -22,7 +22,7 @@ def filter_format_decimal(ctx, value): :return: The formatted decimal. :rtype: str """ - request = ctx.get('request') + request = ctx.get("request") locale = request.localizer.locale_name return format_decimal(value, locale=locale) @@ -38,7 +38,7 @@ def filter_format_datetime(ctx, value): :return: The formatted date. :rtype: str """ - request = ctx.get('request') + request = ctx.get("request") locale = request.localizer.locale_name return format_datetime(value, locale=locale) @@ -69,7 +69,7 @@ def filter_local_datetime(ctx, value): else: value = value.astimezone(datetime.timezone.utc) - request = ctx.get('request') + request = ctx.get("request") locale = request.localizer.locale_name fallback = Markup.escape(format_datetime(value, locale=locale)) @@ -93,24 +93,34 @@ def global_embed_tile_layers(request): """ # pylint: disable=import-outside-toplevel,cyclic-import from .views import tileproxy + tile_sources = tileproxy.sources_for(request) if request.config.disable_tile_proxy: + def _url(source): return source.url_template + else: + def _url(source): - return (request.route_url("tile-proxy", provider=source.key, x="{x}", y="{y}", z="{z}") - .replace("%7Bx%7D", "{x}") - .replace("%7By%7D", "{y}") - .replace("%7Bz%7D", "{z}")) - - return Markup(json.dumps([ - { - "name": source.name, - "url": _url(source), - "attribution": source.attribution, - "type": source.layer_type.value, - } - for source in tile_sources - ])) + return ( + request.route_url("tile-proxy", provider=source.key, x="{x}", y="{y}", z="{z}") + .replace("%7Bx%7D", "{x}") + .replace("%7By%7D", "{y}") + .replace("%7Bz%7D", "{z}") + ) + + return Markup( + json.dumps( + [ + { + "name": source.name, + "url": _url(source), + "attribution": source.attribution, + "type": source.layer_type.value, + } + for source in tile_sources + ] + ) + ) diff --git a/fietsboek/models/__init__.py b/fietsboek/models/__init__.py index 53feb22..828b689 100644 --- a/fietsboek/models/__init__.py +++ b/fietsboek/models/__init__.py @@ -21,7 +21,7 @@ from .image import ImageMetadata # flake8: noqa configure_mappers() -def get_engine(settings, prefix='sqlalchemy.'): +def get_engine(settings, prefix="sqlalchemy."): """Create an SQL Engine from the given settings.""" return engine_from_config(settings, prefix) @@ -89,9 +89,7 @@ def get_tm_session(session_factory, transaction_manager, request=None): request = dbsession.info["request"] """ dbsession = session_factory(info={"request": request}) - zope.sqlalchemy.register( - dbsession, transaction_manager=transaction_manager - ) + zope.sqlalchemy.register(dbsession, transaction_manager=transaction_manager) return dbsession @@ -103,7 +101,7 @@ def includeme(config): """ settings = config.get_settings() - settings['tm.manager_hook'] = 'pyramid_tm.explicit_manager' + settings["tm.manager_hook"] = "pyramid_tm.explicit_manager" # Use ``pyramid_tm`` to hook the transaction lifecycle to the request. # Note: the packages ``pyramid_tm`` and ``transaction`` work together to @@ -111,28 +109,26 @@ def includeme(config): # If your project migrates away from ``pyramid_tm``, you may need to use a # Pyramid callback function to close the database session after each # request. - config.include('pyramid_tm') + config.include("pyramid_tm") # use pyramid_retry to retry a request when transient exceptions occur - config.include('pyramid_retry') + config.include("pyramid_retry") # hook to share the dbengine fixture in testing - dbengine = settings.get('dbengine') + dbengine = settings.get("dbengine") if not dbengine: dbengine = get_engine(settings) session_factory = get_session_factory(dbengine) - config.registry['dbsession_factory'] = session_factory + config.registry["dbsession_factory"] = session_factory # make request.dbsession available for use in Pyramid def dbsession(request): # hook to share the dbsession fixture in testing - dbsession = request.environ.get('app.dbsession') + dbsession = request.environ.get("app.dbsession") if dbsession is None: # request.tm is the transaction manager used by pyramid_tm - dbsession = get_tm_session( - session_factory, request.tm, request=request - ) + dbsession = get_tm_session(session_factory, request.tm, request=request) return dbsession config.add_request_method(dbsession, reify=True) diff --git a/fietsboek/models/badge.py b/fietsboek/models/badge.py index 3bbe714..f16e9bf 100644 --- a/fietsboek/models/badge.py +++ b/fietsboek/models/badge.py @@ -27,13 +27,14 @@ class Badge(Base): :ivar tracks: Tracks associated with this badge. :vartype tracks: list[fietsboek.models.track.Track] """ + # pylint: disable=too-few-public-methods - __tablename__ = 'badges' + __tablename__ = "badges" id = Column(Integer, primary_key=True) title = Column(Text) image = Column(LargeBinary) - tracks = relationship('Track', secondary='track_badge_assoc', back_populates='badges') + tracks = relationship("Track", secondary="track_badge_assoc", back_populates="badges") @classmethod def factory(cls, request): diff --git a/fietsboek/models/comment.py b/fietsboek/models/comment.py index 23f1871..386dfce 100644 --- a/fietsboek/models/comment.py +++ b/fietsboek/models/comment.py @@ -31,6 +31,7 @@ class Comment(Base): :ivar track: Track that the comment belongs to. :vartype track: fietsboek.model.track.Track """ + # pylint: disable=too-few-public-methods __tablename__ = "comments" id = Column(Integer, primary_key=True) @@ -40,5 +41,5 @@ class Comment(Base): title = Column(Text) text = Column(Text) - author = relationship('User', back_populates='comments') - track = relationship('Track', back_populates='comments') + author = relationship("User", back_populates="comments") + track = relationship("Track", back_populates="comments") diff --git a/fietsboek/models/image.py b/fietsboek/models/image.py index 4037619..cf507ec 100644 --- a/fietsboek/models/image.py +++ b/fietsboek/models/image.py @@ -30,6 +30,7 @@ class ImageMetadata(Base): :ivar track: The track that this image belongs to. :vartype track: fietsboek.models.track.Track """ + # pylint: disable=too-few-public-methods __tablename__ = "image_metadata" id = Column(Integer, primary_key=True) @@ -37,9 +38,9 @@ class ImageMetadata(Base): image_name = Column(Text, nullable=False) description = Column(Text) - track = relationship('Track', back_populates='images') + track = relationship("Track", back_populates="images") - __table_args__ = (UniqueConstraint('track_id', 'image_name'),) + __table_args__ = (UniqueConstraint("track_id", "image_name"),) @classmethod def get_or_create(cls, dbsession, track, image_name): diff --git a/fietsboek/models/meta.py b/fietsboek/models/meta.py index 87c82b2..6b11a09 100644 --- a/fietsboek/models/meta.py +++ b/fietsboek/models/meta.py @@ -10,7 +10,7 @@ NAMING_CONVENTION = { "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_%(constraint_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", - "pk": "pk_%(table_name)s" + "pk": "pk_%(table_name)s", } metadata = MetaData(naming_convention=NAMING_CONVENTION) diff --git a/fietsboek/models/track.py b/fietsboek/models/track.py index ce7b4d0..9eeae55 100644 --- a/fietsboek/models/track.py +++ b/fietsboek/models/track.py @@ -35,7 +35,12 @@ from sqlalchemy.orm import relationship from pyramid.httpexceptions import HTTPNotFound from pyramid.i18n import TranslationString as _ from pyramid.authorization import ( - Allow, Everyone, Authenticated, ALL_PERMISSIONS, ACLHelper, ACLAllowed, + Allow, + Everyone, + Authenticated, + ALL_PERMISSIONS, + ACLHelper, + ACLAllowed, ) from markupsafe import Markup @@ -58,12 +63,13 @@ class Tag(Base): :ivar track: The track object that this tag belongs to. :vartype track: Track """ + # pylint: disable=too-few-public-methods - __tablename__ = 'tags' + __tablename__ = "tags" track_id = Column(Integer, ForeignKey("tracks.id"), primary_key=True) tag = Column(Text, primary_key=True) - track = relationship('Track', back_populates='tags') + track = relationship("Track", back_populates="tags") class Visibility(enum.Enum): @@ -72,6 +78,7 @@ class Visibility(enum.Enum): Note that the track is always visible to tagged people and via the sharing link. """ + PRIVATE = enum.auto() """Only the owner of the track can see it.""" FRIENDS = enum.auto() @@ -178,9 +185,10 @@ class Track(Base): :ivar images: Metadata of the images saved for this track. :vartype images: list[fietsboek.models.image.ImageMetadata] """ - __tablename__ = 'tracks' + + __tablename__ = "tracks" id = Column(Integer, primary_key=True) - owner_id = Column(Integer, ForeignKey('users.id')) + owner_id = Column(Integer, ForeignKey("users.id")) title = Column(Text) description = Column(Text) date_raw = Column(DateTime) @@ -190,15 +198,17 @@ class Track(Base): link_secret = Column(Text) type = Column(Enum(TrackType)) - owner = relationship('User', back_populates='tracks') - cache = relationship('TrackCache', back_populates='track', uselist=False, - cascade="all, delete-orphan") - tagged_people = relationship('User', secondary=track_people_assoc, - back_populates='tagged_tracks') - badges = relationship('Badge', secondary=track_badge_assoc, back_populates='tracks') - tags = relationship('Tag', back_populates='track', cascade="all, delete-orphan") - comments = relationship('Comment', back_populates='track', cascade="all, delete-orphan") - images = relationship('ImageMetadata', back_populates='track', cascade="all, delete-orphan") + owner = relationship("User", back_populates="tracks") + cache = relationship( + "TrackCache", back_populates="track", uselist=False, cascade="all, delete-orphan" + ) + tagged_people = relationship( + "User", secondary=track_people_assoc, back_populates="tagged_tracks" + ) + badges = relationship("Badge", secondary=track_badge_assoc, back_populates="tracks") + tags = relationship("Tag", back_populates="track", cascade="all, delete-orphan") + comments = relationship("Comment", back_populates="track", cascade="all, delete-orphan") + images = relationship("ImageMetadata", back_populates="track", cascade="all, delete-orphan") @classmethod def factory(cls, request): @@ -214,7 +224,7 @@ class Track(Base): :return: The track. :type: Track """ - track_id = request.matchdict['track_id'] + track_id = request.matchdict["track_id"] query = select(cls).filter_by(id=track_id) track = request.dbsession.execute(query).scalar_one_or_none() if track is None: @@ -224,24 +234,27 @@ class Track(Base): def __acl__(self): # Basic ACL: Permissions for the admin, the owner and the share link acl = [ - (Allow, 'group:admins', ALL_PERMISSIONS), - (Allow, f'user:{self.owner_id}', - ['track.view', 'track.edit', 'track.unshare', 'track.comment']), - (Allow, f'secret:{self.link_secret}', 'track.view'), + (Allow, "group:admins", ALL_PERMISSIONS), + ( + Allow, + f"user:{self.owner_id}", + ["track.view", "track.edit", "track.unshare", "track.comment"], + ), + (Allow, f"secret:{self.link_secret}", "track.view"), ] # Tagged people may always see the track for tagged in self.tagged_people: - acl.append((Allow, f'user:{tagged.id}', ['track.view', 'track.comment'])) + acl.append((Allow, f"user:{tagged.id}", ["track.view", "track.comment"])) if self.visibility == Visibility.PUBLIC: - acl.append((Allow, Everyone, 'track.view')) - acl.append((Allow, Authenticated, 'track.comment')) + acl.append((Allow, Everyone, "track.view")) + acl.append((Allow, Authenticated, "track.comment")) elif self.visibility == Visibility.LOGGED_IN: - acl.append((Allow, Authenticated, ['track.view', 'track.comment'])) + acl.append((Allow, Authenticated, ["track.view", "track.comment"])) elif self.visibility == Visibility.FRIENDS: acl.extend( - (Allow, f'user:{friend.id}', ['track.view', 'track.comment']) + (Allow, f"user:{friend.id}", ["track.view", "track.comment"]) for friend in self.owner.get_friends() ) elif self.visibility == Visibility.FRIENDS_TAGGED: @@ -251,7 +264,7 @@ class Track(Base): for friend in person.get_friends() ) acl.extend( - (Allow, f'user:{friend.id}', ['track.view', 'track.comment']) + (Allow, f"user:{friend.id}", ["track.view", "track.comment"]) for friend in all_friends ) return acl @@ -294,8 +307,11 @@ class Track(Base): @date.setter def date(self, value): if value.tzinfo is None: - LOGGER.debug('Non-aware datetime passed (track_id=%d, value=%s), assuming offset=0', - self.id or -1, value) + LOGGER.debug( + "Non-aware datetime passed (track_id=%d, value=%s), assuming offset=0", + self.id or -1, + value, + ) self.date_tz = 0 else: self.date_tz = value.tzinfo.utcoffset(value).total_seconds() // 60 @@ -325,7 +341,7 @@ class Track(Base): if user: principals.append(Authenticated) principals.extend(user.principals()) - result = ACLHelper().permits(self, principals, 'track.view') + result = ACLHelper().permits(self, principals, "track.view") return isinstance(result, ACLAllowed) def ensure_cache(self): @@ -519,19 +535,24 @@ class Track(Base): :return: The generated HTML. :rtype: Markup """ + def number(num): return format_decimal(num, locale=localizer.locale_name) rows = [ - (_("tooltip.table.length"), f'{number(round(self.length / 1000, 2))} km'), - (_("tooltip.table.uphill"), f'{number(round(self.uphill, 2))} m'), - (_("tooltip.table.downhill"), f'{number(round(self.downhill, 2))} m'), - (_("tooltip.table.moving_time"), f'{self.moving_time}'), - (_("tooltip.table.stopped_time"), f'{self.stopped_time}'), - (_("tooltip.table.max_speed"), - f'{number(round(util.mps_to_kph(self.max_speed), 2))} km/h'), - (_("tooltip.table.avg_speed"), - f'{number(round(util.mps_to_kph(self.avg_speed), 2))} km/h'), + (_("tooltip.table.length"), f"{number(round(self.length / 1000, 2))} km"), + (_("tooltip.table.uphill"), f"{number(round(self.uphill, 2))} m"), + (_("tooltip.table.downhill"), f"{number(round(self.downhill, 2))} m"), + (_("tooltip.table.moving_time"), f"{self.moving_time}"), + (_("tooltip.table.stopped_time"), f"{self.stopped_time}"), + ( + _("tooltip.table.max_speed"), + f"{number(round(util.mps_to_kph(self.max_speed), 2))} km/h", + ), + ( + _("tooltip.table.avg_speed"), + f"{number(round(util.mps_to_kph(self.avg_speed), 2))} km/h", + ), ] rows = [ f"<tr><td>{localizer.translate(name)}</td><td>{value}</td></tr>" @@ -574,9 +595,10 @@ class TrackCache(Base): :ivar track: The track that belongs to this cache entry. :vartype track: Track """ + # pylint: disable=too-many-instance-attributes,too-few-public-methods - __tablename__ = 'track_cache' - track_id = Column(Integer, ForeignKey('tracks.id'), primary_key=True) + __tablename__ = "track_cache" + track_id = Column(Integer, ForeignKey("tracks.id"), primary_key=True) length = Column(Float) uphill = Column(Float) downhill = Column(Float) @@ -589,7 +611,7 @@ class TrackCache(Base): end_time_raw = Column(DateTime) end_time_tz = Column(Integer) - track = relationship('Track', back_populates='cache') + track = relationship("Track", back_populates="cache") @property def start_time(self): @@ -607,8 +629,11 @@ class TrackCache(Base): @start_time.setter def start_time(self, value): if value.tzinfo is None: - LOGGER.debug('Non-aware datetime passed (cache_id=%d, value=%s), assuming offset=0', - self.id or -1, value) + LOGGER.debug( + "Non-aware datetime passed (cache_id=%d, value=%s), assuming offset=0", + self.id or -1, + value, + ) self.start_time_tz = 0 else: self.start_time_tz = value.tzinfo.utcoffset(value).total_seconds() // 60 @@ -630,8 +655,11 @@ class TrackCache(Base): @end_time.setter def end_time(self, value): if value.tzinfo is None: - LOGGER.debug('Non-aware datetime passed (cache_id=%d, value=%s), assuming offset=0', - self.id or -1, value) + LOGGER.debug( + "Non-aware datetime passed (cache_id=%d, value=%s), assuming offset=0", + self.id or -1, + value, + ) self.end_time_tz = 0 else: self.end_time_tz = value.tzinfo.utcoffset(value).total_seconds() // 60 @@ -655,14 +683,15 @@ class Upload(Base): :ivar owner: Uploader of this track. :vartype owner: fietsboek.model.user.User """ + # pylint: disable=too-many-instance-attributes,too-few-public-methods - __tablename__ = 'uploads' + __tablename__ = "uploads" id = Column(Integer, primary_key=True) uploaded_at = Column(DateTime) - owner_id = Column(Integer, ForeignKey('users.id')) + owner_id = Column(Integer, ForeignKey("users.id")) gpx = Column(LargeBinary) - owner = relationship('User', back_populates='uploads') + owner = relationship("User", back_populates="uploads") @classmethod def factory(cls, request): @@ -678,7 +707,7 @@ class Upload(Base): :return: The upload. :type: Track """ - query = select(cls).filter_by(id=request.matchdict['upload_id']) + query = select(cls).filter_by(id=request.matchdict["upload_id"]) upload = request.dbsession.execute(query).scalar_one_or_none() if upload is None: raise HTTPNotFound() @@ -686,8 +715,8 @@ class Upload(Base): def __acl__(self): return [ - (Allow, 'group:admins', ALL_PERMISSIONS), - (Allow, f'user:{self.owner_id}', 'upload.finish'), + (Allow, "group:admins", ALL_PERMISSIONS), + (Allow, f"user:{self.owner_id}", "upload.finish"), ] @property diff --git a/fietsboek/models/user.py b/fietsboek/models/user.py index 3a267d8..0fe7877 100644 --- a/fietsboek/models/user.py +++ b/fietsboek/models/user.py @@ -41,10 +41,10 @@ class PasswordMismatch(Exception): # The parameters were chosen according to the documentation in # https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#cryptography.hazmat.primitives.kdf.scrypt.Scrypt SCRYPT_PARAMETERS = { - 'length': 32, - 'n': 2**14, - 'r': 8, - 'p': 1, + "length": 32, + "n": 2**14, + "r": 8, + "p": 1, } SALT_LENGTH = 32 @@ -87,7 +87,8 @@ class User(Base): :ivar comments: List of comments left by this user. :vartype comments: list[fietsboek.model.comment.Comment] """ - __tablename__ = 'users' + + __tablename__ = "users" id = Column(Integer, primary_key=True) name = Column(Text) password = Column(LargeBinary) @@ -96,18 +97,27 @@ class User(Base): is_admin = Column(Boolean, default=False) is_verified = Column(Boolean, default=False) - tracks = relationship('Track', back_populates='owner', cascade="all, delete-orphan") - tagged_tracks = relationship('Track', secondary='track_people_assoc', - back_populates='tagged_people') - uploads = relationship('Upload', back_populates='owner', cascade="all, delete-orphan") - tokens = relationship('Token', back_populates='user', cascade="all, delete-orphan") - comments = relationship('Comment', back_populates='author', cascade="all, delete-orphan") + tracks = relationship("Track", back_populates="owner", cascade="all, delete-orphan") + tagged_tracks = relationship( + "Track", secondary="track_people_assoc", back_populates="tagged_people" + ) + uploads = relationship("Upload", back_populates="owner", cascade="all, delete-orphan") + tokens = relationship("Token", back_populates="user", cascade="all, delete-orphan") + comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan") # We don't use them, but include them to ensure our cascading works - friends_1 = relationship('User', secondary='friends_assoc', back_populates='friends_2', - foreign_keys=[friends_assoc.c.user_1_id]) - friends_2 = relationship('User', secondary='friends_assoc', back_populates='friends_1', - foreign_keys=[friends_assoc.c.user_2_id]) + friends_1 = relationship( + "User", + secondary="friends_assoc", + back_populates="friends_2", + foreign_keys=[friends_assoc.c.user_1_id], + ) + friends_2 = relationship( + "User", + secondary="friends_assoc", + back_populates="friends_1", + foreign_keys=[friends_assoc.c.user_2_id], + ) @classmethod def query_by_email(cls, email): @@ -131,7 +141,7 @@ class User(Base): :param new_password: The new password of the user. :type new_password: str """ - new_password = new_password.encode('utf-8') + new_password = new_password.encode("utf-8") salt = secrets.token_bytes(SALT_LENGTH) scrypt = Scrypt(salt=salt, **SCRYPT_PARAMETERS) password = scrypt.derive(new_password) @@ -148,7 +158,7 @@ class User(Base): :param password: The password to check. :type password: str """ - password = password.encode('utf-8') + password = password.encode("utf-8") scrypt = Scrypt(salt=self.salt, **SCRYPT_PARAMETERS) try: scrypt.verify(password, self.password) @@ -164,9 +174,9 @@ class User(Base): :return: The seceurity principals that this user fulfills. :rtype: list[str] """ - principals = [f'user:{self.id}'] + principals = [f"user:{self.id}"] if self.is_admin: - principals.append('group:admins') + principals.append("group:admins") return principals def all_tracks_query(self): @@ -191,6 +201,7 @@ class User(Base): # Late import to avoid cycles # pylint: disable=import-outside-toplevel from .track import Track + own = select(Track).where(with_parent(self, User.tracks)) friends = select(Track).where(with_parent(self, User.tagged_tracks)) # Create a fresh select so we can apply filter operations @@ -214,6 +225,7 @@ class User(Base): # Late import to avoid cycles # pylint: disable=import-outside-toplevel,protected-access from .track import Track, Visibility, track_people_assoc + # We build the list of visible tracks in multiple steps, and then union # them later. queries = [] @@ -233,8 +245,9 @@ class User(Base): select(Track) # The owner also counts as a "tagged person", so we need to # include FRIENDS_TAGGED here as well. - .where(Track.visibility.in_([Visibility.FRIENDS, Visibility.FRIENDS_TAGGED])) - .where(Track.owner_id.in_(friend_ids)) + .where(Track.visibility.in_([Visibility.FRIENDS, Visibility.FRIENDS_TAGGED])).where( + Track.owner_id.in_(friend_ids) + ) ) # Step 5: Am I a friend of a tagged person? # We do this via a big join: @@ -261,12 +274,12 @@ class User(Base): return union(*queries) def _friend_query(self): - qry1 = (select(User) - .filter(friends_assoc.c.user_1_id == self.id, - friends_assoc.c.user_2_id == User.id)) - qry2 = (select(User) - .filter(friends_assoc.c.user_2_id == self.id, - friends_assoc.c.user_1_id == User.id)) + qry1 = select(User).filter( + friends_assoc.c.user_1_id == self.id, friends_assoc.c.user_2_id == User.id + ) + qry2 = select(User).filter( + friends_assoc.c.user_2_id == self.id, friends_assoc.c.user_1_id == User.id + ) return union(qry1, qry2) def get_friends(self): @@ -314,7 +327,7 @@ class User(Base): return reduce(lambda acc, track: acc | track.text_tags(), self.tracks, set()) -Index('idx_users_email', User.email, unique=True) +Index("idx_users_email", User.email, unique=True) class FriendRequest(Base): @@ -333,19 +346,22 @@ class FriendRequest(Base): :ivar recipient: Recipient of the friendship. :vartype recipient: User """ + # pylint: disable=too-few-public-methods - __tablename__ = 'friend_requests' + __tablename__ = "friend_requests" id = Column(Integer, primary_key=True) - sender_id = Column(Integer, ForeignKey('users.id')) - recipient_id = Column(Integer, ForeignKey('users.id')) + sender_id = Column(Integer, ForeignKey("users.id")) + recipient_id = Column(Integer, ForeignKey("users.id")) date = Column(DateTime) - sender = relationship('User', primaryjoin='User.id == FriendRequest.sender_id', - backref='outgoing_requests') - recipient = relationship('User', primaryjoin='User.id == FriendRequest.recipient_id', - backref='incoming_requests') + sender = relationship( + "User", primaryjoin="User.id == FriendRequest.sender_id", backref="outgoing_requests" + ) + recipient = relationship( + "User", primaryjoin="User.id == FriendRequest.recipient_id", backref="incoming_requests" + ) - __table_args__ = (UniqueConstraint('sender_id', 'recipient_id'),) + __table_args__ = (UniqueConstraint("sender_id", "recipient_id"),) class TokenType(enum.Enum): @@ -354,6 +370,7 @@ class TokenType(enum.Enum): A token can be used either to verify the user's email, or it can be used to reset the password. """ + VERIFY_EMAIL = enum.auto() """A token that can be used to verify a user's email.""" RESET_PASSWORD = enum.auto() @@ -383,15 +400,16 @@ class Token(Base): :ivar user: User that this token belongs to. :vartype user: User """ + # pylint: disable=too-few-public-methods __tablename__ = "tokens" id = Column(Integer, primary_key=True) - user_id = Column(Integer, ForeignKey('users.id')) + user_id = Column(Integer, ForeignKey("users.id")) uuid = Column(Text) token_type = Column(Enum(TokenType)) date = Column(DateTime) - user = relationship('User', back_populates='tokens') + user = relationship("User", back_populates="tokens") @classmethod def generate(cls, user, token_type): @@ -409,4 +427,4 @@ class Token(Base): return cls(user=user, uuid=token_uuid, date=now, token_type=token_type) -Index('idx_token_uuid', Token.uuid, unique=True) +Index("idx_token_uuid", Token.uuid, unique=True) diff --git a/fietsboek/pages.py b/fietsboek/pages.py index e94a493..f11b126 100644 --- a/fietsboek/pages.py +++ b/fietsboek/pages.py @@ -1,6 +1,7 @@ """Module containing logic to support "static" pages.""" import enum import re +from typing import List, Optional import markdown @@ -41,8 +42,16 @@ class Page: :vartype menu_index: int """ - def __init__(self, slug, title, content, link_name, locale_filter=None, - user_filter=UserFilter.EVERYONE, menu_index=0): + def __init__( + self, + slug, + title, + content, + link_name, + locale_filter=None, + user_filter=UserFilter.EVERYONE, + menu_index=0, + ): # pylint: disable=too-many-arguments self.slug = slug self.title = title @@ -68,8 +77,7 @@ class Page: if self.locale_filter is not None: return any( - lfilter.match(request.localizer.locale_name) - for lfilter in self.locale_filter + lfilter.match(request.localizer.locale_name) for lfilter in self.locale_filter ) return True @@ -91,36 +99,39 @@ class Page: parser = markdown.Markdown(extensions=["meta"]) content = parser.convert(text) - title = parser.Meta.get('title', [''])[0] + title = parser.Meta.get("title", [""])[0] # type: ignore if not title: raise PageException("Missing `title`") - link_name = parser.Meta.get('link-name', [''])[0] + link_name = parser.Meta.get("link-name", [""])[0] # type: ignore if not link_name: raise PageException("Missing `link-name`") - slug = parser.Meta.get('slug', [''])[0] + slug = parser.Meta.get("slug", [""])[0] # type: ignore if not slug: raise PageException("Missing `slug`") + locale_filter: Optional[List[re.Pattern]] try: - locale_filter = list(map(re.compile, parser.Meta.get('locale', []))) + locale_filter = list(map(re.compile, parser.Meta.get("locale", []))) # type: ignore except re.error as exc: raise PageException("Invalid locale regex") from exc if not locale_filter: locale_filter = None filter_map = { - 'logged-in': UserFilter.LOGGED_IN, - 'logged-out': UserFilter.LOGGED_OUT, - 'everyone': UserFilter.EVERYONE, + "logged-in": UserFilter.LOGGED_IN, + "logged-out": UserFilter.LOGGED_OUT, + "everyone": UserFilter.EVERYONE, } - user_filter = filter_map.get(parser.Meta.get('show-to', ['everyone'])[0].lower()) + user_filter = filter_map.get( + parser.Meta.get("show-to", ["everyone"])[0].lower() # type: ignore + ) if user_filter is None: raise PageException("Invalid `show-to` filter") try: - menu_index = int(parser.Meta.get('index', ['0'])[0]) + menu_index = int(parser.Meta.get("index", ["0"])[0]) # type: ignore except ValueError as exc: raise PageException("Invalid value for `index`") from exc @@ -195,10 +206,7 @@ class Pages: :return: A list of menu entries to show. :rtype: list[Page] """ - return [ - page for page in self.collection - if page.menu_index < 0 and page.matches(request) - ] + return [page for page in self.collection if page.menu_index < 0 and page.matches(request)] def post_menu_items(self, request): """Return all items that should appear after Fietsboek's main menu. @@ -208,7 +216,4 @@ class Pages: :return: A list of menu entries to show. :rtype: list[Page] """ - return [ - page for page in self.collection - if page.menu_index > 0 and page.matches(request) - ] + return [page for page in self.collection if page.menu_index > 0 and page.matches(request)] diff --git a/fietsboek/pshell.py b/fietsboek/pshell.py index cc6988f..0907e80 100644 --- a/fietsboek/pshell.py +++ b/fietsboek/pshell.py @@ -8,12 +8,12 @@ def setup(env): :param env: The environment to set up. :type env: pyramid.scripting.AppEnvironment """ - request = env['request'] + request = env["request"] # start a transaction request.tm.begin() # inject some vars into the shell builtins - env['tm'] = request.tm - env['dbsession'] = request.dbsession - env['models'] = models + env["tm"] = request.tm + env["dbsession"] = request.dbsession + env["models"] = models diff --git a/fietsboek/routes.py b/fietsboek/routes.py index 9286f13..9e71686 100644 --- a/fietsboek/routes.py +++ b/fietsboek/routes.py @@ -3,57 +3,61 @@ def includeme(config): # pylint: disable=missing-function-docstring - config.add_static_view('static', 'static', cache_max_age=3600) - config.add_route('home', '/') - config.add_route('login', '/login') - config.add_route('logout', '/logout') - config.add_route('browse', '/track/') - - config.add_route('static-page', '/page/{slug}') - - config.add_route('track-archive', '/track/archive') - - config.add_route('password-reset', '/password-reset') - config.add_route('use-token', '/token/{uuid}') - config.add_route('create-account', '/create-account') - - config.add_route('upload', '/upload') - config.add_route('preview', '/preview/{upload_id}.gpx', - factory='fietsboek.models.Upload.factory') - config.add_route('finish-upload', '/upload/{upload_id}', - factory='fietsboek.models.Upload.factory') - config.add_route('cancel-upload', '/cancel/{upload_id}', - factory='fietsboek.models.Upload.factory') - - config.add_route('details', '/track/{track_id}', - factory='fietsboek.models.Track.factory') - config.add_route('edit', '/track/{track_id}/edit', - factory='fietsboek.models.Track.factory') - config.add_route('gpx', '/gpx/{track_id}.gpx', - factory='fietsboek.models.Track.factory') - config.add_route('invalidate-share', '/track/{track_id}/invalidate-link', - factory='fietsboek.models.Track.factory') - config.add_route('delete-track', '/track/{track_id}/delete', - factory='fietsboek.models.Track.factory') - config.add_route('add-comment', '/track/{track_id}/comment', - factory='fietsboek.models.Track.factory') - config.add_route('image', '/track/{track_id}/images/{image_name}', - factory='fietsboek.models.Track.factory') - - config.add_route('badge', '/badge/{badge_id}', - factory='fietsboek.models.Badge.factory') - - config.add_route('admin', '/admin') - config.add_route('admin-badge-add', '/admin/add-badge') - config.add_route('admin-badge-edit', '/admin/edit-badge') - config.add_route('admin-badge-delete', '/admin/delete-badge') - - config.add_route('profile', '/me') - config.add_route('change-profile', '/me/personal-data') - config.add_route('add-friend', '/me/send-friend-request') - config.add_route('delete-friend', '/me/delete-friend') - config.add_route('accept-friend', '/me/accept-friend') - config.add_route('json-friends', '/me/friends.json') - - config.add_route('tile-proxy', - '/tile/{provider}/{z:\\d+}/{x:\\d+}/{y:\\d+}') + config.add_static_view("static", "static", cache_max_age=3600) + config.add_route("home", "/") + config.add_route("login", "/login") + config.add_route("logout", "/logout") + config.add_route("browse", "/track/") + + config.add_route("static-page", "/page/{slug}") + + config.add_route("track-archive", "/track/archive") + + config.add_route("password-reset", "/password-reset") + config.add_route("use-token", "/token/{uuid}") + config.add_route("create-account", "/create-account") + + config.add_route("upload", "/upload") + config.add_route( + "preview", "/preview/{upload_id}.gpx", factory="fietsboek.models.Upload.factory" + ) + config.add_route( + "finish-upload", "/upload/{upload_id}", factory="fietsboek.models.Upload.factory" + ) + config.add_route( + "cancel-upload", "/cancel/{upload_id}", factory="fietsboek.models.Upload.factory" + ) + + config.add_route("details", "/track/{track_id}", factory="fietsboek.models.Track.factory") + config.add_route("edit", "/track/{track_id}/edit", factory="fietsboek.models.Track.factory") + config.add_route("gpx", "/gpx/{track_id}.gpx", factory="fietsboek.models.Track.factory") + config.add_route( + "invalidate-share", + "/track/{track_id}/invalidate-link", + factory="fietsboek.models.Track.factory", + ) + config.add_route( + "delete-track", "/track/{track_id}/delete", factory="fietsboek.models.Track.factory" + ) + config.add_route( + "add-comment", "/track/{track_id}/comment", factory="fietsboek.models.Track.factory" + ) + config.add_route( + "image", "/track/{track_id}/images/{image_name}", factory="fietsboek.models.Track.factory" + ) + + config.add_route("badge", "/badge/{badge_id}", factory="fietsboek.models.Badge.factory") + + config.add_route("admin", "/admin") + config.add_route("admin-badge-add", "/admin/add-badge") + config.add_route("admin-badge-edit", "/admin/edit-badge") + config.add_route("admin-badge-delete", "/admin/delete-badge") + + config.add_route("profile", "/me") + config.add_route("change-profile", "/me/personal-data") + config.add_route("add-friend", "/me/send-friend-request") + config.add_route("delete-friend", "/me/delete-friend") + config.add_route("accept-friend", "/me/accept-friend") + config.add_route("json-friends", "/me/friends.json") + + config.add_route("tile-proxy", "/tile/{provider}/{z:\\d+}/{x:\\d+}/{y:\\d+}") diff --git a/fietsboek/scripts/fietsctl.py b/fietsboek/scripts/fietsctl.py index e8f7b3f..bd37987 100644 --- a/fietsboek/scripts/fietsctl.py +++ b/fietsboek/scripts/fietsctl.py @@ -83,7 +83,7 @@ def cmd_userdel(env, args): print(user.email) if not args.force: query = input("Really delete this user? [y/N] ") - if query not in {'Y', 'y'}: + if query not in {"Y", "y"}: print("Aborted by user.") return EXIT_FAILURE dbsession.delete(user) @@ -103,9 +103,9 @@ def cmd_userlist(env, args): dbsession = env["request"].dbsession users = dbsession.execute(select(models.User).order_by(models.User.id)).scalars() for user in users: - tag = '[{}{}]'.format( - 'a' if user.is_admin else '-', - 'v' if user.is_verified else '-', + tag = "[{}{}]".format( + "a" if user.is_admin else "-", + "v" if user.is_verified else "-", ) print(f"{tag} {user.id} - {user.email} - {user.name}") return EXIT_OKAY @@ -146,13 +146,14 @@ def parse_args(argv): """ parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( - '-c', '--config', - dest='config_uri', - help='configuration file, e.g., development.ini', + "-c", + "--config", + dest="config_uri", + help="configuration file, e.g., development.ini", required=True, ) - subparsers = parser.add_subparsers(help='available subcommands', required=True) + subparsers = parser.add_subparsers(help="available subcommands", required=True) p_useradd = subparsers.add_parser( "useradd", @@ -160,20 +161,20 @@ def parse_args(argv): description=cmd_useradd.__doc__, ) p_useradd.add_argument( - '--email', + "--email", help="email address of the user", ) p_useradd.add_argument( - '--name', + "--name", help="name of the user", ) p_useradd.add_argument( - '--password', + "--password", help="password of the user", ) p_useradd.add_argument( - '--admin', - action='store_true', + "--admin", + action="store_true", help="make the new user an admin", ) p_useradd.set_defaults(func=cmd_useradd) @@ -184,18 +185,21 @@ def parse_args(argv): description=cmd_userdel.__doc__, ) p_userdel.add_argument( - '--force', '-f', - action='store_true', + "--force", + "-f", + action="store_true", help="override the safety check", ) group = p_userdel.add_mutually_exclusive_group(required=True) group.add_argument( - '--id', '-i', + "--id", + "-i", type=int, help="database ID of the user", ) group.add_argument( - '--email', '-e', + "--email", + "-e", help="email of the user", ) p_userdel.set_defaults(func=cmd_userdel) @@ -213,17 +217,19 @@ def parse_args(argv): description=cmd_userdel.__doc__, ) p_passwd.add_argument( - '--password', + "--password", help="password of the user", ) group = p_passwd.add_mutually_exclusive_group(required=True) group.add_argument( - '--id', '-i', + "--id", + "-i", type=int, help="database ID of the user", ) group.add_argument( - '--email', '-e', + "--email", + "-e", help="email of the user", ) p_passwd.set_defaults(func=cmd_passwd) diff --git a/fietsboek/security.py b/fietsboek/security.py index a5cafd4..84dd88a 100644 --- a/fietsboek/security.py +++ b/fietsboek/security.py @@ -9,11 +9,12 @@ from sqlalchemy import select from . import models -ADMIN_PERMISSIONS = {'admin'} +ADMIN_PERMISSIONS = {"admin"} class SecurityPolicy: """Implementation of the Pyramid security policy.""" + def __init__(self): self.helper = SessionAuthenticationHelper() @@ -39,12 +40,12 @@ class SecurityPolicy: # If the context is not there, we are on a static site that does not use ACL if isinstance(context, DefaultRootFactory): if identity is None: - return Denied('User is not signed in.') + return Denied("User is not signed in.") if permission not in ADMIN_PERMISSIONS: - return Allowed('User is signed in.') + return Allowed("User is signed in.") if identity.is_admin: - return Allowed('User is an administrator.') - return Denied('User is not an administrator.') + return Allowed("User is an administrator.") + return Denied("User is not an administrator.") # If the context is there, use ACL principals = [Everyone] @@ -52,7 +53,7 @@ class SecurityPolicy: principals.append(Authenticated) principals.extend(identity.principals()) - if 'secret' in request.GET: + if "secret" in request.GET: principals.append(f'secret:{request.GET["secret"]}') return ACLHelper().permits(context, principals, permission) diff --git a/fietsboek/summaries.py b/fietsboek/summaries.py index 9d4c0aa..04b74c5 100644 --- a/fietsboek/summaries.py +++ b/fietsboek/summaries.py @@ -100,6 +100,7 @@ class MonthSummary: :ivar tracks: List of tracks in this month. :vartype tracks: list[fietsboek.model.track.Track] """ + def __init__(self, month): self.month = month self.tracks = [] diff --git a/fietsboek/updater/__init__.py b/fietsboek/updater/__init__.py index a5bcf0e..348f713 100644 --- a/fietsboek/updater/__init__.py +++ b/fietsboek/updater/__init__.py @@ -5,6 +5,7 @@ import random import string import importlib.util from pathlib import Path +from typing import List # Compat for Python < 3.9 import importlib_resources @@ -81,13 +82,9 @@ class Updater: scripts = _load_update_scripts() for script in scripts: self.scripts[script.id] = script - self.forward_dependencies = { - script.id: script.previous for script in self.scripts.values() - } + self.forward_dependencies = {script.id: script.previous for script in self.scripts.values()} # Ensure that each script has an entry - self.backward_dependencies = { - script.id: [] for script in self.scripts.values() - } + self.backward_dependencies = {script.id: [] for script in self.scripts.values()} for script in self.scripts.values(): for prev_id in script.previous: self.backward_dependencies[prev_id].append(script.id) @@ -151,7 +148,7 @@ class Updater: def _make_schedule(self, wanted, dependencies): wanted = set(wanted) - queue = [] + queue: List[str] = [] while wanted: next_updates = { update @@ -233,7 +230,7 @@ class Updater: current_alembic = context.get_current_heads() LOGGER.debug("Found alembic versions: %s", current_alembic) assert len(current_alembic) == 1 - current_alembic = current_alembic[0] + current_alembic = current_alembic[0] # type: ignore loader = jinja2.DictLoader({"revision.py": TEMPLATE}) env = jinja2.Environment(loader=loader, autoescape=False) @@ -291,7 +288,8 @@ class UpdateScript: def __init__(self, source, name): self.name = name spec = importlib.util.spec_from_loader(f"{__name__}.{name}", None) - self.module = importlib.util.module_from_spec(spec) + self.module = importlib.util.module_from_spec(spec) # type: ignore + assert self.module exec(source, self.module.__dict__) # pylint: disable=exec-used def __repr__(self): diff --git a/fietsboek/updater/cli.py b/fietsboek/updater/cli.py index a6ea6c1..5c97687 100644 --- a/fietsboek/updater/cli.py +++ b/fietsboek/updater/cli.py @@ -18,7 +18,8 @@ from . import Updater # https://github.com/pallets/click/issues/295 # https://github.com/pallets/click/issues/814 config_option = click.option( - "-c", "--config", + "-c", + "--config", type=click.Path(exists=True, dir_okay=False), required=True, help="Path to the Fietsboek configuration file", @@ -44,7 +45,7 @@ def user_confirm(verb): @click.group( help=__doc__, - context_settings={'help_option_names': ['-h', '--help']}, + context_settings={"help_option_names": ["-h", "--help"]}, ) def cli(): """CLI main entry point.""" @@ -53,7 +54,8 @@ def cli(): @cli.command("update") @config_option @click.option( - "-f", "--force", + "-f", + "--force", is_flag=True, help="Skip the safety question and just run the update", ) @@ -97,7 +99,8 @@ def update(ctx, config, version, force): @cli.command("downgrade") @config_option @click.option( - "-f", "--force", + "-f", + "--force", is_flag=True, help="Skip the safety question and just run the downgrade", ) diff --git a/fietsboek/util.py b/fietsboek/util.py index 71f5d16..c0a59a5 100644 --- a/fietsboek/util.py +++ b/fietsboek/util.py @@ -4,6 +4,7 @@ import re import os import unicodedata import secrets +from typing import Optional # Compat for Python < 3.9 import importlib_resources @@ -11,20 +12,27 @@ import babel import markdown import bleach import gpxpy +import webob +import sqlalchemy from pyramid.i18n import TranslationString as _ from pyramid.httpexceptions import HTTPBadRequest +from pyramid.request import Request from markupsafe import Markup from sqlalchemy import select -ALLOWED_TAGS = (bleach.sanitizer.ALLOWED_TAGS + - # Allow headings - ['h1', 'h2', 'h3', 'h4', 'h5', 'h6'] + - ['p'] + ['img']) +ALLOWED_TAGS = ( + bleach.sanitizer.ALLOWED_TAGS + + + # Allow headings + ["h1", "h2", "h3", "h4", "h5", "h6"] + + ["p"] + + ["img"] +) ALLOWED_ATTRIBUTES = dict(bleach.sanitizer.ALLOWED_ATTRIBUTES) -ALLOWED_ATTRIBUTES['img'] = ['alt', 'src'] +ALLOWED_ATTRIBUTES["img"] = ["alt", "src"] # Arbitrarily chosen, just make sure they are representable DEFAULT_START_TIME = datetime.datetime(1977, 5, 25, 8, 0) @@ -47,23 +55,21 @@ _windows_device_files = ( ) -def safe_markdown(md_source): +def safe_markdown(md_source: str) -> Markup: """Transform a markdown document into a safe HTML document. This uses ``markdown`` to first parse the markdown source into HTML, and then ``bleach`` to strip any disallowed HTML tags. :param md_source: The markdown source. - :type md_source: str :return: The safe HTML transformed version. - :rtype: Markup """ - html = markdown.markdown(md_source, output_format='html5') + html = markdown.markdown(md_source, output_format="html") html = bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES) return Markup(html) -def fix_iso_timestamp(timestamp): +def fix_iso_timestamp(timestamp: str) -> str: """Fixes an ISO timestamp to make it parseable by :meth:`datetime.datetime.fromisoformat`. @@ -71,24 +77,21 @@ def fix_iso_timestamp(timestamp): it with '+00:00'. :param timestamp: The timestamp to fix. - :type timestamp: str :return: The fixed timestamp. - :rtype: str """ - if timestamp.endswith('Z'): - return timestamp[:-1] + '+00:00' + if timestamp.endswith("Z"): + return timestamp[:-1] + "+00:00" return timestamp -def round_timedelta_to_multiple(value, multiples): +def round_timedelta_to_multiple( + value: datetime.timedelta, multiples: datetime.timedelta +) -> datetime.timedelta: """Round the timedelta `value` to be a multiple of `multiples`. :param value: The value to be rounded. - :type value: datetime.timedelta :param multiples: The size of each multiple. - :type multiples: datetime.timedelta :return: The rounded value. - :rtype: datetime.timedelta """ lower = value.total_seconds() // multiples.total_seconds() * multiples.total_seconds() second_offset = value.total_seconds() - lower @@ -99,16 +102,14 @@ def round_timedelta_to_multiple(value, multiples): return datetime.timedelta(seconds=lower) + multiples -def guess_gpx_timezone(gpx): +def guess_gpx_timezone(gpx: gpxpy.gpx.GPX) -> datetime.tzinfo: """Guess which timezone the GPX file was recorded in. This looks at a few timestamps to see if they have timezone information attached, including some known GPX extensions. :param gpx: The parsed GPX file to analyse. - :type gpx: gpxpy.GPX :return: The timezone information. - :rtype: datetime.timezone """ time_bounds = gpx.get_time_bounds() times = [ @@ -131,12 +132,14 @@ def guess_gpx_timezone(gpx): time = times[0] local_time = None for extension in track.extensions: - if extension.tag.lower() == 'localtime': + if extension.tag.lower() == "localtime": local_time = datetime.datetime.fromisoformat( - fix_iso_timestamp(extension.text)).replace(tzinfo=None) - elif extension.tag.lower() == 'time': + fix_iso_timestamp(extension.text) + ).replace(tzinfo=None) + elif extension.tag.lower() == "time": time = datetime.datetime.fromisoformat( - fix_iso_timestamp(extension.text)).replace(tzinfo=None) + fix_iso_timestamp(extension.text) + ).replace(tzinfo=None) if local_time is not None: # We found a pair that we can use! offset = local_time - time @@ -152,7 +155,7 @@ def guess_gpx_timezone(gpx): return datetime.timezone.utc -def tour_metadata(gpx_data): +def tour_metadata(gpx_data: str) -> dict: """Calculate the metadata of the tour. Returns a dict with ``length``, ``uphill``, ``downhill``, ``moving_time``, @@ -160,9 +163,7 @@ def tour_metadata(gpx_data): ``end_time``. :param gpx_data: The GPX data of the tour. - :type gpx_data: str :return: A dictionary with the computed values. - :rtype: dict """ gpx = gpxpy.parse(gpx_data) timezone = guess_gpx_timezone(gpx) @@ -174,58 +175,56 @@ def tour_metadata(gpx_data): except ZeroDivisionError: avg_speed = 0.0 return { - 'length': gpx.length_3d(), - 'uphill': uphill, - 'downhill': downhill, - 'moving_time': moving_data.moving_time, - 'stopped_time': moving_data.stopped_time, - 'max_speed': moving_data.max_speed, - 'avg_speed': avg_speed, - 'start_time': (time_bounds.start_time or DEFAULT_START_TIME).astimezone(timezone), - 'end_time': (time_bounds.end_time or DEFAULT_END_TIME).astimezone(timezone), + "length": gpx.length_3d(), + "uphill": uphill, + "downhill": downhill, + "moving_time": moving_data.moving_time, + "stopped_time": moving_data.stopped_time, + "max_speed": moving_data.max_speed, + "avg_speed": avg_speed, + "start_time": (time_bounds.start_time or DEFAULT_START_TIME).astimezone(timezone), + "end_time": (time_bounds.end_time or DEFAULT_END_TIME).astimezone(timezone), } -def mps_to_kph(mps): +def mps_to_kph(mps: float) -> float: """Converts meters/second to kilometers/hour. :param mps: Input meters/second. - :type mps: float :return: The converted km/h value. - :rtype: float """ return mps / 1000 * 60 * 60 -def month_name(request, month): +def month_name(request: Request, month: int) -> str: """Returns the localized name for the month with the given number. :param request: The pyramid request. - :type request: pyramid.request.Request :param month: Number of the month, 1 = January. - :type month: int :return: The localized month name. - :rtype: str """ assert 1 <= month <= 12 locale = babel.Locale.parse(request.localizer.locale_name) return locale.months["stand-alone"]["wide"][month] -def random_link_secret(nbytes=20): +def random_link_secret(nbytes: int = 20) -> str: """Safely generates a secret suitable for the link share. The returned string consists of characters that are safe to use in a URL. :param nbytes: Number of random bytes to use. - :type nbytes: int :return: A randomly drawn string. - :rtype: str """ return secrets.token_urlsafe(nbytes) -def retrieve_multiple(dbsession, model, params, name): +def retrieve_multiple( + dbsession: "sqlalchemy.orm.session.Session", + model: type, + params: "webob.multidict.NestedMultiDict", + name: str, +) -> list: """Parses a reply to retrieve multiple database objects. This is usable for arrays sent by HTML forms, for example to retrieve all @@ -237,15 +236,10 @@ def retrieve_multiple(dbsession, model, params, name): :raises pyramid.httpexceptions.HTTPBadRequest: If an object could not be found. :param dbsession: The database session. - :type dbsession: sqlalchemy.orm.session.Session :param model: The model class to retrieve. - :type model: class :param params: The form parameters. - :type params: webob.multidict.NestedMultiDict :param name: Name of the parameter to look for. - :type name: str :return: A list of elements found. - :rtype: list[model] """ objects = [] for obj_id in params.getall(name): @@ -259,7 +253,7 @@ def retrieve_multiple(dbsession, model, params, name): return objects -def check_password_constraints(password, repeat_password=None): +def check_password_constraints(password: str, repeat_password: Optional[str] = None): """Verifies that the password constraints match for the given password. This is usually also verified client-side, but for people that bypass the @@ -273,9 +267,7 @@ def check_password_constraints(password, repeat_password=None): :class:`~pyramid.i18n.TranslationString` with the message of why the verification failed. :param password: The password which to verify. - :type password: str :param repeat_password: The password repeat. - :type repeat_password: str """ if repeat_password is not None: if repeat_password != password: @@ -284,7 +276,7 @@ def check_password_constraints(password, repeat_password=None): raise ValueError(_("password_constraint.length")) -def read_localized_resource(locale_name, path, raise_on_error=False): +def read_localized_resource(locale_name: str, path: str, raise_on_error: bool = False) -> str: """Reads a localized resource. Localized resources are located in the ``fietsboek/locale/**`` directory. @@ -293,13 +285,11 @@ def read_localized_resource(locale_name, path, raise_on_error=False): If the resource could not be found, a placeholder string is returned instead. :param locale_name: Name of the locale. - :type locale_name: str + :param path: Path of the resource. :param raise_on_error: Raise an error instead of returning a placeholder. - :type raise_on_error: bool :raises FileNotFoundError: If the path could not be found and ``raise_on_error`` is ``True``. :return: The text content of the resource. - :rtype: str """ locales = [locale_name] # Second chance: If the locale is a specific form of a more general @@ -308,7 +298,7 @@ def read_localized_resource(locale_name, path, raise_on_error=False): locales.append(locale_name.split("_", 1)[0]) for locale in locales: - locale_dir = importlib_resources.files('fietsboek') / 'locale' / locale + locale_dir = importlib_resources.files("fietsboek") / "locale" / locale resource_path = locale_dir / path try: return resource_path.read_text() @@ -319,7 +309,7 @@ def read_localized_resource(locale_name, path, raise_on_error=False): return f"{locale_name}:{path}" -def secure_filename(filename): +def secure_filename(filename: str) -> str: r"""Pass it a filename and it will return a secure version of it. This filename can then safely be stored on a regular file system and passed to :func:`os.path.join`. The filename returned is an ASCII only string @@ -339,9 +329,7 @@ def secure_filename(filename): generate a random filename if the function returned an empty one. :param filename: the filename to secure - :type filename: str :return: The secure filename. - :rtype: str """ # Taken from # https://github.com/pallets/werkzeug/blob/main/src/werkzeug/utils.py @@ -352,9 +340,7 @@ def secure_filename(filename): for sep in os.path.sep, os.path.altsep: if sep: filename = filename.replace(sep, " ") - filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip( - "._" - ) + filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip("._") # on nt a couple of special files are present in each folder. We # have to ensure that the target file is not such a filename. In diff --git a/fietsboek/views/account.py b/fietsboek/views/account.py index f9f48e9..b181148 100644 --- a/fietsboek/views/account.py +++ b/fietsboek/views/account.py @@ -7,8 +7,11 @@ from .. import models, util, email from ..models.user import TokenType -@view_config(route_name="create-account", renderer="fietsboek:templates/create_account.jinja2", - request_method="GET") +@view_config( + route_name="create-account", + renderer="fietsboek:templates/create_account.jinja2", + request_method="GET", +) def create_account(request): """Shows the "create account" page. @@ -23,8 +26,11 @@ def create_account(request): return {} -@view_config(route_name="create-account", renderer="fietsboek:templates/create_account.jinja2", - request_method="POST") +@view_config( + route_name="create-account", + renderer="fietsboek:templates/create_account.jinja2", + request_method="POST", +) def do_create_account(request): """Shows the "create account" page. @@ -41,17 +47,17 @@ def do_create_account(request): util.check_password_constraints(password, request.params["repeat-password"]) except ValueError as exc: request.session.flash(request.localizer.translate(exc.args[0])) - return HTTPFound(request.route_url('create-account')) + return HTTPFound(request.route_url("create-account")) name = request.params["name"] if not name: - request.session.flash(request.localizer.translate(_('flash.invalid_name'))) - return HTTPFound(request.route_url('create-account')) + request.session.flash(request.localizer.translate(_("flash.invalid_name"))) + return HTTPFound(request.route_url("create-account")) email_addr = request.params["email"] if not email_addr: - request.session.flash(request.localizer.translate(_('flash.invalid_email'))) - return HTTPFound(request.route_url('create-account')) + request.session.flash(request.localizer.translate(_("flash.invalid_email"))) + return HTTPFound(request.route_url("create-account")) user = models.User(name=name, email=email_addr) user.set_password(password) @@ -63,10 +69,13 @@ def do_create_account(request): message = email.prepare_message( request.config.email_from, user.email, - request.localizer.translate(_('email.verify_mail.subject')), + request.localizer.translate(_("email.verify_mail.subject")), + ) + message.set_content( + request.localizer.translate(_("email.verify.text")).format( + request.route_url("use-token", uuid=token.uuid) + ) ) - message.set_content(request.localizer.translate(_('email.verify.text')) - .format(request.route_url('use-token', uuid=token.uuid))) email.send_message( request.config.email_smtp_url, request.config.email_username, @@ -75,4 +84,4 @@ def do_create_account(request): ) request.session.flash(request.localizer.translate(_("flash.a_confirmation_link_has_been_sent"))) - return HTTPFound(request.route_url('login')) + return HTTPFound(request.route_url("login")) diff --git a/fietsboek/views/admin.py b/fietsboek/views/admin.py index 7ad6372..454d1e1 100644 --- a/fietsboek/views/admin.py +++ b/fietsboek/views/admin.py @@ -8,8 +8,12 @@ from sqlalchemy import select from .. import models -@view_config(route_name='admin', renderer='fietsboek:templates/admin.jinja2', - request_method="GET", permission="admin") +@view_config( + route_name="admin", + renderer="fietsboek:templates/admin.jinja2", + request_method="GET", + permission="admin", +) def admin(request): """Renders the main admin overview. @@ -20,11 +24,11 @@ def admin(request): """ badges = request.dbsession.execute(select(models.Badge)).scalars() return { - 'badges': badges, + "badges": badges, } -@view_config(route_name='admin-badge-add', permission="admin", request_method="POST") +@view_config(route_name="admin-badge-add", permission="admin", request_method="POST") def do_badge_add(request): """Adds a badge. @@ -36,17 +40,17 @@ def do_badge_add(request): :rtype: pyramid.response.Response """ - image = request.params['badge-image'].file.read() - title = request.params['badge-title'] + image = request.params["badge-image"].file.read() + title = request.params["badge-title"] badge = models.Badge(title=title, image=image) request.dbsession.add(badge) request.session.flash(request.localizer.translate(_("flash.badge_added"))) - return HTTPFound(request.route_url('admin')) + return HTTPFound(request.route_url("admin")) -@view_config(route_name='admin-badge-edit', permission="admin", request_method="POST") +@view_config(route_name="admin-badge-edit", permission="admin", request_method="POST") def do_badge_edit(request): """Modifies an already existing badge. @@ -58,19 +62,19 @@ def do_badge_edit(request): :rtype: pyramid.response.Response """ badge = request.dbsession.execute( - select(models.Badge).filter_by(id=request.params["badge-edit-id"]) - ).scalar_one() + select(models.Badge).filter_by(id=request.params["badge-edit-id"]) + ).scalar_one() try: - badge.image = request.params['badge-image'].file.read() + badge.image = request.params["badge-image"].file.read() except AttributeError: pass - badge.title = request.params['badge-title'] + badge.title = request.params["badge-title"] request.session.flash(request.localizer.translate(_("flash.badge_modified"))) - return HTTPFound(request.route_url('admin')) + return HTTPFound(request.route_url("admin")) -@view_config(route_name='admin-badge-delete', permission="admin", request_method="POST") +@view_config(route_name="admin-badge-delete", permission="admin", request_method="POST") def do_badge_delete(request): """Removes a badge. @@ -82,9 +86,9 @@ def do_badge_delete(request): :rtype: pyramid.response.Response """ badge = request.dbsession.execute( - select(models.Badge).filter_by(id=request.params["badge-delete-id"]) - ).scalar_one() + select(models.Badge).filter_by(id=request.params["badge-delete-id"]) + ).scalar_one() request.dbsession.delete(badge) request.session.flash(request.localizer.translate(_("flash.badge_deleted"))) - return HTTPFound(request.route_url('admin')) + return HTTPFound(request.route_url("admin")) diff --git a/fietsboek/views/browse.py b/fietsboek/views/browse.py index c01d4f6..018cb6e 100644 --- a/fietsboek/views/browse.py +++ b/fietsboek/views/browse.py @@ -1,6 +1,7 @@ """Views for browsing all tracks.""" import datetime from io import RawIOBase +from typing import List from zipfile import ZipFile, ZIP_DEFLATED from pyramid.view import view_config @@ -39,21 +40,21 @@ def _get_int(request, name): try: return int(request.params.get(name)) except ValueError as exc: - raise HTTPBadRequest(f'Invalid integer in {name!r}') from exc + raise HTTPBadRequest(f"Invalid integer in {name!r}") from exc def _get_date(request, name): try: return datetime.date.fromisoformat(request.params.get(name)) except ValueError as exc: - raise HTTPBadRequest(f'Invalid date in {name!r}') from exc + raise HTTPBadRequest(f"Invalid date in {name!r}") from exc def _get_enum(enum, value): try: return enum[value] except KeyError as exc: - raise HTTPBadRequest(f'Invalid enum value {value!r}') from exc + raise HTTPBadRequest(f"Invalid enum value {value!r}") from exc class Filter: @@ -123,10 +124,12 @@ class TagFilter(Filter): def compile(self, query, track, track_cache): lower_tags = [tag.lower() for tag in self.tags] for tag in lower_tags: - exists_query = (select(models.Tag) - .where(models.Tag.track_id == track.id) - .where(func.lower(models.Tag.tag) == tag) - .exists()) + exists_query = ( + select(models.Tag) + .where(models.Tag.track_id == track.id) + .where(func.lower(models.Tag.tag) == tag) + .exists() + ) query = query.where(exists_query) return query @@ -146,15 +149,19 @@ class PersonFilter(Filter): lower_names = [name.lower() for name in self.names] for name in lower_names: tpa = models.track.track_people_assoc - exists_query = (select(tpa) - .join(models.User, tpa.c.user_id == models.User.id) - .where(tpa.c.track_id == track.id) - .where(func.lower(models.User.name) == name) - .exists()) - is_owner = (select(models.User.id) - .where(models.User.id == track.owner_id) - .where(func.lower(models.User.name) == name) - .exists()) + exists_query = ( + select(tpa) + .join(models.User, tpa.c.user_id == models.User.id) + .where(tpa.c.track_id == track.id) + .where(func.lower(models.User.name) == name) + .exists() + ) + is_owner = ( + select(models.User.id) + .where(models.User.id == track.owner_id) + .where(func.lower(models.User.name) == name) + .exists() + ) query = query.where(or_(exists_query, is_owner)) return query @@ -172,13 +179,17 @@ class UserTaggedFilter(Filter): def compile(self, query, track, track_cache): tpa = models.track.track_people_assoc - return query.where(or_( - track.owner == self.user, - (select(tpa) - .where(tpa.c.track_id == track.id) - .where(tpa.c.user_id == self.user.id) - .exists()), - )) + return query.where( + or_( + track.owner == self.user, + ( + select(tpa) + .where(tpa.c.track_id == track.id) + .where(tpa.c.user_id == self.user.id) + .exists() + ), + ) + ) def apply(self, track): return track.owner == self.user or self.user in track.tagged_people @@ -212,84 +223,105 @@ class FilterCollection(Filter): :rtype: FilterCollection """ # pylint: disable=singleton-comparison - filters = [] - if request.params.get('search-terms'): - term = request.params.get('search-terms').strip() + filters: List[Filter] = [] + if request.params.get("search-terms"): + term = request.params.get("search-terms").strip() filters.append(SearchFilter([term])) - if request.params.get('tags'): - tags = [tag.strip() for tag in request.params.get('tags').split('&&')] + if request.params.get("tags"): + tags = [tag.strip() for tag in request.params.get("tags").split("&&")] tags = list(filter(bool, tags)) filters.append(TagFilter(tags)) - if request.params.get('tagged-person'): - names = [name.strip() for name in request.params.get('tagged-person').split('&&')] + if request.params.get("tagged-person"): + names = [name.strip() for name in request.params.get("tagged-person").split("&&")] names = list(filter(bool, names)) filters.append(PersonFilter(names)) - if request.params.get('min-length'): + if request.params.get("min-length"): # Value is given in km, so convert it to m min_length = _get_int(request, "min-length") * 1000 - filters.append(LambdaFilter( - lambda query, track, track_cache: - query.where(or_(track_cache.length >= min_length, - track_cache.length == None)), # noqa: E711 - lambda track: track.length >= min_length, - )) - - if request.params.get('max-length'): + filters.append( + LambdaFilter( + lambda query, track, track_cache: query.where( + or_( + track_cache.length >= min_length, + track_cache.length == None, # noqa: E711 + ) + ), + lambda track: track.length >= min_length, + ) + ) + + if request.params.get("max-length"): max_length = _get_int(request, "max-length") * 1000 - filters.append(LambdaFilter( - lambda query, track, track_cache: - query.where(or_(track_cache.length <= max_length, - track_cache.length == None)), # noqa: E711 - lambda track: track.length <= max_length, - )) - - if request.params.get('min-date'): + filters.append( + LambdaFilter( + lambda query, track, track_cache: query.where( + or_( + track_cache.length <= max_length, + track_cache.length == None, # noqa: E711 + ) + ), + lambda track: track.length <= max_length, + ) + ) + + if request.params.get("min-date"): min_date = _get_date(request, "min-date") min_date = datetime.datetime.combine(min_date, datetime.time.min) - filters.append(LambdaFilter( - lambda query, track, track_cache: query.where(track.date_raw >= min_date), - lambda track: track.date.replace(tzinfo=None) >= min_date, - )) - - if request.params.get('max-date'): + filters.append( + LambdaFilter( + lambda query, track, track_cache: query.where(track.date_raw >= min_date), + lambda track: track.date.replace(tzinfo=None) >= min_date, + ) + ) + + if request.params.get("max-date"): max_date = _get_date(request, "max-date") max_date = datetime.datetime.combine(max_date, datetime.time.max) - filters.append(LambdaFilter( - lambda query, track, track_cache: query.where(track.date_raw <= max_date), - lambda track: track.date.replace(tzinfo=None) <= max_date, - )) - - if "mine" in request.params.getall('show-only[]'): - filters.append(LambdaFilter( - lambda query, track, track_cache: query.where(track.owner == request.identity), - lambda track: track.owner == request.identity, - )) - - if "friends" in request.params.getall('show-only[]') and request.identity: + filters.append( + LambdaFilter( + lambda query, track, track_cache: query.where(track.date_raw <= max_date), + lambda track: track.date.replace(tzinfo=None) <= max_date, + ) + ) + + if "mine" in request.params.getall("show-only[]"): + filters.append( + LambdaFilter( + lambda query, track, track_cache: query.where(track.owner == request.identity), + lambda track: track.owner == request.identity, + ) + ) + + if "friends" in request.params.getall("show-only[]") and request.identity: friend_ids = {friend.id for friend in request.identity.get_friends()} - filters.append(LambdaFilter( - lambda query, track, track_cache: query.where(track.owner_id.in_(friend_ids)), - lambda track: track.owner in request.identity.get_friends(), - )) - - if "user-tagged" in request.params.getall('show-only[]') and request.identity: + filters.append( + LambdaFilter( + lambda query, track, track_cache: query.where(track.owner_id.in_(friend_ids)), + lambda track: track.owner in request.identity.get_friends(), + ) + ) + + if "user-tagged" in request.params.getall("show-only[]") and request.identity: filters.append(UserTaggedFilter(request.identity)) - if 'type[]' in request.params: - types = {_get_enum(TrackType, value) for value in request.params.getall('type[]')} - filters.append(LambdaFilter( - lambda query, track, track_cache: query.where(track.type.in_(types)), - lambda track: track.type in types, - )) + if "type[]" in request.params: + types = {_get_enum(TrackType, value) for value in request.params.getall("type[]")} + filters.append( + LambdaFilter( + lambda query, track, track_cache: query.where(track.type.in_(types)), + lambda track: track.type in types, + ) + ) return cls(filters) -@view_config(route_name="browse", renderer="fietsboek:templates/browse.jinja2", - request_method="GET") +@view_config( + route_name="browse", renderer="fietsboek:templates/browse.jinja2", request_method="GET" +) def browse(request): """Returns the page that lets a user browse all visible tracks. @@ -309,9 +341,9 @@ def browse(request): tracks = request.dbsession.execute(query).scalars() tracks = [track for track in tracks if filters.apply(track)] return { - 'tracks': tracks, - 'mps_to_kph': util.mps_to_kph, - 'used_filters': bool(filters), + "tracks": tracks, + "mps_to_kph": util.mps_to_kph, + "used_filters": bool(filters), } @@ -325,11 +357,14 @@ def archive(request): :rtype: pyramid.response.Response """ # We need to create a separate session, otherwise we will get detached instances - session = request.registry['dbsession_factory']() + session = request.registry["dbsession_factory"]() track_ids = set(map(int, request.params.getall("track_id[]"))) - tracks = session.execute( - select(models.Track).filter(models.Track.id.in_(track_ids))).scalars().fetchall() + tracks = ( + session.execute(select(models.Track).filter(models.Track.id.in_(track_ids))) + .scalars() + .fetchall() + ) if len(tracks) != len(track_ids): return HTTPNotFound() @@ -341,7 +376,7 @@ def archive(request): def generate(): try: stream = Stream() - with ZipFile(stream, "w", ZIP_DEFLATED) as zipfile: + with ZipFile(stream, "w", ZIP_DEFLATED) as zipfile: # type: ignore for track in tracks: zipfile.writestr(f"track_{track.id}.gpx", track.gpx_data) yield stream.readall() diff --git a/fietsboek/views/default.py b/fietsboek/views/default.py index f4aaa8f..0a2d7e2 100644 --- a/fietsboek/views/default.py +++ b/fietsboek/views/default.py @@ -16,7 +16,7 @@ from ..models.user import PasswordMismatch, TokenType from ..models.track import TrackType -@view_config(route_name='home', renderer='fietsboek:templates/home.jinja2') +@view_config(route_name="home", renderer="fietsboek:templates/home.jinja2") def home(request): """Renders the home page. @@ -27,13 +27,13 @@ def home(request): """ if not request.identity: # See if the admin set a custom home page - page = request.pages.find('/', request) + page = request.pages.find("/", request) if page is not None: return render_to_response( - 'fietsboek:templates/static-page.jinja2', + "fietsboek:templates/static-page.jinja2", { - 'title': page.title, - 'content': Markup(page.content), + "title": page.title, + "content": Markup(page.content), }, request, ) @@ -42,7 +42,7 @@ def home(request): locale = request.localizer.locale_name content = util.read_localized_resource(locale, "html/home.html") return { - 'home_content': content, + "home_content": content, } query = request.identity.all_tracks_query() @@ -55,12 +55,12 @@ def home(request): summary.add(track) return { - 'summary': summary, - 'month_name': util.month_name, + "summary": summary, + "month_name": util.month_name, } -@view_config(route_name='static-page', renderer='fietsboek:templates/static-page.jinja2') +@view_config(route_name="static-page", renderer="fietsboek:templates/static-page.jinja2") def static_page(request): """Renders a static page. @@ -69,17 +69,17 @@ def static_page(request): :return: The HTTP response. :rtype: pyramid.response.Response """ - page = request.pages.find(request.matchdict['slug'], request) + page = request.pages.find(request.matchdict["slug"], request) if page is None: return HTTPNotFound() return { - 'title': page.title, - 'content': Markup(page.content), + "title": page.title, + "content": Markup(page.content), } -@view_config(route_name='login', renderer='fietsboek:templates/login.jinja2', request_method='GET') +@view_config(route_name="login", renderer="fietsboek:templates/login.jinja2", request_method="GET") def login(request): """Renders the login page. @@ -92,7 +92,7 @@ def login(request): return {} -@view_config(route_name='login', request_method='POST') +@view_config(route_name="login", request_method="POST") def do_login(request): """Endpoint for the login form. @@ -101,24 +101,24 @@ def do_login(request): :return: The HTTP response. :rtype: pyramid.response.Response """ - query = models.User.query_by_email(request.params['email']) + query = models.User.query_by_email(request.params["email"]) try: user = request.dbsession.execute(query).scalar_one() - user.check_password(request.params['password']) + user.check_password(request.params["password"]) except (NoResultFound, PasswordMismatch): - request.session.flash(request.localizer.translate(_('flash.invalid_credentials'))) - return HTTPFound(request.route_url('login')) + request.session.flash(request.localizer.translate(_("flash.invalid_credentials"))) + return HTTPFound(request.route_url("login")) if not user.is_verified: - request.session.flash(request.localizer.translate(_('flash.account_not_verified'))) - return HTTPFound(request.route_url('login')) + request.session.flash(request.localizer.translate(_("flash.account_not_verified"))) + return HTTPFound(request.route_url("login")) - request.session.flash(request.localizer.translate(_('flash.logged_in'))) + request.session.flash(request.localizer.translate(_("flash.logged_in"))) headers = remember(request, str(user.id)) - return HTTPFound('/', headers=headers) + return HTTPFound("/", headers=headers) -@view_config(route_name='logout') +@view_config(route_name="logout") def logout(request): """Logs the user out. @@ -127,13 +127,16 @@ def logout(request): :return: The HTTP response. :rtype: pyramid.response.Response """ - request.session.flash(request.localizer.translate(_('flash.logged_out'))) + request.session.flash(request.localizer.translate(_("flash.logged_out"))) headers = forget(request) - return HTTPFound('/', headers=headers) + return HTTPFound("/", headers=headers) -@view_config(route_name="password-reset", request_method="GET", - renderer="fietsboek:templates/request_password.jinja2") +@view_config( + route_name="password-reset", + request_method="GET", + renderer="fietsboek:templates/request_password.jinja2", +) def password_reset(request): """Form to request a new password. @@ -155,11 +158,11 @@ def do_password_reset(request): :return: The HTTP response. :rtype: pyramid.response.Response """ - query = models.User.query_by_email(request.params['email']) + query = models.User.query_by_email(request.params["email"]) user = request.dbsession.execute(query).scalar_one_or_none() if user is None: request.session.flash(request.localizer.translate(_("flash.reset_invalid_email"))) - return HTTPFound(request.route_url('password-reset')) + return HTTPFound(request.route_url("password-reset")) token = models.Token.generate(user, TokenType.RESET_PASSWORD) request.dbsession.add(token) @@ -171,9 +174,9 @@ def do_password_reset(request): request.localizer.translate(_("page.password_reset.email.subject")), ) mail.set_content( - request.localizer - .translate(_("page.password_reset.email.body")) - .format(request.route_url('use-token', uuid=token.uuid)) + request.localizer.translate(_("page.password_reset.email.body")).format( + request.route_url("use-token", uuid=token.uuid) + ) ) email.send_message( request.config.email_smtp_url, @@ -182,7 +185,7 @@ def do_password_reset(request): mail, ) - return HTTPFound(request.route_url('password-reset')) + return HTTPFound(request.route_url("password-reset")) @view_config(route_name="use-token") @@ -196,25 +199,25 @@ def use_token(request): :rtype: pyramid.response.Response """ token = request.dbsession.execute( - select(models.Token).filter_by(uuid=request.matchdict['uuid']) - ).scalar_one_or_none() + select(models.Token).filter_by(uuid=request.matchdict["uuid"]) + ).scalar_one_or_none() if token is None: return HTTPNotFound() if token.token_type == TokenType.VERIFY_EMAIL: token.user.is_verified = True request.dbsession.delete(token) - request.session.flash(request.localizer.translate(_('flash.email_verified'))) - return HTTPFound(request.route_url('login')) - if request.method == 'GET' and token.token_type == TokenType.RESET_PASSWORD: - return render_to_response('fietsboek:templates/password_reset.jinja2', {}, request) - if request.method == 'POST' and token.token_type == TokenType.RESET_PASSWORD: + request.session.flash(request.localizer.translate(_("flash.email_verified"))) + return HTTPFound(request.route_url("login")) + if request.method == "GET" and token.token_type == TokenType.RESET_PASSWORD: + return render_to_response("fietsboek:templates/password_reset.jinja2", {}, request) + if request.method == "POST" and token.token_type == TokenType.RESET_PASSWORD: password = request.params["password"] try: util.check_password_constraints(password, request.params["repeat-password"]) except ValueError as exc: request.session.flash(request.localizer.translate(exc.args[0])) - return HTTPFound(request.route_url('use-token', uuid=token.uuid)) + return HTTPFound(request.route_url("use-token", uuid=token.uuid)) token.user.set_password(password) request.dbsession.delete(token) diff --git a/fietsboek/views/detail.py b/fietsboek/views/detail.py index a135916..e0ca113 100644 --- a/fietsboek/views/detail.py +++ b/fietsboek/views/detail.py @@ -11,8 +11,9 @@ from sqlalchemy import select from .. import models, util -@view_config(route_name='details', renderer='fietsboek:templates/details.jinja2', - permission='track.view') +@view_config( + route_name="details", renderer="fietsboek:templates/details.jinja2", permission="track.view" +) def details(request): """Renders the detail page for a given track. @@ -23,13 +24,13 @@ def details(request): """ track = request.context description = util.safe_markdown(track.description) - show_edit_link = (track.owner == request.identity) + show_edit_link = track.owner == request.identity images = [] for image_name in request.data_manager.images(track.id): query = [] - if 'secret' in request.GET: - query.append(('secret', request.GET['secret'])) + if "secret" in request.GET: + query.append(("secret", request.GET["secret"])) img_src = request.route_url("image", track_id=track.id, image_name=image_name, _query=query) query = select(models.ImageMetadata).filter_by(track=track, image_name=image_name) image_metadata = request.dbsession.execute(query).scalar_one_or_none() @@ -39,17 +40,17 @@ def details(request): images.append((img_src, "")) return { - 'track': track, - 'show_organic': track.show_organic_data(), - 'show_edit_link': show_edit_link, - 'mps_to_kph': util.mps_to_kph, - 'comment_md_to_html': util.safe_markdown, - 'description': description, - 'images': images, + "track": track, + "show_organic": track.show_organic_data(), + "show_edit_link": show_edit_link, + "mps_to_kph": util.mps_to_kph, + "comment_md_to_html": util.safe_markdown, + "description": description, + "images": images, } -@view_config(route_name='gpx', http_cache=3600, permission='track.view') +@view_config(route_name="gpx", http_cache=3600, permission="track.view") def gpx(request): """Returns the actual GPX data from the stored track. @@ -62,7 +63,7 @@ def gpx(request): # We can be nice to the client if they support it, and deliver the gzipped # data straight. This saves decompression time on the server and saves a # lot of bandwidth. - if 'gzip' in request.accept_encoding: + if "gzip" in request.accept_encoding: response = Response(track.gpx, content_type="application/gpx+xml", content_encoding="gzip") else: response = Response(track.gpx_data, content_type="application/gpx+xml") @@ -70,7 +71,7 @@ def gpx(request): return response -@view_config(route_name='invalidate-share', request_method='POST', permission='track.unshare') +@view_config(route_name="invalidate-share", request_method="POST", permission="track.unshare") def invalidate_share(request): """Endpoint to invalidate the share link. @@ -81,10 +82,10 @@ def invalidate_share(request): """ track = request.context track.link_secret = util.random_link_secret() - return HTTPFound(request.route_url('details', track_id=track.id)) + return HTTPFound(request.route_url("details", track_id=track.id)) -@view_config(route_name='delete-track', request_method='POST', permission='track.delete') +@view_config(route_name="delete-track", request_method="POST", permission="track.delete") def delete_track(request): """Endpoint to delete the track. @@ -98,10 +99,10 @@ def delete_track(request): request.dbsession.delete(track) request.data_manager.purge(track_id) request.session.flash(request.localizer.translate(_("flash.track_deleted"))) - return HTTPFound(request.route_url('home')) + return HTTPFound(request.route_url("home")) -@view_config(route_name='badge', http_cache=3600) +@view_config(route_name="badge", http_cache=3600) def badge(request): """Returns the image data associated with a badge. @@ -115,7 +116,7 @@ def badge(request): return response -@view_config(route_name='image', http_cache=3600, permission='track.view') +@view_config(route_name="image", http_cache=3600, permission="track.view") def image(request): """Returns the image data for the requested image. @@ -129,7 +130,7 @@ def image(request): """ track = request.context try: - image_path = request.data_manager.image_path(track.id, request.matchdict['image_name']) + image_path = request.data_manager.image_path(track.id, request.matchdict["image_name"]) except FileNotFoundError: return HTTPNotFound() else: diff --git a/fietsboek/views/edit.py b/fietsboek/views/edit.py index ff71282..003f7c7 100644 --- a/fietsboek/views/edit.py +++ b/fietsboek/views/edit.py @@ -18,8 +18,12 @@ ImageEmbed = namedtuple("ImageEmbed", "name url description") LOGGER = logging.getLogger(__name__) -@view_config(route_name='edit', renderer='fietsboek:templates/edit.jinja2', - permission='track.edit', request_method='GET') +@view_config( + route_name="edit", + renderer="fietsboek:templates/edit.jinja2", + permission="track.edit", + request_method="GET", +) def edit(request): """Renders the edit form. @@ -35,8 +39,8 @@ def edit(request): images = [] for image in request.data_manager.images(track.id): metadata = request.dbsession.execute( - select(models.ImageMetadata).filter_by(track=track, image_name=image) - ).scalar_one_or_none() + select(models.ImageMetadata).filter_by(track=track, image_name=image) + ).scalar_one_or_none() if metadata: description = metadata.description else: @@ -45,13 +49,13 @@ def edit(request): images.append(ImageEmbed(image, img_src, description)) return { - 'track': track, - 'badges': badges, - 'images': images, + "track": track, + "badges": badges, + "images": images, } -@view_config(route_name='edit', permission='track.edit', request_method='POST') +@view_config(route_name="edit", permission="track.edit", request_method="POST") def do_edit(request): """Endpoint for saving the edited data. @@ -65,8 +69,9 @@ def do_edit(request): user_friends = request.identity.get_friends() badges = util.retrieve_multiple(request.dbsession, models.Badge, request.params, "badge[]") - tagged_people = util.retrieve_multiple(request.dbsession, models.User, - request.params, "tagged-friend[]") + tagged_people = util.retrieve_multiple( + request.dbsession, models.User, request.params, "tagged-friend[]" + ) if any(user not in track.tagged_people and user not in user_friends for user in tagged_people): return HTTPBadRequest() @@ -86,7 +91,7 @@ def do_edit(request): edit_images(request, request.context) - return HTTPFound(request.route_url('details', track_id=track.id)) + return HTTPFound(request.route_url("details", track_id=track.id)) def edit_images(request, track): @@ -105,8 +110,8 @@ def edit_images(request, track): for image in request.params.getall("delete-image[]"): request.data_manager.delete_image(track.id, image) image_meta = request.dbsession.execute( - select(models.ImageMetadata).filter_by(track_id=track.id, image_name=image) - ).scalar_one_or_none() + select(models.ImageMetadata).filter_by(track_id=track.id, image_name=image) + ).scalar_one_or_none() LOGGER.debug("Deleted image %s %s (metadata: %s)", track.id, image, image_meta) if image_meta: request.dbsession.delete(image_meta) diff --git a/fietsboek/views/notfound.py b/fietsboek/views/notfound.py index f7117fe..ac24008 100644 --- a/fietsboek/views/notfound.py +++ b/fietsboek/views/notfound.py @@ -2,7 +2,7 @@ from pyramid.view import notfound_view_config -@notfound_view_config(renderer='fietsboek:templates/404.jinja2') +@notfound_view_config(renderer="fietsboek:templates/404.jinja2") def notfound_view(request): """Renders the 404 response. diff --git a/fietsboek/views/profile.py b/fietsboek/views/profile.py index f4acd3d..7a929aa 100644 --- a/fietsboek/views/profile.py +++ b/fietsboek/views/profile.py @@ -10,8 +10,12 @@ from sqlalchemy import select from .. import models, util -@view_config(route_name='profile', renderer='fietsboek:templates/profile.jinja2', - permission='user', request_method="GET") +@view_config( + route_name="profile", + renderer="fietsboek:templates/profile.jinja2", + permission="user", + request_method="GET", +) def profile(request): """Provides the profile overview. @@ -22,15 +26,15 @@ def profile(request): """ coming_requests = request.dbsession.execute( - select(models.FriendRequest).filter_by(recipient_id=request.identity.id) - ).scalars() + select(models.FriendRequest).filter_by(recipient_id=request.identity.id) + ).scalars() going_requests = request.dbsession.execute( - select(models.FriendRequest).filter_by(sender_id=request.identity.id) - ).scalars() + select(models.FriendRequest).filter_by(sender_id=request.identity.id) + ).scalars() return { - 'user': request.identity, - 'outgoing_friend_requests': going_requests, - 'incoming_friend_requests': coming_requests, + "user": request.identity, + "outgoing_friend_requests": going_requests, + "incoming_friend_requests": coming_requests, } @@ -49,16 +53,16 @@ def do_change_profile(request): util.check_password_constraints(password, request.params["repeat-password"]) except ValueError as exc: request.session.flash(request.localizer.translate(exc.args[0])) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) request.identity.set_password(request.params["password"]) name = request.params["name"] if request.identity.name != name: request.identity.name = name request.session.flash(request.localizer.translate(_("flash.personal_data_updated"))) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) -@view_config(route_name='add-friend', permission='user', request_method='POST') +@view_config(route_name="add-friend", permission="user", request_method="POST") def do_add_friend(request): """Sends a friend request. @@ -69,18 +73,17 @@ def do_add_friend(request): :return: The HTTP response. :rtype: pyramid.response.Response """ - email = request.params['friend-email'] - candidate = (request.dbsession - .execute(models.User.query_by_email(email)) - .scalar_one_or_none()) + email = request.params["friend-email"] + candidate = request.dbsession.execute(models.User.query_by_email(email)).scalar_one_or_none() if candidate is None: request.session.flash(request.localizer.translate(_("flash.friend_not_found"))) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) - if (candidate in request.identity.get_friends() - or candidate in [x.recipient for x in request.identity.outgoing_requests]): + if candidate in request.identity.get_friends() or candidate in [ + x.recipient for x in request.identity.outgoing_requests + ]: request.session.flash(request.localizer.translate(_("flash.friend_already_exists"))) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) for incoming in request.identity.incoming_requests: if incoming.sender == candidate: @@ -88,7 +91,7 @@ def do_add_friend(request): request.identity.add_friend(candidate) request.dbsession.delete(incoming) request.session.flash(request.localizer.translate(_("flash.friend_added"))) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) # Nothing helped, so we send the friend request friend_req = models.FriendRequest( @@ -98,10 +101,10 @@ def do_add_friend(request): ) request.dbsession.add(friend_req) request.session.flash(request.localizer.translate(_("flash.friend_request_sent"))) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) -@view_config(route_name='delete-friend', permission='user', request_method='POST') +@view_config(route_name="delete-friend", permission="user", request_method="POST") def do_delete_friend(request): """Deletes a friend. @@ -113,14 +116,14 @@ def do_delete_friend(request): :rtype: pyramid.response.Response """ friend = request.dbsession.execute( - select(models.User).filter_by(id=request.params["friend-id"]) - ).scalar_one_or_none() + select(models.User).filter_by(id=request.params["friend-id"]) + ).scalar_one_or_none() if friend: request.identity.remove_friend(friend) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) -@view_config(route_name='accept-friend', permission='user', request_method='POST') +@view_config(route_name="accept-friend", permission="user", request_method="POST") def do_accept_friend(request): """Accepts a friend request. @@ -132,8 +135,8 @@ def do_accept_friend(request): :rtype: pyramid.response.Response """ friend_request = request.dbsession.execute( - select(models.FriendRequest).filter_by(id=request.params["request-id"]) - ).scalar_one_or_none() + select(models.FriendRequest).filter_by(id=request.params["request-id"]) + ).scalar_one_or_none() if friend_request is None: return HTTPNotFound() if friend_request.recipient != request.identity: @@ -141,10 +144,10 @@ def do_accept_friend(request): friend_request.sender.add_friend(friend_request.recipient) request.dbsession.delete(friend_request) - return HTTPFound(request.route_url('profile')) + return HTTPFound(request.route_url("profile")) -@view_config(route_name='json-friends', renderer='json', permission='user') +@view_config(route_name="json-friends", renderer="json", permission="user") def json_friends(request): """Returns a JSON-ified list of the user's friends. @@ -153,7 +156,5 @@ def json_friends(request): :return: The HTTP response. :rtype: pyramid.response.Response """ - friends = [ - {'name': friend.name, 'id': friend.id} for friend in request.identity.get_friends() - ] + friends = [{"name": friend.name, "id": friend.id} for friend in request.identity.get_friends()] return friends diff --git a/fietsboek/views/tileproxy.py b/fietsboek/views/tileproxy.py index 3e2abc1..f0612dc 100644 --- a/fietsboek/views/tileproxy.py +++ b/fietsboek/views/tileproxy.py @@ -9,7 +9,7 @@ Additionally, this protects the users' IP, as only fietsboek can see it. import datetime import random import logging -from typing import NamedTuple +from typing import NamedTuple, Optional from itertools import chain from pyramid.view import view_config @@ -25,6 +25,7 @@ from ..config import LayerType, LayerAccess class TileSource(NamedTuple): """Represents a remote server that can provide tiles to us.""" + key: str """Key to indicate this source in URLs.""" name: str @@ -33,7 +34,7 @@ class TileSource(NamedTuple): """URL with placeholders.""" layer_type: LayerType """Type of this layer.""" - zoom: int + zoom: Optional[int] """Max zoom of this layer.""" access: LayerAccess """Access restrictions to use this layer.""" @@ -54,88 +55,107 @@ _jb_copy = _href("https://www.j-berkemeier.de/GPXViewer", "GPXViewer") DEFAULT_TILE_LAYERS = [ # Main base layers TileSource( - 'osm', - 'OSM', - 'https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', + "osm", + "OSM", + "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png", LayerType.BASE, 19, LayerAccess.PUBLIC, - ''.join([ - _jb_copy, ' | Map data © ', - _href("https://www.openstreetmap.org/", "OpenStreetMap"), ' and contributors ', - _href("https://creativecommons.org/licenses/by-sa/2.0/", "CC-BY-SA"), - ]), + "".join( + [ + _jb_copy, + " | Map data © ", + _href("https://www.openstreetmap.org/", "OpenStreetMap"), + " and contributors ", + _href("https://creativecommons.org/licenses/by-sa/2.0/", "CC-BY-SA"), + ] + ), ), TileSource( - 'satellite', - 'Satellit', - 'https://server.arcgisonline.com/ArcGIS/rest/services/' - 'World_Imagery/MapServer/tile/{z}/{y}/{x}', + "satellite", + "Satellit", + "https://server.arcgisonline.com/ArcGIS/rest/services/" + "World_Imagery/MapServer/tile/{z}/{y}/{x}", LayerType.BASE, 21, LayerAccess.PUBLIC, - ''.join([ - _jb_copy, ' | Map data © ', _href("https://www.esri.com", "Esri"), - ', i-cubed, USDA, USGS, AEX, GeoEye, Getmapping, Aerogrid, IGN, ', - 'IGP, UPR-EGP, and the GIS User Community', - ]), + "".join( + [ + _jb_copy, + " | Map data © ", + _href("https://www.esri.com", "Esri"), + ", i-cubed, USDA, USGS, AEX, GeoEye, Getmapping, Aerogrid, IGN, ", + "IGP, UPR-EGP, and the GIS User Community", + ] + ), ), TileSource( - 'osmde', - 'OSMDE', - 'https://{s}.tile.openstreetmap.de/tiles/osmde/{z}/{x}/{y}.png', + "osmde", + "OSMDE", + "https://{s}.tile.openstreetmap.de/tiles/osmde/{z}/{x}/{y}.png", LayerType.BASE, 19, LayerAccess.PUBLIC, - ''.join([ - _jb_copy, ' | Map data © ', - _href("https://www.openstreetmap.org/", "OpenStreetMap"), ' and contributors ', - _href("https://creativecommons.org/licenses/by-sa/2.0/", "CC-BY-SA") - ]), + "".join( + [ + _jb_copy, + " | Map data © ", + _href("https://www.openstreetmap.org/", "OpenStreetMap"), + " and contributors ", + _href("https://creativecommons.org/licenses/by-sa/2.0/", "CC-BY-SA"), + ] + ), ), TileSource( - 'opentopo', - 'Open Topo', - 'https://{s}.tile.opentopomap.org/{z}/{x}/{y}.png', + "opentopo", + "Open Topo", + "https://{s}.tile.opentopomap.org/{z}/{x}/{y}.png", LayerType.BASE, 17, LayerAccess.PUBLIC, - ''.join([ - _jb_copy, - ' | Kartendaten: © OpenStreetMap-Mitwirkende, SRTM | Kartendarstellung: © ', - _href("https://opentopomap.org/about", "OpenTopoMap"), ' (CC-BY-SA)', - ]), + "".join( + [ + _jb_copy, + " | Kartendaten: © OpenStreetMap-Mitwirkende, SRTM | Kartendarstellung: © ", + _href("https://opentopomap.org/about", "OpenTopoMap"), + " (CC-BY-SA)", + ] + ), ), TileSource( - 'topplusopen', - 'TopPlusOpen', - 'https://sgx.geodatenzentrum.de/wmts_topplus_open/tile/' - '1.0.0/web/default/WEBMERCATOR/{z}/{y}/{x}.png', + "topplusopen", + "TopPlusOpen", + "https://sgx.geodatenzentrum.de/wmts_topplus_open/tile/" + "1.0.0/web/default/WEBMERCATOR/{z}/{y}/{x}.png", LayerType.BASE, 18, LayerAccess.PUBLIC, - ''.join([ - _jb_copy, ' | Kartendaten: © ', - _href("https://www.bkg.bund.de/SharedDocs/Produktinformationen" - "/BKG/DE/P-2017/170922-TopPlus-Web-Open.html", - "Bundesamt für Kartographie und Geodäsie"), - ]), + "".join( + [ + _jb_copy, + " | Kartendaten: © ", + _href( + "https://www.bkg.bund.de/SharedDocs/Produktinformationen" + "/BKG/DE/P-2017/170922-TopPlus-Web-Open.html", + "Bundesamt für Kartographie und Geodäsie", + ), + ] + ), ), - # Overlay layers TileSource( - 'opensea', - 'OpenSea', - 'https://tiles.openseamap.org/seamark/{z}/{x}/{y}.png', + "opensea", + "OpenSea", + "https://tiles.openseamap.org/seamark/{z}/{x}/{y}.png", LayerType.OVERLAY, None, LayerAccess.PUBLIC, 'Kartendaten: © <a href="http://www.openseamap.org">OpenSeaMap</a> contributors', ), TileSource( - 'hiking', - 'Hiking', - 'https://tile.waymarkedtrails.org/hiking/{z}/{x}/{y}.png', + "hiking", + "Hiking", + "https://tile.waymarkedtrails.org/hiking/{z}/{x}/{y}.png", LayerType.OVERLAY, None, LayerAccess.PUBLIC, @@ -143,9 +163,9 @@ DEFAULT_TILE_LAYERS = [ f'({_href("https://creativecommons.org/licenses/by-sa/3.0/", "CC-BY-SA")})', ), TileSource( - 'cycling', - 'Cycling', - 'https://tile.waymarkedtrails.org/cycling/{z}/{x}/{y}.png', + "cycling", + "Cycling", + "https://tile.waymarkedtrails.org/cycling/{z}/{x}/{y}.png", LayerType.OVERLAY, None, LayerAccess.PUBLIC, @@ -167,7 +187,7 @@ PUNISHMENT_THRESHOLD = 10 """Block a provider after that many requests have timed out.""" -@view_config(route_name='tile-proxy', http_cache=3600) +@view_config(route_name="tile-proxy", http_cache=3600) def tile_proxy(request): """Requests the given tile from the proxy. @@ -179,13 +199,16 @@ def tile_proxy(request): if request.config.disable_tile_proxy: raise HTTPBadRequest("Tile proxying is disabled") - provider = request.matchdict['provider'] + provider = request.matchdict["provider"] tile_sources = {source.key: source for source in sources_for(request)} if provider not in tile_sources: raise HTTPBadRequest("Invalid provider") - x, y, z = (int(request.matchdict['x']), int(request.matchdict['y']), - int(request.matchdict['z'])) + x, y, z = ( + int(request.matchdict["x"]), + int(request.matchdict["y"]), + int(request.matchdict["z"]), + ) cache_key = f"tile:{provider}-{x}-{y}-{z}" content_type = "image/png" @@ -220,8 +243,7 @@ def tile_proxy(request): resp.raise_for_status() except requests.HTTPError as exc: LOGGER.info("Proxy request failed for %s: %s", provider, exc) - return Response(f"Failed to get tile from {provider}", - status_code=resp.status_code) + return Response(f"Failed to get tile from {provider}", status_code=resp.status_code) request.redis.set(cache_key, resp.content, ex=TTL) return Response(resp.content, content_type=resp.headers.get("Content-type", content_type)) @@ -235,9 +257,13 @@ def sources_for(request): :rtype: list[TileSource] """ return [ - source for source in chain( - (default_layer for default_layer in DEFAULT_TILE_LAYERS - if default_layer.key in request.config.default_tile_layers), + source + for source in chain( + ( + default_layer + for default_layer in DEFAULT_TILE_LAYERS + if default_layer.key in request.config.default_tile_layers + ), extract_tile_layers(request.config), ) if source.access == LayerAccess.PUBLIC or request.identity is not None @@ -263,17 +289,26 @@ def _extract_thunderforest(config): tf_api_key = config.thunderforest_key.get_secret_value() if tf_api_key: tf_access = config.thunderforest_access - tf_attribution = ' | '.join([ - _jb_copy, - _href("https://www.thunderforest.com/", "Thunderforest"), - _href("https://www.openstreetmap.org/", "OpenStreetMap"), - ]) + tf_attribution = " | ".join( + [ + _jb_copy, + _href("https://www.thunderforest.com/", "Thunderforest"), + _href("https://www.openstreetmap.org/", "OpenStreetMap"), + ] + ) for tf_map in config.thunderforest_maps: - url = (f"https://tile.thunderforest.com/{tf_map}/" - f"{{z}}/{{x}}/{{y}}.png?apikey={tf_api_key}") + url = ( + f"https://tile.thunderforest.com/{tf_map}/" + f"{{z}}/{{x}}/{{y}}.png?apikey={tf_api_key}" + ) yield TileSource( - f"tf-{tf_map}", f"TF {tf_map.title()}", url, - LayerType.BASE, 22, tf_access, tf_attribution, + f"tf-{tf_map}", + f"TF {tf_map.title()}", + url, + LayerType.BASE, + 22, + tf_access, + tf_attribution, ) @@ -287,5 +322,5 @@ def _extract_user_layers(config): layer.layer_type, layer.zoom, layer.access, - layer.attribution + layer.attribution, ) diff --git a/fietsboek/views/upload.py b/fietsboek/views/upload.py index f63f45d..d691e46 100644 --- a/fietsboek/views/upload.py +++ b/fietsboek/views/upload.py @@ -19,8 +19,12 @@ from ..models.track import Visibility, TrackType LOGGER = logging.getLogger(__name__) -@view_config(route_name='upload', renderer='fietsboek:templates/upload.jinja2', - request_method='GET', permission='upload') +@view_config( + route_name="upload", + renderer="fietsboek:templates/upload.jinja2", + request_method="GET", + permission="upload", +) def show_upload(request): """Renders the main upload form. @@ -33,7 +37,7 @@ def show_upload(request): return {} -@view_config(route_name='upload', request_method='POST', permission='upload') +@view_config(route_name="upload", request_method="POST", permission="upload") def do_upload(request): """Endpoint to store the uploaded file. @@ -47,10 +51,10 @@ def do_upload(request): :rtype: pyramid.response.Response """ try: - gpx = request.POST['gpx'].file.read() + gpx = request.POST["gpx"].file.read() except AttributeError: - request.session.flash(request.localizer.translate(_('flash.no_file_selected'))) - return HTTPFound(request.route_url('upload')) + request.session.flash(request.localizer.translate(_("flash.no_file_selected"))) + return HTTPFound(request.route_url("upload")) # Before we do anything, we check if we can parse the file. # gpxpy might throw different exceptions, so we simply catch `Exception` @@ -59,9 +63,9 @@ def do_upload(request): try: gpxpy.parse(gpx) except Exception as exc: - request.session.flash(request.localizer.translate(_('flash.invalid_file'))) + request.session.flash(request.localizer.translate(_("flash.invalid_file"))) LOGGER.info("Could not parse gpx: %s", exc) - return HTTPFound(request.route_url('upload')) + return HTTPFound(request.route_url("upload")) now = datetime.datetime.utcnow() @@ -73,10 +77,10 @@ def do_upload(request): request.dbsession.add(upload) request.dbsession.flush() - return HTTPFound(request.route_url('finish-upload', upload_id=upload.id)) + return HTTPFound(request.route_url("finish-upload", upload_id=upload.id)) -@view_config(route_name='preview', permission='upload.finish') +@view_config(route_name="preview", permission="upload.finish") def preview(request): """Allows a preview of the uploaded track by returning the GPX data of a :class:`~fietsboek.models.track.Upload` @@ -87,11 +91,15 @@ def preview(request): :rtype: pyramid.response.Response """ upload = request.context - return Response(upload.gpx_data, content_type='application/gpx+xml') + return Response(upload.gpx_data, content_type="application/gpx+xml") -@view_config(route_name='finish-upload', renderer='fietsboek:templates/finish_upload.jinja2', - request_method='GET', permission='upload.finish') +@view_config( + route_name="finish-upload", + renderer="fietsboek:templates/finish_upload.jinja2", + request_method="GET", + permission="upload.finish", +) def finish_upload(request): """Renders the form that allows the user to finish the upload. @@ -108,6 +116,7 @@ def finish_upload(request): date = gpx.time or gpx.get_time_bounds().start_time or datetime.datetime.now() date = date.astimezone(timezone) tz_offset = timezone.utcoffset(date) + tz_offset = 0 if tz_offset is None else tz_offset.total_seconds() track_name = "" for track in gpx.tracks: if track.name: @@ -115,20 +124,20 @@ def finish_upload(request): break return { - 'preview_id': upload.id, - 'upload_title': gpx.name or track_name, - 'upload_date': date, - 'upload_date_tz': int(tz_offset.total_seconds() // 60), - 'upload_visibility': Visibility.PRIVATE, - 'upload_type': TrackType.ORGANIC, - 'upload_description': gpx.description, - 'upload_tags': set(), - 'upload_tagged_people': [], - 'badges': badges, + "preview_id": upload.id, + "upload_title": gpx.name or track_name, + "upload_date": date, + "upload_date_tz": int(tz_offset // 60), + "upload_visibility": Visibility.PRIVATE, + "upload_type": TrackType.ORGANIC, + "upload_description": gpx.description, + "upload_tags": set(), + "upload_tagged_people": [], + "badges": badges, } -@view_config(route_name='finish-upload', request_method='POST', permission='upload.finish') +@view_config(route_name="finish-upload", request_method="POST", permission="upload.finish") def do_finish_upload(request): """Endpoint for the "finishing upload" form. @@ -140,8 +149,9 @@ def do_finish_upload(request): upload = request.context user_friends = request.identity.get_friends() badges = util.retrieve_multiple(request.dbsession, models.Badge, request.params, "badge[]") - tagged_people = util.retrieve_multiple(request.dbsession, models.User, - request.params, "tagged-friend[]") + tagged_people = util.retrieve_multiple( + request.dbsession, models.User, request.params, "tagged-friend[]" + ) if any(user not in user_friends for user in tagged_people): return HTTPBadRequest() @@ -178,10 +188,10 @@ def do_finish_upload(request): request.session.flash(request.localizer.translate(_("flash.upload_success"))) - return HTTPFound(request.route_url('details', track_id=track.id)) + return HTTPFound(request.route_url("details", track_id=track.id)) -@view_config(route_name='cancel-upload', permission='upload.finish', request_method="POST") +@view_config(route_name="cancel-upload", permission="upload.finish", request_method="POST") def cancel_upload(request): """Cancels the upload and clears the temporary data. @@ -193,4 +203,4 @@ def cancel_upload(request): upload = request.context request.dbsession.delete(upload) request.session.flash(request.localizer.translate(_("flash.upload_cancelled"))) - return HTTPFound(request.route_url('upload')) + return HTTPFound(request.route_url("upload")) diff --git a/pyproject.toml b/pyproject.toml index 42f8ea3..6f178fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,3 +72,7 @@ fietsupdate = "fietsboek.updater.cli:cli" [tool.poetry.plugins."paste.app_factory"] main = "fietsboek:main" + +[tool.black] +line-length = 100 +extend-exclude = '''upd_.+\.py|^/fietsboek/alembic/versions/.+''' @@ -1,11 +1,12 @@ [flake8] max-line-length = 100 exclude = fietsboek/alembic +extend-ignore = E203 per-file-ignores = fietsboek/models/__init__.py:F401 [tox] -envlist = python,pylint,pylint-tests,flake8 +envlist = python,pylint,pylint-tests,flake8,mypy,black isolated_build = true [testenv] @@ -44,3 +45,21 @@ allowlist_externals = make changedir={toxinidir}{/}doc commands = make html + +[testenv:mypy] +deps = + mypy + types-Markdown + types-bleach + types-babel + types-redis + types-requests +usedevelop = true +commands = + mypy fietsboek + +[testenv:black] +deps = black +usedevelop = true +commands = + black --check fietsboek |