diff options
author | networkjanitor <networkjanitor@xafy.de> | 2024-07-18 01:06:41 +0200 |
---|---|---|
committer | networkjanitor <networkjanitor@xafy.de> | 2024-07-18 01:06:41 +0200 |
commit | 9c129458984f71cb4fc7fb5bcdcbae76b4f1e732 (patch) | |
tree | 65c33391cb80af081c2f27b15beddfdac74cefb2 /src | |
parent | 6bc512659fd301562a194e991e0e31429944173e (diff) | |
download | ezau-9c129458984f71cb4fc7fb5bcdcbae76b4f1e732.tar.gz ezau-9c129458984f71cb4fc7fb5bcdcbae76b4f1e732.tar.bz2 ezau-9c129458984f71cb4fc7fb5bcdcbae76b4f1e732.zip |
wip: defect e2e find_messages
Diffstat (limited to 'src')
-rw-r--r-- | src/config.rs | 2 | ||||
-rw-r--r-- | src/matrix.rs | 318 |
2 files changed, 273 insertions, 47 deletions
diff --git a/src/config.rs b/src/config.rs index bf9af1e..1c44aea 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,7 +41,7 @@ pub struct Discord { #[derive(Debug, Clone, Deserialize)] pub struct Matrix { /// Matrix homeserver. - pub homeserver: reqwest::Url, + pub homeserver: String, /// Matrix username. pub username: String, /// Matrix password. diff --git a/src/matrix.rs b/src/matrix.rs index 1ae79e7..cc6ba28 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -4,28 +4,35 @@ use super::logbag::{state_emoji, LogBag}; use std::convert::TryFrom; use std::time::{Duration, SystemTime}; +use std::path::{Path, PathBuf}; use anyhow::Result; use evtclib::Log; use log::{debug, info}; use tokio::runtime::Runtime; - -use reqwest::Url; +use tokio::fs; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use serde::{Deserialize, Serialize}; use matrix_sdk::{ + config::SyncSettings, + matrix_auth::MatrixSession, ruma::{ - api::client::r0::message::get_message_events, - events::room::message::{MessageEventContent, MessageType, Relation, Replacement}, - events::{AnyMessageEvent, AnyMessageEventContent, AnyRoomEvent}, - identifiers::{EventId, RoomId, UserId}, + api::client::message::get_message_events, + api::client::filter::FilterDefinition, + events::room::message::{RoomMessageEventContent, MessageType, Relation, ReplacementMetadata}, + events::{AnyMessageLikeEvent, AnyMessageLikeEventContent, AnyTimelineEvent}, + OwnedEventId, + UserId, MilliSecondsSinceUnixEpoch, UInt, + RoomId, OwnedRoomId }, Client }; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct MatrixUser { - pub homeserver: Url, + pub homeserver: String, pub username: String, pub password: String, pub device_id: Option<String>, @@ -42,6 +49,37 @@ impl From<config::Matrix> for MatrixUser { } } +/// The data needed to re-build a client. +#[derive(Debug, Serialize, Deserialize)] +struct ClientSession { + /// The URL of the homeserver of the user. + homeserver: String, + + /// The path of the database. + db_path: PathBuf, + + /// The passphrase of the database. + passphrase: String, +} + +/// The full session to persist. +#[derive(Debug, Serialize, Deserialize)] +struct FullSession { + /// The data to re-build the client. + client_session: ClientSession, + + /// The Matrix user session. + user_session: MatrixSession, + + /// The latest sync token. + /// + /// It is only needed to persist it when using `Client::sync_once()` and we + /// want to make our syncs faster by not receiving all the initial sync + /// again. + #[serde(skip_serializing_if = "Option::is_none")] + sync_token: Option<String>, +} + /// Maximum age of the message to still be edited. const MAX_HOURS: u64 = 5; /// Amount of messages to be loaded in one chunk. @@ -57,39 +95,214 @@ const MESSAGE_CHUNK_COUNT: u16 = 3; /// the homeserver. pub fn post_link(user: MatrixUser, room_id: &str, log: &Log, link: &str) -> Result<()> { let rt = Runtime::new()?; - let room_id = RoomId::try_from(room_id)?; + let room_id = RoomId::parse(room_id)?; rt.block_on(async { - let client = Client::new(user.homeserver)?; - let my_data = client - .login( - &user.username, - &user.password, - user.device_id.as_ref().map(|x| x as &str), - None, - ) - .await?; - info!("Matrix connected as {:?}", my_data.user_id); - - let old_msg = find_message(&client, &my_data.user_id, &room_id).await?; + // The folder containing this user's data. + let data_dir = dirs::data_dir().expect("no data_dir directory found").join("ezau-matrix-store"); + // The file where the session is persisted. + let session_file = data_dir.join("session"); + + let (client, sync_token) = if session_file.exists() { + restore_session(&session_file).await? + } else { + (login(&data_dir, &session_file, user.homeserver, user.username, user.password).await?, None) + }; + + sync(&client, sync_token, &session_file).await?; + + info!("Matrix connected as {:?}", client.user_id()); + + let old_msg = find_message(&client, &client.user_id().unwrap(), &room_id).await?; match old_msg { None => { debug!("Creating a fresh message for matrix"); - post_new(&client, &room_id, log, link).await?; + post_new(&client, room_id, log, link).await?; } Some((old_id, old_text)) => { debug!("Updating message {:?}", old_id); + debug!("Updating message body {:?}", old_text); let logbag = insert_log(&old_text, log, link); let new_text = logbag.render_plain(); + debug!("New message body {:?}", new_text); let new_html = logbag.render_html(); - update_message(&client, &room_id, &old_id, &new_text, &new_html).await?; + update_message(&client, room_id, old_id, &new_text, &new_html).await?; } } Ok(()) }) } +/// Restore a previous session. +async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option<String>)> { + println!("Previous session found in '{}'", session_file.to_string_lossy()); + + // The session was serialized as JSON in a file. + let serialized_session = fs::read_to_string(session_file).await?; + let FullSession { client_session, user_session, sync_token } = + serde_json::from_str(&serialized_session)?; + + // Build the client with the previous settings from the session. + let client = Client::builder() + .homeserver_url(client_session.homeserver) + .sqlite_store(client_session.db_path, Some(&client_session.passphrase)) + .build() + .await?; + + println!("Restoring session for {}…", user_session.meta.user_id); + + // Restore the Matrix user session. + client.restore_session(user_session).await?; + + Ok((client, sync_token)) +} + +/// Login with a new device. +async fn login(data_dir: &Path, session_file: &Path, homeserver: String, username: String, password: String) -> anyhow::Result<Client> { + println!("No previous session found, logging in…"); + + let (client, client_session) = build_client(data_dir, homeserver).await?; + let matrix_auth = client.matrix_auth(); + + loop { + match matrix_auth + .login_username(&username, &password) + .initial_device_display_name("ezau client") + .await + { + Ok(_) => { + println!("Logged in as {username}"); + break; + } + Err(error) => { + println!("Error logging in: {error}"); + println!("Please try again\n"); + } + } + } + + // Persist the session to reuse it later. + // This is not very secure, for simplicity. If the system provides a way of + // storing secrets securely, it should be used instead. + // Note that we could also build the user session from the login response. + let user_session = matrix_auth.session().expect("A logged-in client should have a session"); + let serialized_session = + serde_json::to_string(&FullSession { client_session, user_session, sync_token: None })?; + fs::write(session_file, serialized_session).await?; + + println!("Session persisted in {}", session_file.to_string_lossy()); + + // After logging in, you might want to verify this session with another one, or bootstrap + // cross-signing if this is your first session with encryption, or if you need to reset + // cross-signing because you don't have access to your old sessions. + + Ok(client) +} + +/// Build a new client. +async fn build_client(data_dir: &Path, homeserver: String) -> anyhow::Result<(Client, ClientSession)> { + let mut rng = thread_rng(); + + // Generating a subfolder for the database is not mandatory, but it is useful if + // you allow several clients to run at the same time. Each one must have a + // separate database, which is a different folder with the SQLite store. + let db_subfolder: String = + (&mut rng).sample_iter(Alphanumeric).take(7).map(char::from).collect(); + let db_path = data_dir.join(db_subfolder); + + // Generate a random passphrase. + let passphrase: String = + (&mut rng).sample_iter(Alphanumeric).take(32).map(char::from).collect(); + + // We create a loop here so the user can retry if an error happens. + loop { + match Client::builder() + .homeserver_url(&homeserver) + // We use the SQLite store, which is enabled by default. This is the crucial part to + // persist the encryption setup. + // Note that other store backends are available and you can even implement your own. + .sqlite_store(&db_path, Some(&passphrase)) + .build() + .await + { + Ok(client) => return Ok((client, ClientSession { homeserver, db_path, passphrase })), + Err(error) => match &error { + matrix_sdk::ClientBuildError::AutoDiscovery(_) + | matrix_sdk::ClientBuildError::Url(_) + | matrix_sdk::ClientBuildError::Http(_) => { + println!("Error checking the homeserver: {error}"); + println!("Please try again\n"); + } + _ => { + // Forward other errors, it's unlikely we can retry with a different outcome. + return Err(error.into()); + } + }, + } + } +} + +/// Setup the client to listen to new messages. +async fn sync( + client: &Client, + initial_sync_token: Option<String>, + session_file: &Path, +) -> anyhow::Result<()> { + println!("Launching a first sync to ignore past messages…"); + + // Enable room members lazy-loading, it will speed up the initial sync a lot + // with accounts in lots of rooms. + // See <https://spec.matrix.org/v1.6/client-server-api/#lazy-loading-room-members>. + let filter = FilterDefinition::with_lazy_loading(); + + let mut sync_settings = SyncSettings::default().filter(filter.into()); + + // We restore the sync where we left. + // This is not necessary when not using `sync_once`. The other sync methods get + // the sync token from the store. + if let Some(sync_token) = initial_sync_token { + sync_settings = sync_settings.token(sync_token); + } + + // Let's ignore messages before the program was launched. + // This is a loop in case the initial sync is longer than our timeout. The + // server should cache the response and it will ultimately take less time to + // receive. + loop { + match client.sync_once(sync_settings.clone()).await { + Ok(response) => { + // This is the last time we need to provide this token, the sync method after + // will handle it on its own. + sync_settings = sync_settings.token(response.next_batch.clone()); + persist_sync_token(session_file, response.next_batch).await?; + break; + } + Err(error) => { + println!("An error occurred during initial sync: {error}"); + println!("Trying again…"); + } + } + } + + println!("The client is ready!"); + Ok(()) +} + +/// Persist the sync token for a future session. +/// Note that this is needed only when using `sync_once`. Other sync methods get +/// the sync token from the store. +async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> { + let serialized_session = fs::read_to_string(session_file).await?; + let mut full_session: FullSession = serde_json::from_str(&serialized_session)?; + + full_session.sync_token = Some(sync_token); + let serialized_session = serde_json::to_string(&full_session)?; + fs::write(session_file, serialized_session).await?; + + Ok(()) +} + /// Finds the right message if there is one to edit. /// /// Either returns the message ID and the old message text, or None if no suitable message was @@ -97,8 +310,8 @@ pub fn post_link(user: MatrixUser, room_id: &str, log: &Log, link: &str) -> Resu async fn find_message( client: &Client, my_id: &UserId, - room_id: &RoomId, -) -> Result<Option<(EventId, String)>> { + room_id: &OwnedRoomId, +) -> Result<Option<(OwnedEventId, String)>> { let limit = UInt::try_from(MESSAGE_CHUNK_SIZE).unwrap(); let time_limit = MilliSecondsSinceUnixEpoch::from_system_time( SystemTime::now() - Duration::from_secs(MAX_HOURS * 60 * 60), @@ -114,22 +327,35 @@ async fn find_message( MESSAGE_CHUNK_COUNT ); - let mut request = get_message_events::Request::backward(room_id, &continue_from); + let mut request = get_message_events::v3::Request::backward(room_id.clone()).from(continue_from); request.limit = limit; let response = client.send(request, None).await?; for raw_message in response.chunk { - if let Ok(AnyRoomEvent::Message(AnyMessageEvent::RoomMessage(msg))) = + if let Ok(AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomMessage(msg))) = raw_message.deserialize() { - if &msg.sender == my_id && msg.origin_server_ts >= time_limit { - if let MessageType::Text(text) = msg.content.msgtype { - if !matches!( - msg.content.relates_to, - Some(Relation::Reply { .. } | Relation::Replacement(..)) - ) { - return Ok(Some((msg.event_id, text.body))); + debug!("my id: {:?}", my_id); + debug!("time limit: {:?}", time_limit); + if msg.sender() == my_id && msg.origin_server_ts() >= time_limit { + // filters out redacted messages + if let Some(orig_msg) = &msg.as_original() { + if let MessageType::Text(text) = &orig_msg.content.msgtype { + if !matches!( + orig_msg.content.relates_to, + Some(Relation::Reply { .. } | Relation::Replacement(..)) + ) { + return Ok(Some((orig_msg.event_id.to_owned(), text.body.clone()))); + } else { + debug!("Rejected because of Relations: {:?}", msg); + } + } else { + debug!("Rejected because of msgtype: {:?}", msg); } + } else { + debug!("Rejected original message: {:?}", msg); } + } else { + debug!("Rejected because of sender/time limit: {:?}", msg); } } } @@ -139,21 +365,22 @@ async fn find_message( None => break, } } + debug!("no messages found!"); Ok(None) } /// Post a new message to the given Matrix channel. -async fn post_new(client: &Client, room_id: &RoomId, log: &Log, link: &str) -> Result<()> { +async fn post_new(client: &Client, room_id: OwnedRoomId, log: &Log, link: &str) -> Result<()> { let line = format!("{} {}", state_emoji(log), link); let logbag: LogBag = vec![(log.category().to_string(), vec![line])].into(); let body = logbag.render_plain(); let html = logbag.render_html(); client - .room_send( - room_id, - AnyMessageEventContent::RoomMessage(MessageEventContent::text_html(body, html)), - None, + .get_room(&room_id) + .unwrap() + .send( + AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_html(body, html)), ) .await?; Ok(()) @@ -164,18 +391,17 @@ async fn post_new(client: &Client, room_id: &RoomId, log: &Log, link: &str) -> R /// This constructs and sends the right Matrix API message. async fn update_message( client: &Client, - room_id: &RoomId, - old_id: &EventId, + room_id: OwnedRoomId, + old_id: OwnedEventId, new_text: &str, new_html: &str, ) -> Result<()> { - let mut message = MessageEventContent::text_html(new_text, new_html); - message.relates_to = Some(Relation::Replacement(Replacement::new( - old_id.clone(), - Box::new(MessageEventContent::text_html(new_text, new_html)), - ))); + let new_content = RoomMessageEventContent::text_html(new_text, new_html); + let content = new_content.make_replacement(ReplacementMetadata::new(old_id.clone(), None), None); client - .room_send(room_id, AnyMessageEventContent::RoomMessage(message), None) + .get_room(&room_id) + .unwrap() + .send(content) .await?; Ok(()) } |