use hittekaart::gpx::{self, Compression, Coordinates}; use hittekaart::renderer::{self, Renderer}; use pyo3::create_exception; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use std::error::Error; use std::ffi::OsStr; use std::fmt::Write as _; use std::os::unix::ffi::OsStrExt as _; create_exception!(hittekaart_py, HitteError, pyo3::exceptions::PyException); fn err_to_py(mut error: &dyn Error) -> PyErr { let mut text = error.to_string(); loop { match error.source() { None => break, Some(e) => error = e, } write!(&mut text, "\ncaused by: {error}").unwrap(); } HitteError::new_err(text) } #[pyclass] #[derive(Debug, Clone)] struct Track { inner: Vec, } #[pymethods] impl Track { #[staticmethod] fn from_file(path: &[u8], compression: Option<&str>) -> PyResult { let compression = match compression { None | Some("") => Compression::None, Some("gzip") => Compression::Gzip, Some("brotli") => Compression::Brotli, Some(x) => return Err(HitteError::new_err(format!("invalid compression: {x}"))), }; let track = gpx::extract_from_file(OsStr::from_bytes(path), compression) .map_err(|e| err_to_py(&e))?; Ok(Track { inner: track }) } #[staticmethod] fn from_coordinates(coordinates: Vec<(f64, f64)>) -> Track { Track { inner: coordinates .iter() .map(|(lon, lat)| Coordinates::new(*lon, *lat)) .collect(), } } fn coordinates(&self) -> Vec<(f64, f64)> { self.inner .iter() .map(|c| (c.longitude, c.latitude)) .collect() } } #[pyclass] struct Storage { inner: Box, } #[pymethods] impl Storage { #[staticmethod] #[pyo3(name = "Folder")] fn folder(path: &[u8]) -> Self { let path = OsStr::from_bytes(path); let storage = hittekaart::storage::Folder::new(path.into()); Storage { inner: Box::new(storage), } } } #[pyclass] struct HeatmapRenderer { inner: renderer::heatmap::Renderer, } #[pymethods] impl HeatmapRenderer { #[new] fn new() -> HeatmapRenderer { HeatmapRenderer { inner: renderer::heatmap::Renderer, } } } #[pyfunction] fn generate( items: &Bound<'_, PyAny>, renderer: &Bound<'_, PyAny>, storage: &Bound<'_, Storage>, ) -> PyResult<()> { let mut tracks = vec![]; for item in items.try_iter()? { let item = item?; tracks.push(item.extract::()?.inner); } if let Ok(r) = renderer.downcast::() { do_generate(tracks, &r.borrow().inner, &mut storage.borrow_mut()) } else { Err(PyTypeError::new_err("Expected a HeatmapRenderer")) } } fn do_generate( tracks: Vec>, renderer: &R, storage: &mut Storage, ) -> PyResult<()> { storage.inner.prepare().map_err(|e| err_to_py(&e))?; for zoom in 0..=19 { let counter = renderer::prepare(renderer, zoom, &tracks, || Ok(())).map_err(|e| err_to_py(&e))?; storage .inner .prepare_zoom(zoom) .map_err(|e| err_to_py(&e))?; renderer::colorize(renderer, counter, |rendered_tile| { storage .inner .store(zoom, rendered_tile.x, rendered_tile.y, &rendered_tile.data)?; Ok(()) }) .map_err(|e| err_to_py(&e))?; } storage.inner.finish().map_err(|e| err_to_py(&e))?; Ok(()) } /// A Python module implemented in Rust. #[pymodule] fn hittekaart_py(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(generate, m)?)?; m.add("HitteError", py.get_type::())?; Ok(()) }