Add delete_download incoming message handler, add future message logging for transmit

This commit is contained in:
2025-01-02 14:39:27 -06:00
parent 7736b0694e
commit 452f663884
2 changed files with 48 additions and 7 deletions

View File

@@ -84,7 +84,20 @@ async fn handle_socket(session_id: u32, websocket: WebSocket) {
// Use an unbounded channel to handle buffering and flushing of messages to the websocket...
let (tx_channel, tx_channel_rx) = mpsc::unbounded_channel();
let transmit = UnboundedReceiverStream::new(tx_channel_rx);
let fut_handle_tx_buffer = transmit.forward(socket_tx).map(|result| {
let fut_handle_tx_buffer = transmit
.then(|message| async {
match message {
Ok(ref message) => {
tracing::debug!(message = ?message, "Outgoing Message");
}
Err(ref e) => {
tracing::error!(error = ?e, "Outgoing Message Error");
}
}
message
})
.forward(socket_tx)
.map(|result| {
tracing::debug!("WebSocket send result: {:?}", result);
if let Err(e) = result {
tracing::error!(error = ?e, "websocket send error");
@@ -144,7 +157,23 @@ async fn handle_socket(session_id: u32, websocket: WebSocket) {
// Deserialize
match serde_json::from_str::<IncomingMessage>(text) {
Ok(message) => {
tracing::info!("Received message: {:?}", message);
tracing::debug!(message = ?message, "Received message");
match message {
IncomingMessage::DeleteDownloadToken { id } => {
let store = &mut *STORE.lock().await;
let session = store
.sessions
.get_mut(&session_id)
.expect("Session not found");
if session.delete_download(id) {
session
.send_state()
.expect("Failed to buffer state message");
}
}
}
}
Err(e) => {
tracing::error!("Error deserializing message: {} {}", text, e);

View File

@@ -51,6 +51,18 @@ impl Session {
return self.downloads.last().unwrap();
}
// Delete a download from the session
// Returns true if the download was deleted, false if it was not found
pub fn delete_download(&mut self, token: u32) -> bool {
if let Some(index) = self.downloads.iter().position(|d| d.token == token) {
self.downloads.remove(index);
true
} else {
tracing::warn!("Attempted to delete non-existent download token: {}", token);
false
}
}
// This function's failure is not a failure to transmit the message, but a failure to buffer it into the channel (or any preceding steps).
pub fn send_message(&mut self, message: OutgoingMessage) -> Result<(), anyhow::Error> {
if self.tx.is_none() {