diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 2 | ||||
| -rw-r--r-- | src/matrix.rs | 318 | 
2 files changed, 273 insertions, 47 deletions
| diff --git a/src/config.rs b/src/config.rs index bf9af1e..1c44aea 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,7 +41,7 @@ pub struct Discord {  #[derive(Debug, Clone, Deserialize)]  pub struct Matrix {      /// Matrix homeserver. -    pub homeserver: reqwest::Url, +    pub homeserver: String,      /// Matrix username.      pub username: String,      /// Matrix password. diff --git a/src/matrix.rs b/src/matrix.rs index 1ae79e7..cc6ba28 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -4,28 +4,35 @@ use super::logbag::{state_emoji, LogBag};  use std::convert::TryFrom;  use std::time::{Duration, SystemTime}; +use std::path::{Path, PathBuf};  use anyhow::Result;  use evtclib::Log;  use log::{debug, info};  use tokio::runtime::Runtime; - -use reqwest::Url; +use tokio::fs; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use serde::{Deserialize, Serialize};  use matrix_sdk::{ +    config::SyncSettings, +    matrix_auth::MatrixSession,      ruma::{ -        api::client::r0::message::get_message_events, -        events::room::message::{MessageEventContent, MessageType, Relation, Replacement}, -        events::{AnyMessageEvent, AnyMessageEventContent, AnyRoomEvent}, -        identifiers::{EventId, RoomId, UserId}, +        api::client::message::get_message_events, +        api::client::filter::FilterDefinition, +        events::room::message::{RoomMessageEventContent, MessageType, Relation, ReplacementMetadata}, +        events::{AnyMessageLikeEvent, AnyMessageLikeEventContent, AnyTimelineEvent}, +        OwnedEventId,  +        UserId,          MilliSecondsSinceUnixEpoch, UInt, +        RoomId, OwnedRoomId      },      Client  };  #[derive(Debug, Clone, PartialEq, Eq, Hash)]  pub struct MatrixUser { -    pub homeserver: Url, +    pub homeserver: String,      pub username: String,      pub password: String,      pub device_id: Option<String>, @@ -42,6 +49,37 @@ impl From<config::Matrix> for MatrixUser {      }  } +/// The data needed to re-build a client. +#[derive(Debug, Serialize, Deserialize)] +struct ClientSession { +    /// The URL of the homeserver of the user. +    homeserver: String, + +    /// The path of the database. +    db_path: PathBuf, + +    /// The passphrase of the database. +    passphrase: String, +} + +/// The full session to persist. +#[derive(Debug, Serialize, Deserialize)] +struct FullSession { +    /// The data to re-build the client. +    client_session: ClientSession, + +    /// The Matrix user session. +    user_session: MatrixSession, + +    /// The latest sync token. +    /// +    /// It is only needed to persist it when using `Client::sync_once()` and we +    /// want to make our syncs faster by not receiving all the initial sync +    /// again. +    #[serde(skip_serializing_if = "Option::is_none")] +    sync_token: Option<String>, +} +  /// Maximum age of the message to still be edited.  const MAX_HOURS: u64 = 5;  /// Amount of messages to be loaded in one chunk. @@ -57,39 +95,214 @@ const MESSAGE_CHUNK_COUNT: u16 = 3;  /// the homeserver.  pub fn post_link(user: MatrixUser, room_id: &str, log: &Log, link: &str) -> Result<()> {      let rt = Runtime::new()?; -    let room_id = RoomId::try_from(room_id)?; +    let room_id = RoomId::parse(room_id)?;      rt.block_on(async { -        let client = Client::new(user.homeserver)?; -        let my_data = client -            .login( -                &user.username, -                &user.password, -                user.device_id.as_ref().map(|x| x as &str), -                None, -            ) -            .await?; -        info!("Matrix connected as {:?}", my_data.user_id); - -        let old_msg = find_message(&client, &my_data.user_id, &room_id).await?; +        // The folder containing this user's data. +        let data_dir = dirs::data_dir().expect("no data_dir directory found").join("ezau-matrix-store"); +        // The file where the session is persisted. +        let session_file = data_dir.join("session"); + +        let (client, sync_token) = if session_file.exists() { +            restore_session(&session_file).await? +        } else { +            (login(&data_dir, &session_file, user.homeserver, user.username, user.password).await?, None) +        }; + +        sync(&client, sync_token, &session_file).await?; +         +        info!("Matrix connected as {:?}", client.user_id()); + +        let old_msg = find_message(&client, &client.user_id().unwrap(), &room_id).await?;          match old_msg {              None => {                  debug!("Creating a fresh message for matrix"); -                post_new(&client, &room_id, log, link).await?; +                post_new(&client, room_id, log, link).await?;              }              Some((old_id, old_text)) => {                  debug!("Updating message {:?}", old_id); +                debug!("Updating message body {:?}", old_text);                  let logbag = insert_log(&old_text, log, link);                  let new_text = logbag.render_plain(); +                debug!("New message body {:?}", new_text);                  let new_html = logbag.render_html(); -                update_message(&client, &room_id, &old_id, &new_text, &new_html).await?; +                update_message(&client, room_id, old_id, &new_text, &new_html).await?;              }          }          Ok(())      })  } +/// Restore a previous session. +async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option<String>)> { +    println!("Previous session found in '{}'", session_file.to_string_lossy()); + +    // The session was serialized as JSON in a file. +    let serialized_session = fs::read_to_string(session_file).await?; +    let FullSession { client_session, user_session, sync_token } = +        serde_json::from_str(&serialized_session)?; + +    // Build the client with the previous settings from the session. +    let client = Client::builder() +        .homeserver_url(client_session.homeserver) +        .sqlite_store(client_session.db_path, Some(&client_session.passphrase)) +        .build() +        .await?; + +    println!("Restoring session for {}…", user_session.meta.user_id); + +    // Restore the Matrix user session. +    client.restore_session(user_session).await?; + +    Ok((client, sync_token)) +} + +/// Login with a new device. +async fn login(data_dir: &Path, session_file: &Path, homeserver: String, username: String, password: String) -> anyhow::Result<Client> { +    println!("No previous session found, logging in…"); + +    let (client, client_session) = build_client(data_dir, homeserver).await?; +    let matrix_auth = client.matrix_auth(); + +    loop { +        match matrix_auth +            .login_username(&username, &password) +            .initial_device_display_name("ezau client") +            .await +        { +            Ok(_) => { +                println!("Logged in as {username}"); +                break; +            } +            Err(error) => { +                println!("Error logging in: {error}"); +                println!("Please try again\n"); +            } +        } +    } + +    // Persist the session to reuse it later. +    // This is not very secure, for simplicity. If the system provides a way of +    // storing secrets securely, it should be used instead. +    // Note that we could also build the user session from the login response. +    let user_session = matrix_auth.session().expect("A logged-in client should have a session"); +    let serialized_session = +        serde_json::to_string(&FullSession { client_session, user_session, sync_token: None })?; +    fs::write(session_file, serialized_session).await?; + +    println!("Session persisted in {}", session_file.to_string_lossy()); + +    // After logging in, you might want to verify this session with another one, or bootstrap  +    // cross-signing if this is your first session with encryption, or if you need to reset  +    // cross-signing because you don't have access to your old sessions. + +    Ok(client) +} + +/// Build a new client. +async fn build_client(data_dir: &Path, homeserver: String) -> anyhow::Result<(Client, ClientSession)> { +    let mut rng = thread_rng(); + +    // Generating a subfolder for the database is not mandatory, but it is useful if +    // you allow several clients to run at the same time. Each one must have a +    // separate database, which is a different folder with the SQLite store. +    let db_subfolder: String = +        (&mut rng).sample_iter(Alphanumeric).take(7).map(char::from).collect(); +    let db_path = data_dir.join(db_subfolder); + +    // Generate a random passphrase. +    let passphrase: String = +        (&mut rng).sample_iter(Alphanumeric).take(32).map(char::from).collect(); + +    // We create a loop here so the user can retry if an error happens. +    loop { +        match Client::builder() +            .homeserver_url(&homeserver) +            // We use the SQLite store, which is enabled by default. This is the crucial part to +            // persist the encryption setup. +            // Note that other store backends are available and you can even implement your own. +            .sqlite_store(&db_path, Some(&passphrase)) +            .build() +            .await +        { +            Ok(client) => return Ok((client, ClientSession { homeserver, db_path, passphrase })), +            Err(error) => match &error { +                matrix_sdk::ClientBuildError::AutoDiscovery(_) +                | matrix_sdk::ClientBuildError::Url(_) +                | matrix_sdk::ClientBuildError::Http(_) => { +                    println!("Error checking the homeserver: {error}"); +                    println!("Please try again\n"); +                } +                _ => { +                    // Forward other errors, it's unlikely we can retry with a different outcome. +                    return Err(error.into()); +                } +            }, +        } +    } +} + +/// Setup the client to listen to new messages. +async fn sync( +    client: &Client, +    initial_sync_token: Option<String>, +    session_file: &Path, +) -> anyhow::Result<()> { +    println!("Launching a first sync to ignore past messages…"); + +    // Enable room members lazy-loading, it will speed up the initial sync a lot +    // with accounts in lots of rooms. +    // See <https://spec.matrix.org/v1.6/client-server-api/#lazy-loading-room-members>. +    let filter = FilterDefinition::with_lazy_loading(); + +    let mut sync_settings = SyncSettings::default().filter(filter.into()); + +    // We restore the sync where we left. +    // This is not necessary when not using `sync_once`. The other sync methods get +    // the sync token from the store. +    if let Some(sync_token) = initial_sync_token { +        sync_settings = sync_settings.token(sync_token); +    } + +    // Let's ignore messages before the program was launched. +    // This is a loop in case the initial sync is longer than our timeout. The +    // server should cache the response and it will ultimately take less time to +    // receive. +    loop { +        match client.sync_once(sync_settings.clone()).await { +            Ok(response) => { +                // This is the last time we need to provide this token, the sync method after +                // will handle it on its own. +                sync_settings = sync_settings.token(response.next_batch.clone()); +                persist_sync_token(session_file, response.next_batch).await?; +                break; +            } +            Err(error) => { +                println!("An error occurred during initial sync: {error}"); +                println!("Trying again…"); +            } +        } +    } + +    println!("The client is ready!"); +    Ok(()) +} + +/// Persist the sync token for a future session. +/// Note that this is needed only when using `sync_once`. Other sync methods get +/// the sync token from the store. +async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> { +    let serialized_session = fs::read_to_string(session_file).await?; +    let mut full_session: FullSession = serde_json::from_str(&serialized_session)?; + +    full_session.sync_token = Some(sync_token); +    let serialized_session = serde_json::to_string(&full_session)?; +    fs::write(session_file, serialized_session).await?; + +    Ok(()) +} +  /// Finds the right message if there is one to edit.  ///  /// Either returns the message ID and the old message text, or None if no suitable message was @@ -97,8 +310,8 @@ pub fn post_link(user: MatrixUser, room_id: &str, log: &Log, link: &str) -> Resu  async fn find_message(      client: &Client,      my_id: &UserId, -    room_id: &RoomId, -) -> Result<Option<(EventId, String)>> { +    room_id: &OwnedRoomId, +) -> Result<Option<(OwnedEventId, String)>> {      let limit = UInt::try_from(MESSAGE_CHUNK_SIZE).unwrap();      let time_limit = MilliSecondsSinceUnixEpoch::from_system_time(          SystemTime::now() - Duration::from_secs(MAX_HOURS * 60 * 60), @@ -114,22 +327,35 @@ async fn find_message(              MESSAGE_CHUNK_COUNT          ); -        let mut request = get_message_events::Request::backward(room_id, &continue_from); +        let mut request = get_message_events::v3::Request::backward(room_id.clone()).from(continue_from);          request.limit = limit;          let response = client.send(request, None).await?;          for raw_message in response.chunk { -            if let Ok(AnyRoomEvent::Message(AnyMessageEvent::RoomMessage(msg))) = +            if let Ok(AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomMessage(msg))) =                  raw_message.deserialize()              { -                if &msg.sender == my_id && msg.origin_server_ts >= time_limit { -                    if let MessageType::Text(text) = msg.content.msgtype { -                        if !matches!( -                            msg.content.relates_to, -                            Some(Relation::Reply { .. } | Relation::Replacement(..)) -                        ) { -                            return Ok(Some((msg.event_id, text.body))); +                debug!("my id: {:?}", my_id); +                debug!("time limit: {:?}", time_limit); +                if msg.sender() == my_id && msg.origin_server_ts() >= time_limit { +                    // filters out redacted messages +                    if let Some(orig_msg) = &msg.as_original() { +                        if let MessageType::Text(text) = &orig_msg.content.msgtype { +                            if !matches!( +                                orig_msg.content.relates_to, +                                Some(Relation::Reply { .. } | Relation::Replacement(..)) +                            ) { +                                return Ok(Some((orig_msg.event_id.to_owned(), text.body.clone()))); +                            } else { +                                debug!("Rejected because of Relations: {:?}", msg); +                            } +                        } else { +                            debug!("Rejected because of msgtype: {:?}", msg);                          } +                    } else { +                        debug!("Rejected original message: {:?}", msg);                      } +                } else { +                    debug!("Rejected because of sender/time limit: {:?}", msg);                  }              }          } @@ -139,21 +365,22 @@ async fn find_message(              None => break,          }      } +    debug!("no messages found!");      Ok(None)  }  /// Post a new message to the given Matrix channel. -async fn post_new(client: &Client, room_id: &RoomId, log: &Log, link: &str) -> Result<()> { +async fn post_new(client: &Client, room_id: OwnedRoomId, log: &Log, link: &str) -> Result<()> {      let line = format!("{} {}", state_emoji(log), link);      let logbag: LogBag = vec![(log.category().to_string(), vec![line])].into();      let body = logbag.render_plain();      let html = logbag.render_html();      client -        .room_send( -            room_id, -            AnyMessageEventContent::RoomMessage(MessageEventContent::text_html(body, html)), -            None, +        .get_room(&room_id) +        .unwrap() +        .send( +            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_html(body, html)),          )          .await?;      Ok(()) @@ -164,18 +391,17 @@ async fn post_new(client: &Client, room_id: &RoomId, log: &Log, link: &str) -> R  /// This constructs and sends the right Matrix API message.  async fn update_message(      client: &Client, -    room_id: &RoomId, -    old_id: &EventId, +    room_id: OwnedRoomId, +    old_id: OwnedEventId,      new_text: &str,      new_html: &str,  ) -> Result<()> { -    let mut message = MessageEventContent::text_html(new_text, new_html); -    message.relates_to = Some(Relation::Replacement(Replacement::new( -        old_id.clone(), -        Box::new(MessageEventContent::text_html(new_text, new_html)), -    ))); +    let new_content = RoomMessageEventContent::text_html(new_text, new_html); +    let content = new_content.make_replacement(ReplacementMetadata::new(old_id.clone(), None), None);      client -        .room_send(room_id, AnyMessageEventContent::RoomMessage(message), None) +        .get_room(&room_id) +        .unwrap() +        .send(content)          .await?;      Ok(())  } | 
