use super::categories::Categorizable; use super::config; use super::logbag::{state_emoji, LogBag}; use std::convert::TryFrom; use std::time::{Duration, SystemTime}; use std::path::{Path, PathBuf}; use anyhow::Result; use evtclib::Log; use log::{debug, info, warn, error}; use tokio::runtime::Runtime; use tokio::fs; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use serde::{Deserialize, Serialize}; use matrix_sdk::{ config::SyncSettings, matrix_auth::MatrixSession, ruma::{ api::client::filter::FilterDefinition, events::room::message::{RoomMessageEventContent, MessageType, Relation, ReplacementMetadata}, events::{AnyMessageLikeEvent, AnyMessageLikeEventContent, AnyTimelineEvent}, OwnedEventId, UserId, MilliSecondsSinceUnixEpoch, UInt, RoomId, OwnedRoomId }, room::MessagesOptions, Client }; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct MatrixUser { pub homeserver: String, pub username: String, pub password: String, pub device_id: Option, } impl From for MatrixUser { fn from(matrix: config::Matrix) -> Self { MatrixUser { homeserver: matrix.homeserver, username: matrix.username, password: matrix.password, device_id: matrix.device_id, } } } /// The data needed to re-build a client. /// Copied from the matrix-rust-sdk persist-session example. #[derive(Debug, Serialize, Deserialize)] struct ClientSession { /// The URL of the homeserver of the user. homeserver: String, /// The path of the database. db_path: PathBuf, /// The passphrase of the database. passphrase: String, } /// The full session to persist. /// Copied from the matrix-rust-sdk persist-session example. #[derive(Debug, Serialize, Deserialize)] struct FullSession { /// The data to re-build the client. client_session: ClientSession, /// The Matrix user session. user_session: MatrixSession, /// The latest sync token. /// /// It is only needed to persist it when using `Client::sync_once()` and we /// want to make our syncs faster by not receiving all the initial sync /// again. #[serde(skip_serializing_if = "Option::is_none")] sync_token: Option, } /// Maximum age of the message to still be edited. const MAX_HOURS: u64 = 5; /// Amount of messages to be loaded in one chunk. const MESSAGE_CHUNK_SIZE: u16 = 20; /// Number of message chunks that should be loaded (in total) when searching for the last message. const MESSAGE_CHUNK_COUNT: u16 = 3; /// Posts a link to the log to a Matrix room. /// /// The user identification is given in the `user` parameter. /// /// This function blocks until all API calls have been made, that is until the message has reached /// the homeserver. pub fn post_link(config: &config::Config, user: MatrixUser, room_id: &str, log: &Log, link: &str) -> Result<()> { let rt = Runtime::new()?; let room_id = RoomId::parse(room_id)?; rt.block_on(async { // The folder containing this user's data. let data_dir = dirs::data_dir().expect("no data_dir directory found").join("ezau-matrix-store"); // The file where the session is persisted. let session_file = data_dir.join("session"); let (client, sync_token) = if session_file.exists() { restore_session(&session_file).await? } else { (login(&data_dir, &session_file, user.homeserver, user.username, user.password).await?, None) }; sync(&client, sync_token, &session_file).await?; info!("Matrix connected as {:?}", client.user_id()); let old_msg = find_message(&client, &client.user_id().unwrap(), &room_id).await?; match old_msg { None => { debug!("Creating a fresh message for matrix"); post_new(&client, room_id, log, link).await?; } Some((old_id, old_text)) => { debug!("Updating message {:?}", old_id); debug!("Updating message body {:?}", old_text); let mut logbag = insert_log(&old_text, log, link); if config.sort_logs { logbag.sort(); } let new_text = logbag.render_plain(); debug!("New message body {:?}", new_text); let new_html = logbag.render_html(); update_message(&client, room_id, old_id, &new_text, &new_html).await?; } } Ok(()) }) } /// Restore a previous session. /// Copied from the matrix-rust-sdk persist-session example. async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option)> { debug!("Previous session found in '{}'", session_file.to_string_lossy()); // The session was serialized as JSON in a file. let serialized_session = fs::read_to_string(session_file).await?; let FullSession { client_session, user_session, sync_token } = serde_json::from_str(&serialized_session)?; // Build the client with the previous settings from the session. let client = Client::builder() .homeserver_url(client_session.homeserver) .sqlite_store(client_session.db_path, Some(&client_session.passphrase)) .build() .await?; debug!("Restoring session for {}…", user_session.meta.user_id); // Restore the Matrix user session. client.restore_session(user_session).await?; Ok((client, sync_token)) } /// Login with a new device. /// Copied from the matrix-rust-sdk persist-session example. async fn login(data_dir: &Path, session_file: &Path, homeserver: String, username: String, password: String) -> anyhow::Result { info!("No previous session found, logging in…"); let (client, client_session) = build_client(data_dir, homeserver).await?; let matrix_auth = client.matrix_auth(); loop { match matrix_auth .login_username(&username, &password) .initial_device_display_name("ezau client") .await { Ok(_) => { info!("Logged in as {username}"); break; } Err(error) => { error!("Error logging in: {error}"); error!("Please try again\n"); } } } // Persist the session to reuse it later. // This is not very secure, for simplicity. If the system provides a way of // storing secrets securely, it should be used instead. // Note that we could also build the user session from the login response. let user_session = matrix_auth.session().expect("A logged-in client should have a session"); let serialized_session = serde_json::to_string(&FullSession { client_session, user_session, sync_token: None })?; fs::write(session_file, serialized_session).await?; info!("Session persisted in {}", session_file.to_string_lossy()); // After logging in, you might want to verify this session with another one, or bootstrap // cross-signing if this is your first session with encryption, or if you need to reset // cross-signing because you don't have access to your old sessions. Ok(client) } /// Build a new client. /// Copied from the matrix-rust-sdk persist-session example. async fn build_client(data_dir: &Path, homeserver: String) -> anyhow::Result<(Client, ClientSession)> { let mut rng = thread_rng(); // Generating a subfolder for the database is not mandatory, but it is useful if // you allow several clients to run at the same time. Each one must have a // separate database, which is a different folder with the SQLite store. let db_subfolder: String = (&mut rng).sample_iter(Alphanumeric).take(7).map(char::from).collect(); let db_path = data_dir.join(db_subfolder); // Generate a random passphrase. let passphrase: String = (&mut rng).sample_iter(Alphanumeric).take(32).map(char::from).collect(); // We create a loop here so the user can retry if an error happens. loop { match Client::builder() .homeserver_url(&homeserver) // We use the SQLite store, which is enabled by default. This is the crucial part to // persist the encryption setup. // Note that other store backends are available and you can even implement your own. .sqlite_store(&db_path, Some(&passphrase)) .build() .await { Ok(client) => return Ok((client, ClientSession { homeserver, db_path, passphrase })), Err(error) => match &error { matrix_sdk::ClientBuildError::AutoDiscovery(_) | matrix_sdk::ClientBuildError::Url(_) | matrix_sdk::ClientBuildError::Http(_) => { error!("Error checking the homeserver: {error}"); error!("Please try again\n"); } _ => { // Forward other errors, it's unlikely we can retry with a different outcome. return Err(error.into()); } }, } } } /// Setup the client to listen to new messages. /// Copied from the matrix-rust-sdk persist-session example. async fn sync( client: &Client, initial_sync_token: Option, session_file: &Path, ) -> anyhow::Result<()> { debug!("Launching a first sync to ignore past messages…"); // Enable room members lazy-loading, it will speed up the initial sync a lot // with accounts in lots of rooms. // See . let filter = FilterDefinition::with_lazy_loading(); let mut sync_settings = SyncSettings::default().filter(filter.into()); // We restore the sync where we left. // This is not necessary when not using `sync_once`. The other sync methods get // the sync token from the store. if let Some(sync_token) = initial_sync_token { sync_settings = sync_settings.token(sync_token); } // Let's ignore messages before the program was launched. // This is a loop in case the initial sync is longer than our timeout. The // server should cache the response and it will ultimately take less time to // receive. loop { match client.sync_once(sync_settings.clone()).await { Ok(response) => { // This is the last time we need to provide this token, the sync method after // will handle it on its own. sync_settings = sync_settings.token(response.next_batch.clone()); persist_sync_token(session_file, response.next_batch).await?; break; } Err(error) => { warn!("An error occurred during initial sync: {error}"); warn!("Trying again…"); } } } debug!("The client is ready!"); Ok(()) } /// Persist the sync token for a future session. /// Note that this is needed only when using `sync_once`. Other sync methods get /// the sync token from the store. /// Copied from the matrix-rust-sdk persist-session example. async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> { let serialized_session = fs::read_to_string(session_file).await?; let mut full_session: FullSession = serde_json::from_str(&serialized_session)?; full_session.sync_token = Some(sync_token); let serialized_session = serde_json::to_string(&full_session)?; fs::write(session_file, serialized_session).await?; Ok(()) } /// Finds the right message if there is one to edit. /// /// Either returns the message ID and the old message text, or None if no suitable message was /// found. async fn find_message( client: &Client, my_id: &UserId, room_id: &OwnedRoomId, ) -> Result> { let limit = UInt::try_from(MESSAGE_CHUNK_SIZE).unwrap(); let time_limit = MilliSecondsSinceUnixEpoch::from_system_time( SystemTime::now() - Duration::from_secs(MAX_HOURS * 60 * 60), ) .expect("Our time limit is before the epoch"); let mut continue_from = String::new(); let room = client.get_room(room_id).unwrap(); for chunk_nr in 0..MESSAGE_CHUNK_COUNT { debug!( "Loading {} items (chunk {} of {})", MESSAGE_CHUNK_SIZE, chunk_nr + 1, MESSAGE_CHUNK_COUNT ); let mut options = MessagesOptions::backward().from(Some(continue_from).as_deref()); options.limit = limit; let response = room.messages(options).await?; for timeline_message in response.chunk { if let Ok(AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomMessage(msg))) = timeline_message.event.deserialize() { if msg.sender() == my_id && msg.origin_server_ts() >= time_limit { // filters out redacted messages - but apparently it doesn't? // as_original() should return only the original, if not redacted. // maybe the sync is still wrong and doesn't "apply" redactions to // room.messages? if let Some(orig_msg) = &msg.as_original() { match &orig_msg.content.relates_to { Some(relation) => if let Relation::Replacement(replacement) = relation { if let MessageType::Text(text) = &replacement.new_content.msgtype { return Ok(Some((replacement.event_id.to_owned(), text.body.clone()))); } } None => if let MessageType::Text(text) = &orig_msg.content.msgtype { return Ok(Some((orig_msg.event_id.to_owned(), text.body.clone()))); } } } } } } match response.end { Some(token) => continue_from = String::from(token), None => break, } } debug!("no messages found!"); Ok(None) } /// Post a new message to the given Matrix channel. async fn post_new(client: &Client, room_id: OwnedRoomId, log: &Log, link: &str) -> Result<()> { let line = format!("{} {}", state_emoji(log), link); let logbag: LogBag = vec![(log.category(), vec![line])].into(); let body = logbag.render_plain(); let html = logbag.render_html(); client .get_room(&room_id) .unwrap() .send( AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_html(body, html)), ) .await?; Ok(()) } /// Updates the given message with some new text /// /// This constructs and sends the right Matrix API message. async fn update_message( client: &Client, room_id: OwnedRoomId, old_id: OwnedEventId, new_text: &str, new_html: &str, ) -> Result<()> { let new_content = RoomMessageEventContent::text_html(new_text, new_html); let content = new_content.make_replacement(ReplacementMetadata::new(old_id.clone(), None), None); client .get_room(&room_id) .unwrap() .send(content) .await?; Ok(()) } /// Inserts the given log into the text. /// /// The text is first parsed as a [`LogBag`], then the link is formatted and inserted. fn insert_log(old_text: &str, log: &Log, link: &str) -> LogBag { let line = format!("{} {}", state_emoji(log), link); let mut logbag = LogBag::parse_plain(old_text).unwrap(); logbag.insert(log.category(), line); logbag }