aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniel <kingdread@gmx.de>2024-07-19 21:20:24 +0000
committerDaniel <kingdread@gmx.de>2024-07-19 21:20:24 +0000
commit99a05ae1e0d613c7b5c381fce26d4e2da0b560f3 (patch)
tree55eb10780866a8807746c332413646d1c980530a /src
parent6bc512659fd301562a194e991e0e31429944173e (diff)
parenta141d4fcdeba85d07e190338cf9386019ea03de9 (diff)
downloadezau-99a05ae1e0d613c7b5c381fce26d4e2da0b560f3.tar.gz
ezau-99a05ae1e0d613c7b5c381fce26d4e2da0b560f3.tar.bz2
ezau-99a05ae1e0d613c7b5c381fce26d4e2da0b560f3.zip
Merge branch 'update-matrix-sdk-integration' into 'master'
feat: matrix e2e messaging See merge request dunj3/ezau!2
Diffstat (limited to 'src')
-rw-r--r--src/config.rs2
-rw-r--r--src/matrix.rs339
2 files changed, 286 insertions, 55 deletions
diff --git a/src/config.rs b/src/config.rs
index bf9af1e..1c44aea 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -41,7 +41,7 @@ pub struct Discord {
#[derive(Debug, Clone, Deserialize)]
pub struct Matrix {
/// Matrix homeserver.
- pub homeserver: reqwest::Url,
+ pub homeserver: String,
/// Matrix username.
pub username: String,
/// Matrix password.
diff --git a/src/matrix.rs b/src/matrix.rs
index 1ae79e7..8be0e38 100644
--- a/src/matrix.rs
+++ b/src/matrix.rs
@@ -4,28 +4,35 @@ use super::logbag::{state_emoji, LogBag};
use std::convert::TryFrom;
use std::time::{Duration, SystemTime};
+use std::path::{Path, PathBuf};
use anyhow::Result;
use evtclib::Log;
-use log::{debug, info};
+use log::{debug, info, warn, error};
use tokio::runtime::Runtime;
-
-use reqwest::Url;
+use tokio::fs;
+use rand::{distributions::Alphanumeric, thread_rng, Rng};
+use serde::{Deserialize, Serialize};
use matrix_sdk::{
+ config::SyncSettings,
+ matrix_auth::MatrixSession,
ruma::{
- api::client::r0::message::get_message_events,
- events::room::message::{MessageEventContent, MessageType, Relation, Replacement},
- events::{AnyMessageEvent, AnyMessageEventContent, AnyRoomEvent},
- identifiers::{EventId, RoomId, UserId},
+ api::client::filter::FilterDefinition,
+ events::room::message::{RoomMessageEventContent, MessageType, Relation, ReplacementMetadata},
+ events::{AnyMessageLikeEvent, AnyMessageLikeEventContent, AnyTimelineEvent},
+ OwnedEventId,
+ UserId,
MilliSecondsSinceUnixEpoch, UInt,
+ RoomId, OwnedRoomId
},
+ room::MessagesOptions,
Client
};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MatrixUser {
- pub homeserver: Url,
+ pub homeserver: String,
pub username: String,
pub password: String,
pub device_id: Option<String>,
@@ -42,6 +49,39 @@ impl From<config::Matrix> for MatrixUser {
}
}
+/// The data needed to re-build a client.
+/// Copied from the matrix-rust-sdk persist-session example.
+#[derive(Debug, Serialize, Deserialize)]
+struct ClientSession {
+ /// The URL of the homeserver of the user.
+ homeserver: String,
+
+ /// The path of the database.
+ db_path: PathBuf,
+
+ /// The passphrase of the database.
+ passphrase: String,
+}
+
+/// The full session to persist.
+/// Copied from the matrix-rust-sdk persist-session example.
+#[derive(Debug, Serialize, Deserialize)]
+struct FullSession {
+ /// The data to re-build the client.
+ client_session: ClientSession,
+
+ /// The Matrix user session.
+ user_session: MatrixSession,
+
+ /// The latest sync token.
+ ///
+ /// It is only needed to persist it when using `Client::sync_once()` and we
+ /// want to make our syncs faster by not receiving all the initial sync
+ /// again.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ sync_token: Option<String>,
+}
+
/// Maximum age of the message to still be edited.
const MAX_HOURS: u64 = 5;
/// Amount of messages to be loaded in one chunk.
@@ -57,39 +97,219 @@ const MESSAGE_CHUNK_COUNT: u16 = 3;
/// the homeserver.
pub fn post_link(user: MatrixUser, room_id: &str, log: &Log, link: &str) -> Result<()> {
let rt = Runtime::new()?;
- let room_id = RoomId::try_from(room_id)?;
+ let room_id = RoomId::parse(room_id)?;
rt.block_on(async {
- let client = Client::new(user.homeserver)?;
- let my_data = client
- .login(
- &user.username,
- &user.password,
- user.device_id.as_ref().map(|x| x as &str),
- None,
- )
- .await?;
- info!("Matrix connected as {:?}", my_data.user_id);
-
- let old_msg = find_message(&client, &my_data.user_id, &room_id).await?;
+ // The folder containing this user's data.
+ let data_dir = dirs::data_dir().expect("no data_dir directory found").join("ezau-matrix-store");
+ // The file where the session is persisted.
+ let session_file = data_dir.join("session");
+
+ let (client, sync_token) = if session_file.exists() {
+ restore_session(&session_file).await?
+ } else {
+ (login(&data_dir, &session_file, user.homeserver, user.username, user.password).await?, None)
+ };
+
+ sync(&client, sync_token, &session_file).await?;
+
+ info!("Matrix connected as {:?}", client.user_id());
+
+ let old_msg = find_message(&client, &client.user_id().unwrap(), &room_id).await?;
match old_msg {
None => {
debug!("Creating a fresh message for matrix");
- post_new(&client, &room_id, log, link).await?;
+ post_new(&client, room_id, log, link).await?;
}
Some((old_id, old_text)) => {
debug!("Updating message {:?}", old_id);
+ debug!("Updating message body {:?}", old_text);
let logbag = insert_log(&old_text, log, link);
let new_text = logbag.render_plain();
+ debug!("New message body {:?}", new_text);
let new_html = logbag.render_html();
- update_message(&client, &room_id, &old_id, &new_text, &new_html).await?;
+ update_message(&client, room_id, old_id, &new_text, &new_html).await?;
}
}
Ok(())
})
}
+/// Restore a previous session.
+/// Copied from the matrix-rust-sdk persist-session example.
+async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option<String>)> {
+ debug!("Previous session found in '{}'", session_file.to_string_lossy());
+
+ // The session was serialized as JSON in a file.
+ let serialized_session = fs::read_to_string(session_file).await?;
+ let FullSession { client_session, user_session, sync_token } =
+ serde_json::from_str(&serialized_session)?;
+
+ // Build the client with the previous settings from the session.
+ let client = Client::builder()
+ .homeserver_url(client_session.homeserver)
+ .sqlite_store(client_session.db_path, Some(&client_session.passphrase))
+ .build()
+ .await?;
+
+ debug!("Restoring session for {}…", user_session.meta.user_id);
+
+ // Restore the Matrix user session.
+ client.restore_session(user_session).await?;
+
+ Ok((client, sync_token))
+}
+
+/// Login with a new device.
+/// Copied from the matrix-rust-sdk persist-session example.
+async fn login(data_dir: &Path, session_file: &Path, homeserver: String, username: String, password: String) -> anyhow::Result<Client> {
+ info!("No previous session found, logging in…");
+
+ let (client, client_session) = build_client(data_dir, homeserver).await?;
+ let matrix_auth = client.matrix_auth();
+
+ loop {
+ match matrix_auth
+ .login_username(&username, &password)
+ .initial_device_display_name("ezau client")
+ .await
+ {
+ Ok(_) => {
+ info!("Logged in as {username}");
+ break;
+ }
+ Err(error) => {
+ error!("Error logging in: {error}");
+ error!("Please try again\n");
+ }
+ }
+ }
+
+ // Persist the session to reuse it later.
+ // This is not very secure, for simplicity. If the system provides a way of
+ // storing secrets securely, it should be used instead.
+ // Note that we could also build the user session from the login response.
+ let user_session = matrix_auth.session().expect("A logged-in client should have a session");
+ let serialized_session =
+ serde_json::to_string(&FullSession { client_session, user_session, sync_token: None })?;
+ fs::write(session_file, serialized_session).await?;
+
+ info!("Session persisted in {}", session_file.to_string_lossy());
+
+ // After logging in, you might want to verify this session with another one, or bootstrap
+ // cross-signing if this is your first session with encryption, or if you need to reset
+ // cross-signing because you don't have access to your old sessions.
+
+ Ok(client)
+}
+
+/// Build a new client.
+/// Copied from the matrix-rust-sdk persist-session example.
+async fn build_client(data_dir: &Path, homeserver: String) -> anyhow::Result<(Client, ClientSession)> {
+ let mut rng = thread_rng();
+
+ // Generating a subfolder for the database is not mandatory, but it is useful if
+ // you allow several clients to run at the same time. Each one must have a
+ // separate database, which is a different folder with the SQLite store.
+ let db_subfolder: String =
+ (&mut rng).sample_iter(Alphanumeric).take(7).map(char::from).collect();
+ let db_path = data_dir.join(db_subfolder);
+
+ // Generate a random passphrase.
+ let passphrase: String =
+ (&mut rng).sample_iter(Alphanumeric).take(32).map(char::from).collect();
+
+ // We create a loop here so the user can retry if an error happens.
+ loop {
+ match Client::builder()
+ .homeserver_url(&homeserver)
+ // We use the SQLite store, which is enabled by default. This is the crucial part to
+ // persist the encryption setup.
+ // Note that other store backends are available and you can even implement your own.
+ .sqlite_store(&db_path, Some(&passphrase))
+ .build()
+ .await
+ {
+ Ok(client) => return Ok((client, ClientSession { homeserver, db_path, passphrase })),
+ Err(error) => match &error {
+ matrix_sdk::ClientBuildError::AutoDiscovery(_)
+ | matrix_sdk::ClientBuildError::Url(_)
+ | matrix_sdk::ClientBuildError::Http(_) => {
+ error!("Error checking the homeserver: {error}");
+ error!("Please try again\n");
+ }
+ _ => {
+ // Forward other errors, it's unlikely we can retry with a different outcome.
+ return Err(error.into());
+ }
+ },
+ }
+ }
+}
+
+/// Setup the client to listen to new messages.
+/// Copied from the matrix-rust-sdk persist-session example.
+async fn sync(
+ client: &Client,
+ initial_sync_token: Option<String>,
+ session_file: &Path,
+) -> anyhow::Result<()> {
+ debug!("Launching a first sync to ignore past messages…");
+
+ // Enable room members lazy-loading, it will speed up the initial sync a lot
+ // with accounts in lots of rooms.
+ // See <https://spec.matrix.org/v1.6/client-server-api/#lazy-loading-room-members>.
+ let filter = FilterDefinition::with_lazy_loading();
+
+ let mut sync_settings = SyncSettings::default().filter(filter.into());
+
+ // We restore the sync where we left.
+ // This is not necessary when not using `sync_once`. The other sync methods get
+ // the sync token from the store.
+ if let Some(sync_token) = initial_sync_token {
+ sync_settings = sync_settings.token(sync_token);
+ }
+
+ // Let's ignore messages before the program was launched.
+ // This is a loop in case the initial sync is longer than our timeout. The
+ // server should cache the response and it will ultimately take less time to
+ // receive.
+ loop {
+ match client.sync_once(sync_settings.clone()).await {
+ Ok(response) => {
+ // This is the last time we need to provide this token, the sync method after
+ // will handle it on its own.
+ sync_settings = sync_settings.token(response.next_batch.clone());
+ persist_sync_token(session_file, response.next_batch).await?;
+ break;
+ }
+ Err(error) => {
+ warn!("An error occurred during initial sync: {error}");
+ warn!("Trying again…");
+ }
+ }
+ }
+
+ debug!("The client is ready!");
+ Ok(())
+}
+
+/// Persist the sync token for a future session.
+/// Note that this is needed only when using `sync_once`. Other sync methods get
+/// the sync token from the store.
+/// Copied from the matrix-rust-sdk persist-session example.
+async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> {
+ let serialized_session = fs::read_to_string(session_file).await?;
+ let mut full_session: FullSession = serde_json::from_str(&serialized_session)?;
+
+ full_session.sync_token = Some(sync_token);
+ let serialized_session = serde_json::to_string(&full_session)?;
+ fs::write(session_file, serialized_session).await?;
+
+ Ok(())
+}
+
/// Finds the right message if there is one to edit.
///
/// Either returns the message ID and the old message text, or None if no suitable message was
@@ -97,14 +317,15 @@ pub fn post_link(user: MatrixUser, room_id: &str, log: &Log, link: &str) -> Resu
async fn find_message(
client: &Client,
my_id: &UserId,
- room_id: &RoomId,
-) -> Result<Option<(EventId, String)>> {
+ room_id: &OwnedRoomId,
+) -> Result<Option<(OwnedEventId, String)>> {
let limit = UInt::try_from(MESSAGE_CHUNK_SIZE).unwrap();
let time_limit = MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() - Duration::from_secs(MAX_HOURS * 60 * 60),
)
.expect("Our time limit is before the epoch");
let mut continue_from = String::new();
+ let room = client.get_room(room_id).unwrap();
for chunk_nr in 0..MESSAGE_CHUNK_COUNT {
debug!(
@@ -114,46 +335,57 @@ async fn find_message(
MESSAGE_CHUNK_COUNT
);
- let mut request = get_message_events::Request::backward(room_id, &continue_from);
- request.limit = limit;
- let response = client.send(request, None).await?;
- for raw_message in response.chunk {
- if let Ok(AnyRoomEvent::Message(AnyMessageEvent::RoomMessage(msg))) =
- raw_message.deserialize()
+ let mut options = MessagesOptions::backward().from(Some(continue_from).as_deref());
+ options.limit = limit;
+ let response = room.messages(options).await?;
+ for timeline_message in response.chunk {
+ if let Ok(AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomMessage(msg))) =
+ timeline_message.event.deserialize()
{
- if &msg.sender == my_id && msg.origin_server_ts >= time_limit {
- if let MessageType::Text(text) = msg.content.msgtype {
- if !matches!(
- msg.content.relates_to,
- Some(Relation::Reply { .. } | Relation::Replacement(..))
- ) {
- return Ok(Some((msg.event_id, text.body)));
+ if msg.sender() == my_id && msg.origin_server_ts() >= time_limit {
+ // filters out redacted messages - but apparently it doesn't?
+ // as_original() should return only the original, if not redacted.
+ // maybe the sync is still wrong and doesn't "apply" redactions to
+ // room.messages?
+ if let Some(orig_msg) = &msg.as_original() {
+ match &orig_msg.content.relates_to {
+ Some(relation) =>
+ if let Relation::Replacement(replacement) = relation {
+ if let MessageType::Text(text) = &replacement.new_content.msgtype {
+ return Ok(Some((replacement.event_id.to_owned(), text.body.clone())));
+ }
+ }
+ None =>
+ if let MessageType::Text(text) = &orig_msg.content.msgtype {
+ return Ok(Some((orig_msg.event_id.to_owned(), text.body.clone())));
+ }
}
- }
- }
+ }
+ }
}
}
match response.end {
- Some(token) => continue_from = token,
+ Some(token) => continue_from = String::from(token),
None => break,
}
}
+ debug!("no messages found!");
Ok(None)
}
/// Post a new message to the given Matrix channel.
-async fn post_new(client: &Client, room_id: &RoomId, log: &Log, link: &str) -> Result<()> {
+async fn post_new(client: &Client, room_id: OwnedRoomId, log: &Log, link: &str) -> Result<()> {
let line = format!("{} {}", state_emoji(log), link);
let logbag: LogBag = vec![(log.category().to_string(), vec![line])].into();
let body = logbag.render_plain();
let html = logbag.render_html();
client
- .room_send(
- room_id,
- AnyMessageEventContent::RoomMessage(MessageEventContent::text_html(body, html)),
- None,
+ .get_room(&room_id)
+ .unwrap()
+ .send(
+ AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_html(body, html)),
)
.await?;
Ok(())
@@ -164,18 +396,17 @@ async fn post_new(client: &Client, room_id: &RoomId, log: &Log, link: &str) -> R
/// This constructs and sends the right Matrix API message.
async fn update_message(
client: &Client,
- room_id: &RoomId,
- old_id: &EventId,
+ room_id: OwnedRoomId,
+ old_id: OwnedEventId,
new_text: &str,
new_html: &str,
) -> Result<()> {
- let mut message = MessageEventContent::text_html(new_text, new_html);
- message.relates_to = Some(Relation::Replacement(Replacement::new(
- old_id.clone(),
- Box::new(MessageEventContent::text_html(new_text, new_html)),
- )));
+ let new_content = RoomMessageEventContent::text_html(new_text, new_html);
+ let content = new_content.make_replacement(ReplacementMetadata::new(old_id.clone(), None), None);
client
- .room_send(room_id, AnyMessageEventContent::RoomMessage(message), None)
+ .get_room(&room_id)
+ .unwrap()
+ .send(content)
.await?;
Ok(())
}