From 452f66388417746984f45cc3de5410b2193fc82e Mon Sep 17 00:00:00 2001 From: Xevion Date: Thu, 2 Jan 2025 14:39:27 -0600 Subject: [PATCH] Add delete_download incoming message handler, add future message logging for transmit --- src/main.rs | 43 ++++++++++++++++++++++++++++++++++++------- src/models.rs | 12 ++++++++++++ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2cd2dc2..4bee38f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -84,12 +84,25 @@ async fn handle_socket(session_id: u32, websocket: WebSocket) { // Use an unbounded channel to handle buffering and flushing of messages to the websocket... let (tx_channel, tx_channel_rx) = mpsc::unbounded_channel(); let transmit = UnboundedReceiverStream::new(tx_channel_rx); - let fut_handle_tx_buffer = transmit.forward(socket_tx).map(|result| { - tracing::debug!("WebSocket send result: {:?}", result); - if let Err(e) = result { - tracing::error!(error = ?e, "websocket send error"); - } - }); + let fut_handle_tx_buffer = transmit + .then(|message| async { + match message { + Ok(ref message) => { + tracing::debug!(message = ?message, "Outgoing Message"); + } + Err(ref e) => { + tracing::error!(error = ?e, "Outgoing Message Error"); + } + } + message + }) + .forward(socket_tx) + .map(|result| { + tracing::debug!("WebSocket send result: {:?}", result); + if let Err(e) = result { + tracing::error!(error = ?e, "websocket send error"); + } + }); tokio::task::spawn(fut_handle_tx_buffer); let store = &mut *STORE.lock().await; @@ -144,7 +157,23 @@ async fn handle_socket(session_id: u32, websocket: WebSocket) { // Deserialize match serde_json::from_str::(text) { Ok(message) => { - tracing::info!("Received message: {:?}", message); + tracing::debug!(message = ?message, "Received message"); + + match message { + IncomingMessage::DeleteDownloadToken { id } => { + let store = &mut *STORE.lock().await; + let session = store + .sessions + .get_mut(&session_id) + .expect("Session not found"); + + if session.delete_download(id) { + session + .send_state() + .expect("Failed to buffer state message"); + } + } + } } Err(e) => { tracing::error!("Error deserializing message: {} {}", text, e); diff --git a/src/models.rs b/src/models.rs index b6bb2b3..92cf61c 100644 --- a/src/models.rs +++ b/src/models.rs @@ -51,6 +51,18 @@ impl Session { return self.downloads.last().unwrap(); } + // Delete a download from the session + // Returns true if the download was deleted, false if it was not found + pub fn delete_download(&mut self, token: u32) -> bool { + if let Some(index) = self.downloads.iter().position(|d| d.token == token) { + self.downloads.remove(index); + true + } else { + tracing::warn!("Attempted to delete non-existent download token: {}", token); + false + } + } + // This function's failure is not a failure to transmit the message, but a failure to buffer it into the channel (or any preceding steps). pub fn send_message(&mut self, message: OutgoingMessage) -> Result<(), anyhow::Error> { if self.tx.is_none() {