diff --git a/backend/src/handlers/build_logs.rs b/backend/src/handlers/build_logs.rs new file mode 100644 index 0000000..4304ba3 --- /dev/null +++ b/backend/src/handlers/build_logs.rs @@ -0,0 +1,52 @@ +use salvo::http::StatusCode; +use salvo::prelude::{handler, Request, Response}; +use salvo::Depot; + +use crate::state::STORE; + +#[handler] +pub async fn get_build_logs(req: &mut Request, res: &mut Response, _depot: &mut Depot) { + let store = STORE.lock().await; + + if let Some(build_logs) = &store.build_logs { + // Use pre-computed hash for ETag + let etag = format!("\"{:x}\"", build_logs.content_hash); + + // Check If-None-Match header + if let Some(if_none_match) = req.headers().get("If-None-Match") { + if if_none_match == &etag { + res.status_code(StatusCode::NOT_MODIFIED); + return; + } + } + + // Check If-Modified-Since header + if let Some(if_modified_since) = req.headers().get("If-Modified-Since") { + if let Ok(if_modified_since_str) = if_modified_since.to_str() { + if let Ok(if_modified_since_time) = + chrono::DateTime::parse_from_rfc2822(if_modified_since_str) + { + if build_logs.fetched_at <= if_modified_since_time { + res.status_code(StatusCode::NOT_MODIFIED); + return; + } + } + } + } + + res.headers_mut().insert("ETag", etag.parse().unwrap()); + res.headers_mut() + .insert("Content-Type", "text/plain; charset=utf-8".parse().unwrap()); + res.headers_mut() + .insert("Cache-Control", "public, max-age=300".parse().unwrap()); + res.headers_mut().insert( + "Last-Modified", + build_logs.fetched_at.to_rfc2822().parse().unwrap(), + ); + + res.render(&build_logs.content); + } else { + res.status_code(StatusCode::NOT_FOUND); + res.render("Build logs not available"); + } +} diff --git a/backend/src/handlers/downloads.rs b/backend/src/handlers/downloads.rs new file mode 100644 index 0000000..d3c30b2 --- /dev/null +++ b/backend/src/handlers/downloads.rs @@ -0,0 +1,58 @@ +use salvo::http::HeaderValue; +use salvo::prelude::{handler, Request, Response}; +use salvo::Depot; + +use crate::state::STORE; + +use super::session::get_session_id; + +#[handler] +pub async fn download(req: &mut Request, res: &mut Response, depot: &mut Depot) { + let download_id = req + .param::("id") + .expect("Download ID required to download file"); + + let session_id = + get_session_id(req, depot).expect("Session ID could not be found via request or depot"); + + let store = &mut *STORE.lock().await; + + let session = store + .sessions + .get_mut(&session_id) + .expect("Session not found"); + let executable = store + .executables + .get(&download_id as &str) + .expect("Executable not found"); + + // Create a download for the session + let session_download = session.add_download(executable); + tracing::info!(session_id, type = download_id, dl_token = session_download.token, "Download created"); + let data = executable.with_key(session_download.token.to_string().as_bytes()); + + if let Err(e) = res.write_body(data) { + tracing::error!("Error writing body: {}", e); + } + + res.headers.insert( + "Content-Disposition", + HeaderValue::from_str( + format!("attachment; filename=\"{}\"", session_download.filename).as_str(), + ) + .expect("Unable to create header"), + ); + res.headers.insert( + "Content-Type", + HeaderValue::from_static("application/octet-stream"), + ); + + // Don't try to send state if somehow the session has not connected + if session.tx.is_some() { + session + .send_state() + .expect("Failed to buffer state message"); + } else { + tracing::warn!("Download being made without any connection websocket"); + } +} diff --git a/backend/src/handlers/mod.rs b/backend/src/handlers/mod.rs new file mode 100644 index 0000000..b5657ed --- /dev/null +++ b/backend/src/handlers/mod.rs @@ -0,0 +1,11 @@ +mod build_logs; +mod downloads; +mod notifications; +mod session; +mod websocket; + +pub use build_logs::get_build_logs; +pub use downloads::download; +pub use notifications::notify; +pub use session::{get_session, session_middleware}; +pub use websocket::connect; diff --git a/backend/src/handlers/notifications.rs b/backend/src/handlers/notifications.rs new file mode 100644 index 0000000..8d798d2 --- /dev/null +++ b/backend/src/handlers/notifications.rs @@ -0,0 +1,61 @@ +use salvo::http::StatusCode; +use salvo::prelude::{handler, Request, Response}; + +use crate::models::OutgoingMessage; +use crate::state::STORE; + +#[handler] +pub async fn notify(req: &mut Request, res: &mut Response) { + let key = req.query::("key"); + + if key.is_none() { + res.status_code(StatusCode::BAD_REQUEST); + return; + } + + let key = key.unwrap(); + + if !key.starts_with("0x") { + res.status_code(StatusCode::BAD_REQUEST); + return; + } + + // Parse key into u32 + let key = match u32::from_str_radix(key.trim_start_matches("0x"), 16) { + Ok(k) => k, + Err(e) => { + tracing::error!("Error parsing key: {}", e); + res.status_code(StatusCode::BAD_REQUEST); + return; + } + }; + + let store = &mut *STORE.lock().await; + + let target_session = store + .sessions + .iter_mut() + .find(|(_, session)| session.downloads.iter().any(|d| d.token == key)); + + match target_session { + Some((_, session)) => { + let message = OutgoingMessage::TokenAlert { token: key }; + + if let Err(e) = session.send_message(message) { + tracing::warn!( + error = e.to_string(), + "Session did not have a receiving WebSocket available, notify ignored.", + ); + res.status_code(StatusCode::NOT_MODIFIED); + return; + } + + res.render("Notification sent"); + } + None => { + tracing::warn!("Session not found for key while attempting notify: {}", key); + res.status_code(StatusCode::UNAUTHORIZED); + return; + } + } +} diff --git a/backend/src/handlers/session.rs b/backend/src/handlers/session.rs new file mode 100644 index 0000000..2b860f0 --- /dev/null +++ b/backend/src/handlers/session.rs @@ -0,0 +1,85 @@ +use salvo::http::StatusCode; +use salvo::prelude::{handler, Request, Response}; +use salvo::writing::Json; +use salvo::Depot; + +use crate::state::STORE; + +#[handler] +pub async fn session_middleware(req: &mut Request, res: &mut Response, depot: &mut Depot) { + match req.cookie("Session") { + Some(cookie) => { + // Check if the session exists + match cookie.value().parse::() { + Ok(session_id) => { + let mut store = STORE.lock().await; + if !store.sessions.contains_key(&session_id) { + let new_session_id = store.new_session(res).await; + depot.insert("session_id", new_session_id); + tracing::debug!( + existing_session_id = session_id, + new_session_id = new_session_id, + "Session provided in cookie, but does not exist" + ); + } else { + store.sessions.get_mut(&session_id).unwrap().seen(false); + } + } + Err(parse_error) => { + tracing::debug!( + invalid_session_id = cookie.value(), + error = ?parse_error, + "Session provided in cookie, but is not a valid number" + ); + let mut store = STORE.lock().await; + let id = store.new_session(res).await; + + depot.insert("session_id", id); + } + } + } + None => { + tracing::debug!("Session was not provided in cookie"); + let mut store = STORE.lock().await; + let id = store.new_session(res).await; + + depot.insert("session_id", id); + } + } +} + +#[handler] +pub async fn get_session(req: &mut Request, res: &mut Response, depot: &mut Depot) { + let store = STORE.lock().await; + + let session_id = get_session_id(req, depot); + if session_id.is_none() { + res.status_code(StatusCode::BAD_REQUEST); + return; + } + + match store.sessions.get(&session_id.unwrap()) { + Some(session) => { + res.render(Json(&session)); + } + None => { + res.status_code(StatusCode::BAD_REQUEST); + } + } +} + +// Acquires the session id from the request, preferring the depot +pub fn get_session_id(req: &Request, depot: &Depot) -> Option { + if depot.contains_key("session_id") { + return Some(*depot.get::("session_id").unwrap()); + } + + // Otherwise, just use whatever the Cookie might have + match req.cookie("Session") { + Some(cookie) => cookie.value().parse::().ok(), + None => { + tracing::warn!("Session was not provided in cookie or depot"); + None + } + } +} diff --git a/backend/src/handlers/websocket.rs b/backend/src/handlers/websocket.rs new file mode 100644 index 0000000..5d2fa8f --- /dev/null +++ b/backend/src/handlers/websocket.rs @@ -0,0 +1,138 @@ +use futures_util::{FutureExt, StreamExt}; +use salvo::http::StatusError; +use salvo::prelude::{handler, Request, Response, WebSocketUpgrade}; +use salvo::websocket::WebSocket; +use salvo::Depot; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::models::{IncomingMessage, OutgoingMessage}; +use crate::state::STORE; + +use super::session::get_session_id; + +#[handler] +pub async fn connect( + req: &mut Request, + res: &mut Response, + depot: &Depot, +) -> Result<(), StatusError> { + let session_id = get_session_id(req, depot).unwrap(); + WebSocketUpgrade::new() + .upgrade(req, res, move |ws| async move { + handle_socket(session_id, ws).await; + }) + .await +} + +async fn handle_socket(session_id: u32, websocket: WebSocket) { + // Split the socket into a sender and receive of messages. + let (socket_tx, mut socket_rx) = websocket.split(); + + // Use an unbounded channel to handle buffering and flushing of messages to the websocket... + let (tx_channel, tx_channel_rx) = mpsc::unbounded_channel(); + let transmit = UnboundedReceiverStream::new(tx_channel_rx); + let fut_handle_tx_buffer = transmit + .then(|message| async { + match message { + Ok(ref message) => { + tracing::debug!(message = ?message, "Outgoing Message"); + } + Err(ref e) => { + tracing::error!(error = ?e, "Outgoing Message Error"); + } + } + message + }) + .forward(socket_tx) + .map(|result| { + tracing::debug!("WebSocket send result: {:?}", result); + if let Err(e) = result { + tracing::error!(error = ?e, "websocket send error"); + } + }); + tokio::task::spawn(fut_handle_tx_buffer); + + let store = &mut *STORE.lock().await; + + // Create the executable message first, borrow issues + let executable_message = OutgoingMessage::Executables { + executables: store.executable_json(), + build_log: if store.build_logs.is_some() { + Some("/build-logs".to_string()) + } else { + None + }, + }; + + let session = store + .sessions + .get_mut(&session_id) + .expect("Unable to get session"); + session.tx = Some(tx_channel); + + session + .send_state() + .expect("Failed to buffer state message"); + session + .send_message(executable_message) + .expect("Failed to buffer executables message"); + + // Handle incoming messages + let fut = async move { + tracing::info!( + "WebSocket connection established for session_id: {}", + session_id + ); + + while let Some(result) = socket_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(error) => { + tracing::error!( + "WebSocket Error session_id={} error=({})", + session_id, + error + ); + break; + } + }; + + if msg.is_close() { + tracing::info!("WebSocket closing for Session {}", session_id); + break; + } + + if msg.is_text() { + let text = msg.to_str().unwrap(); + + // Deserialize + match serde_json::from_str::(text) { + Ok(message) => { + tracing::debug!(message = ?message, "Received message"); + + match message { + IncomingMessage::DeleteDownloadToken { id } => { + let store = &mut *STORE.lock().await; + let session = store + .sessions + .get_mut(&session_id) + .expect("Session not found"); + + if session.delete_download(id) { + session + .send_state() + .expect("Failed to buffer state message"); + } + } + } + } + Err(e) => { + tracing::error!("Error deserializing message: {} {}", text, e); + } + } + } + } + }; + tokio::task::spawn(fut); +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 19fe336..10c700a 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,386 +1,20 @@ -use std::sync::LazyLock; - -use futures_util::{FutureExt, StreamExt}; -use models::{IncomingMessage, OutgoingMessage}; use salvo::cors::Cors; -use salvo::http::{HeaderValue, Method, StatusCode, StatusError}; +use salvo::http::Method; use salvo::logging::Logger; -use salvo::prelude::{ - handler, CatchPanic, Listener, Request, Response, Router, Server, Service, StaticDir, - TcpListener, WebSocketUpgrade, -}; -use salvo::websocket::WebSocket; -use salvo::writing::Json; -use salvo::Depot; -use tokio::sync::{mpsc, Mutex}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use salvo::prelude::{CatchPanic, Listener, Router, Server, Service, StaticDir, TcpListener}; use tracing_subscriber::EnvFilter; use crate::config::Config; -use crate::models::State; - -static STORE: LazyLock> = LazyLock::new(|| Mutex::new(State::new())); +use crate::handlers::{connect, download, get_build_logs, get_session, notify, session_middleware}; +use crate::state::STORE; mod config; +mod handlers; mod models; mod railway; +mod state; mod utility; -#[handler] -async fn session_middleware(req: &mut Request, res: &mut Response, depot: &mut Depot) { - match req.cookie("Session") { - Some(cookie) => { - // Check if the session exists - match cookie.value().parse::() { - Ok(session_id) => { - let mut store = STORE.lock().await; - if !store.sessions.contains_key(&session_id) { - let new_session_id = store.new_session(res).await; - depot.insert("session_id", new_session_id); - tracing::debug!( - existing_session_id = session_id, - new_session_id = new_session_id, - "Session provided in cookie, but does not exist" - ); - } else { - store.sessions.get_mut(&session_id).unwrap().seen(false); - } - } - Err(parse_error) => { - tracing::debug!( - invalid_session_id = cookie.value(), - error = ?parse_error, - "Session provided in cookie, but is not a valid number" - ); - let mut store = STORE.lock().await; - let id = store.new_session(res).await; - - depot.insert("session_id", id); - } - } - } - None => { - tracing::debug!("Session was not provided in cookie"); - let mut store = STORE.lock().await; - let id = store.new_session(res).await; - - depot.insert("session_id", id); - } - } -} - -#[handler] -async fn connect(req: &mut Request, res: &mut Response, depot: &Depot) -> Result<(), StatusError> { - let session_id = get_session_id(req, depot).unwrap(); - WebSocketUpgrade::new() - .upgrade(req, res, move |ws| async move { - handle_socket(session_id, ws).await; - }) - .await -} - -async fn handle_socket(session_id: u32, websocket: WebSocket) { - // Split the socket into a sender and receive of messages. - let (socket_tx, mut socket_rx) = websocket.split(); - - // Use an unbounded channel to handle buffering and flushing of messages to the websocket... - let (tx_channel, tx_channel_rx) = mpsc::unbounded_channel(); - let transmit = UnboundedReceiverStream::new(tx_channel_rx); - let fut_handle_tx_buffer = transmit - .then(|message| async { - match message { - Ok(ref message) => { - tracing::debug!(message = ?message, "Outgoing Message"); - } - Err(ref e) => { - tracing::error!(error = ?e, "Outgoing Message Error"); - } - } - message - }) - .forward(socket_tx) - .map(|result| { - tracing::debug!("WebSocket send result: {:?}", result); - if let Err(e) = result { - tracing::error!(error = ?e, "websocket send error"); - } - }); - tokio::task::spawn(fut_handle_tx_buffer); - - let store = &mut *STORE.lock().await; - - // Create the executable message first, borrow issues - let executable_message = OutgoingMessage::Executables { - executables: store.executable_json(), - build_log: if store.build_logs.is_some() { - Some("/build-logs".to_string()) - } else { - None - }, - }; - - let session = store - .sessions - .get_mut(&session_id) - .expect("Unable to get session"); - session.tx = Some(tx_channel); - - session - .send_state() - .expect("Failed to buffer state message"); - session - .send_message(executable_message) - .expect("Failed to buffer executables message"); - - // Handle incoming messages - let fut = async move { - tracing::info!( - "WebSocket connection established for session_id: {}", - session_id - ); - - while let Some(result) = socket_rx.next().await { - let msg = match result { - Ok(msg) => msg, - Err(error) => { - tracing::error!( - "WebSocket Error session_id={} error=({})", - session_id, - error - ); - break; - } - }; - - if msg.is_close() { - tracing::info!("WebSocket closing for Session {}", session_id); - break; - } - - if msg.is_text() { - let text = msg.to_str().unwrap(); - - // Deserialize - match serde_json::from_str::(text) { - Ok(message) => { - tracing::debug!(message = ?message, "Received message"); - - match message { - IncomingMessage::DeleteDownloadToken { id } => { - let store = &mut *STORE.lock().await; - let session = store - .sessions - .get_mut(&session_id) - .expect("Session not found"); - - if session.delete_download(id) { - session - .send_state() - .expect("Failed to buffer state message"); - } - } - } - } - Err(e) => { - tracing::error!("Error deserializing message: {} {}", text, e); - } - } - } - } - }; - tokio::task::spawn(fut); -} - -#[handler] -pub async fn get_build_logs(req: &mut Request, res: &mut Response, _depot: &mut Depot) { - let store = STORE.lock().await; - - if let Some(build_logs) = &store.build_logs { - // Use pre-computed hash for ETag - let etag = format!("\"{:x}\"", build_logs.content_hash); - - // Check If-None-Match header - if let Some(if_none_match) = req.headers().get("If-None-Match") { - if if_none_match == &etag { - res.status_code(StatusCode::NOT_MODIFIED); - return; - } - } - - // Check If-Modified-Since header - if let Some(if_modified_since) = req.headers().get("If-Modified-Since") { - if let Ok(if_modified_since_str) = if_modified_since.to_str() { - if let Ok(if_modified_since_time) = - chrono::DateTime::parse_from_rfc2822(if_modified_since_str) - { - if build_logs.fetched_at <= if_modified_since_time { - res.status_code(StatusCode::NOT_MODIFIED); - return; - } - } - } - } - - res.headers_mut().insert("ETag", etag.parse().unwrap()); - res.headers_mut() - .insert("Content-Type", "text/plain; charset=utf-8".parse().unwrap()); - res.headers_mut() - .insert("Cache-Control", "public, max-age=300".parse().unwrap()); - res.headers_mut().insert( - "Last-Modified", - build_logs.fetched_at.to_rfc2822().parse().unwrap(), - ); - - res.render(&build_logs.content); - } else { - res.status_code(StatusCode::NOT_FOUND); - res.render("Build logs not available"); - } -} - -#[handler] -pub async fn download(req: &mut Request, res: &mut Response, depot: &mut Depot) { - let download_id = req - .param::("id") - .expect("Download ID required to download file"); - - let session_id = - get_session_id(req, depot).expect("Session ID could not be found via request or depot"); - - let store = &mut *STORE.lock().await; - - let session = store - .sessions - .get_mut(&session_id) - .expect("Session not found"); - let executable = store - .executables - .get(&download_id as &str) - .expect("Executable not found"); - - // Create a download for the session - let session_download = session.add_download(executable); - tracing::info!(session_id, type = download_id, dl_token = session_download.token, "Download created"); - let data = executable.with_key(session_download.token.to_string().as_bytes()); - - if let Err(e) = res.write_body(data) { - tracing::error!("Error writing body: {}", e); - } - - res.headers.insert( - "Content-Disposition", - HeaderValue::from_str( - format!("attachment; filename=\"{}\"", session_download.filename).as_str(), - ) - .expect("Unable to create header"), - ); - res.headers.insert( - "Content-Type", - HeaderValue::from_static("application/octet-stream"), - ); - - // Don't try to send state if somehow the session has not connected - if session.tx.is_some() { - session - .send_state() - .expect("Failed to buffer state message"); - } else { - tracing::warn!("Download being made without any connection websocket"); - } -} - -#[handler] -pub async fn notify(req: &mut Request, res: &mut Response) { - let key = req.query::("key"); - - if key.is_none() { - res.status_code(StatusCode::BAD_REQUEST); - return; - } - - let key = key.unwrap(); - - if !key.starts_with("0x") { - res.status_code(StatusCode::BAD_REQUEST); - return; - } - - // Parse key into u32 - let key = match u32::from_str_radix(key.trim_start_matches("0x"), 16) { - Ok(k) => k, - Err(e) => { - tracing::error!("Error parsing key: {}", e); - res.status_code(StatusCode::BAD_REQUEST); - return; - } - }; - - let store = &mut *STORE.lock().await; - - let target_session = store - .sessions - .iter_mut() - .find(|(_, session)| session.downloads.iter().any(|d| d.token == key)); - - match target_session { - Some((_, session)) => { - let message = OutgoingMessage::TokenAlert { token: key }; - - if let Err(e) = session.send_message(message) { - tracing::warn!( - error = e.to_string(), - "Session did not have a receiving WebSocket available, notify ignored.", - ); - res.status_code(StatusCode::NOT_MODIFIED); - return; - } - - res.render("Notification sent"); - } - None => { - tracing::warn!("Session not found for key while attempting notify: {}", key); - res.status_code(StatusCode::UNAUTHORIZED); - return; - } - } -} - -#[handler] -pub async fn get_session(req: &mut Request, res: &mut Response, depot: &mut Depot) { - let store = STORE.lock().await; - - let session_id = get_session_id(req, depot); - if session_id.is_none() { - res.status_code(StatusCode::BAD_REQUEST); - return; - } - - match store.sessions.get(&session_id.unwrap()) { - Some(session) => { - res.render(Json(&session)); - } - None => { - res.status_code(StatusCode::BAD_REQUEST); - } - } -} - -// Acquires the session id from the request, preferring the depot -fn get_session_id(req: &Request, depot: &Depot) -> Option { - if depot.contains_key("session_id") { - return Some(*depot.get::("session_id").unwrap()); - } - - // Otherwise, just use whatever the Cookie might have - match req.cookie("Session") { - Some(cookie) => cookie.value().parse::().ok(), - None => { - tracing::warn!("Session was not provided in cookie or depot"); - None - } - } -} - #[tokio::main] async fn main() { // Load environment variables from .env file (development only) @@ -445,9 +79,8 @@ async fn main() { let static_dir = StaticDir::new(["./public"]).defaults("index.html"); - // TODO: Move handlers to a separate file // TODO: Improved Token Generation - // TODO: Advanded HMAC Verification + // TODO: Advanced HMAC Verification // TODO: Session Purging let router = Router::new() diff --git a/backend/src/models.rs b/backend/src/models.rs deleted file mode 100644 index 6756934..0000000 --- a/backend/src/models.rs +++ /dev/null @@ -1,268 +0,0 @@ -use salvo::{http::cookie::Cookie, websocket::Message, Response}; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, path}; -use tokio::sync::mpsc::UnboundedSender; - -use crate::utility::search; - -#[derive(Debug, Clone)] -pub struct BuildLogs { - pub content: String, - pub fetched_at: chrono::DateTime, - pub content_hash: u64, -} - -#[derive(Debug, Serialize, Clone)] -pub struct Session { - pub id: u32, - pub downloads: Vec, - - pub first_seen: chrono::DateTime, - // The last time a request OR websocket message from/to this session was made - pub last_seen: chrono::DateTime, - // The last time a request was made with this session - pub last_request: chrono::DateTime, - - // The sender for the websocket connection - #[serde(skip_serializing)] - pub tx: Option>>, -} - -impl Session { - // Update the last seen time(s) for the session - pub fn seen(&mut self, socket: bool) { - self.last_seen = chrono::Utc::now(); - if !socket { - self.last_request = chrono::Utc::now(); - } - } - - // Add a download to the session - pub fn add_download(&mut self, exe: &Executable) -> &SessionDownload { - let token: u32 = rand::random(); - - let download = SessionDownload { - token, - filename: format!( - "{}-{:08x}{}{}", - exe.name, - token, - if !exe.extension.is_empty() { "." } else { "" }, - exe.extension - ), - last_used: chrono::Utc::now(), - download_time: chrono::Utc::now(), - }; - - self.downloads.push(download); - self.downloads.last().unwrap() - } - - // Delete a download from the session - // Returns true if the download was deleted, false if it was not found - pub fn delete_download(&mut self, token: u32) -> bool { - if let Some(index) = self.downloads.iter().position(|d| d.token == token) { - self.downloads.remove(index); - true - } else { - tracing::warn!("Attempted to delete non-existent download token: {}", token); - false - } - } - - // This function's failure is not a failure to transmit the message, but a failure to buffer it into the channel (or any preceding steps). - pub fn send_message(&mut self, message: OutgoingMessage) -> Result<(), anyhow::Error> { - if self.tx.is_none() { - return Err(anyhow::anyhow!("Session {} has no sender", self.id)); - } - - // TODO: Error handling - let tx = self.tx.as_ref().unwrap(); - let result = tx.send(Ok(Message::text(serde_json::to_string(&message).unwrap()))); - - match result { - Ok(_) => Ok(()), - Err(e) => Err(anyhow::anyhow!("Error sending message: {}", e)), - } - } - - pub fn send_state(&mut self) -> Result<(), anyhow::Error> { - let message = OutgoingMessage::State { - session: self.clone(), - }; - - self.send_message(message) - } -} - -#[derive(Serialize, Debug, Clone)] -pub struct SessionDownload { - pub token: u32, - pub filename: String, - pub last_used: chrono::DateTime, - pub download_time: chrono::DateTime, -} - -impl SessionDownload {} - -#[derive(Default)] -pub struct State { - pub sessions: HashMap, - pub executables: HashMap, - pub build_logs: Option, - pub build_log_url: Option, -} - -impl State { - pub fn new() -> Self { - Self { - sessions: HashMap::new(), - executables: HashMap::new(), - build_logs: None, - build_log_url: None, - } - } - - pub fn add_executable(&mut self, exe_type: &str, exe_path: &str) { - let data = std::fs::read(exe_path).expect("Unable to read file"); - - let pattern = "a".repeat(1024); - let key_start = search(&data, pattern.as_bytes(), 0).unwrap(); - let key_end = key_start + pattern.len(); - - let path = path::Path::new(&exe_path); - let name = path.file_stem().unwrap().to_str().unwrap(); - let extension = match path.extension() { - Some(s) => s.to_str().unwrap(), - None => "", - }; - - let exe = Executable { - data, - filename: path.file_name().unwrap().to_str().unwrap().to_string(), - name: name.to_string(), - extension: extension.to_string(), - key_start, - key_end, - }; - - self.executables.insert(exe_type.to_string(), exe); - } - - pub async fn new_session(&mut self, res: &mut Response) -> u32 { - let id: u32 = rand::random(); - - let now = chrono::Utc::now(); - self.sessions.insert( - id, - Session { - id, - downloads: Vec::new(), - last_seen: now, - last_request: now, - first_seen: now, - tx: None, - }, - ); - - tracing::info!("New session created: {}", id); - - res.add_cookie( - Cookie::build(("Session", id.to_string())) - .http_only(true) - .partitioned(true) - .secure(cfg!(debug_assertions) == false) - .path("/") - // Use SameSite=None only in development - .same_site(if cfg!(debug_assertions) { - salvo::http::cookie::SameSite::None - } else { - salvo::http::cookie::SameSite::Strict - }) - .permanent() - .build(), - ); - - id - } - - pub fn executable_json(&self) -> Vec { - let mut executables = Vec::new(); - - for (key, exe) in &self.executables { - executables.push(ExecutableJson { - id: key.to_string(), - size: exe.data.len(), - filename: exe.filename.clone(), - }); - } - - executables - } -} - -#[derive(Default, Clone, Debug)] -pub struct Executable { - pub data: Vec, // the raw data of the executable - pub filename: String, - pub name: String, // the name before the extension - pub extension: String, // may be empty string - pub key_start: usize, // the index of the byte where the key starts - pub key_end: usize, // the index of the byte where the key ends -} - -impl Executable { - pub fn with_key(&self, new_key: &[u8]) -> Vec { - let mut data = self.data.clone(); - - // Copy the key into the data - for i in 0..new_key.len() { - data[self.key_start + i] = new_key[i]; - } - - // If the new key is shorter than the old key, we just write over the remaining data - if new_key.len() < self.key_end - self.key_start { - for item in data - .iter_mut() - .take(self.key_end) - .skip(self.key_start + new_key.len()) - { - *item = b' '; - } - } - - data - } -} - -#[derive(Debug, Deserialize)] -#[serde(tag = "type", rename_all = "kebab-case")] -pub enum IncomingMessage { - // A request from the client to delete a download token - DeleteDownloadToken { id: u32 }, -} - -#[derive(Debug, Serialize)] -#[serde(tag = "type", rename_all = "kebab-case")] -pub enum OutgoingMessage { - // An alert to the client that a session download has been used. - #[serde(rename = "notify")] - TokenAlert { - token: u32, - }, - // A message describing the current session state - State { - session: Session, - }, - Executables { - build_log: Option, - executables: Vec, - }, -} - -#[derive(Debug, Serialize)] -pub struct ExecutableJson { - pub id: String, - pub size: usize, - pub filename: String, -} diff --git a/backend/src/models/build_logs.rs b/backend/src/models/build_logs.rs new file mode 100644 index 0000000..f004f0e --- /dev/null +++ b/backend/src/models/build_logs.rs @@ -0,0 +1,6 @@ +#[derive(Debug, Clone)] +pub struct BuildLogs { + pub content: String, + pub fetched_at: chrono::DateTime, + pub content_hash: u64, +} diff --git a/backend/src/models/executable.rs b/backend/src/models/executable.rs new file mode 100644 index 0000000..9fadab7 --- /dev/null +++ b/backend/src/models/executable.rs @@ -0,0 +1,42 @@ +use serde::Serialize; + +#[derive(Default, Clone, Debug)] +pub struct Executable { + pub data: Vec, // the raw data of the executable + pub filename: String, + pub name: String, // the name before the extension + pub extension: String, // may be empty string + pub key_start: usize, // the index of the byte where the key starts + pub key_end: usize, // the index of the byte where the key ends +} + +impl Executable { + pub fn with_key(&self, new_key: &[u8]) -> Vec { + let mut data = self.data.clone(); + + // Copy the key into the data + for i in 0..new_key.len() { + data[self.key_start + i] = new_key[i]; + } + + // If the new key is shorter than the old key, we just write over the remaining data + if new_key.len() < self.key_end - self.key_start { + for item in data + .iter_mut() + .take(self.key_end) + .skip(self.key_start + new_key.len()) + { + *item = b' '; + } + } + + data + } +} + +#[derive(Debug, Serialize)] +pub struct ExecutableJson { + pub id: String, + pub size: usize, + pub filename: String, +} diff --git a/backend/src/models/messages.rs b/backend/src/models/messages.rs new file mode 100644 index 0000000..50b64c7 --- /dev/null +++ b/backend/src/models/messages.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; + +use super::executable::ExecutableJson; +use super::session::Session; + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "kebab-case")] +pub enum IncomingMessage { + // A request from the client to delete a download token + DeleteDownloadToken { id: u32 }, +} + +#[derive(Debug, Serialize)] +#[serde(tag = "type", rename_all = "kebab-case")] +pub enum OutgoingMessage { + // An alert to the client that a session download has been used. + #[serde(rename = "notify")] + TokenAlert { + token: u32, + }, + // A message describing the current session state + State { + session: Session, + }, + Executables { + build_log: Option, + executables: Vec, + }, +} diff --git a/backend/src/models/mod.rs b/backend/src/models/mod.rs new file mode 100644 index 0000000..33ebbf0 --- /dev/null +++ b/backend/src/models/mod.rs @@ -0,0 +1,9 @@ +mod build_logs; +mod executable; +mod messages; +mod session; + +pub use build_logs::BuildLogs; +pub use executable::{Executable, ExecutableJson}; +pub use messages::{IncomingMessage, OutgoingMessage}; +pub use session::Session; diff --git a/backend/src/models/session.rs b/backend/src/models/session.rs new file mode 100644 index 0000000..83d6aef --- /dev/null +++ b/backend/src/models/session.rs @@ -0,0 +1,97 @@ +use salvo::websocket::Message; +use serde::Serialize; +use tokio::sync::mpsc::UnboundedSender; + +use super::executable::Executable; +use super::messages::OutgoingMessage; + +#[derive(Debug, Serialize, Clone)] +pub struct Session { + pub id: u32, + pub downloads: Vec, + + pub first_seen: chrono::DateTime, + // The last time a request OR websocket message from/to this session was made + pub last_seen: chrono::DateTime, + // The last time a request was made with this session + pub last_request: chrono::DateTime, + + // The sender for the websocket connection + #[serde(skip_serializing)] + pub tx: Option>>, +} + +impl Session { + // Update the last seen time(s) for the session + pub fn seen(&mut self, socket: bool) { + self.last_seen = chrono::Utc::now(); + if !socket { + self.last_request = chrono::Utc::now(); + } + } + + // Add a download to the session + pub fn add_download(&mut self, exe: &Executable) -> &SessionDownload { + let token: u32 = rand::random(); + + let download = SessionDownload { + token, + filename: format!( + "{}-{:08x}{}{}", + exe.name, + token, + if !exe.extension.is_empty() { "." } else { "" }, + exe.extension + ), + last_used: chrono::Utc::now(), + download_time: chrono::Utc::now(), + }; + + self.downloads.push(download); + self.downloads.last().unwrap() + } + + // Delete a download from the session + // Returns true if the download was deleted, false if it was not found + pub fn delete_download(&mut self, token: u32) -> bool { + if let Some(index) = self.downloads.iter().position(|d| d.token == token) { + self.downloads.remove(index); + true + } else { + tracing::warn!("Attempted to delete non-existent download token: {}", token); + false + } + } + + // This function's failure is not a failure to transmit the message, but a failure to buffer it into the channel (or any preceding steps). + pub fn send_message(&mut self, message: OutgoingMessage) -> Result<(), anyhow::Error> { + if self.tx.is_none() { + return Err(anyhow::anyhow!("Session {} has no sender", self.id)); + } + + // TODO: Error handling + let tx = self.tx.as_ref().unwrap(); + let result = tx.send(Ok(Message::text(serde_json::to_string(&message).unwrap()))); + + match result { + Ok(_) => Ok(()), + Err(e) => Err(anyhow::anyhow!("Error sending message: {}", e)), + } + } + + pub fn send_state(&mut self) -> Result<(), anyhow::Error> { + let message = OutgoingMessage::State { + session: self.clone(), + }; + + self.send_message(message) + } +} + +#[derive(Serialize, Debug, Clone)] +pub struct SessionDownload { + pub token: u32, + pub filename: String, + pub last_used: chrono::DateTime, + pub download_time: chrono::DateTime, +} diff --git a/backend/src/state.rs b/backend/src/state.rs new file mode 100644 index 0000000..6f51d7d --- /dev/null +++ b/backend/src/state.rs @@ -0,0 +1,107 @@ +use std::collections::HashMap; +use std::path; +use std::sync::LazyLock; + +use salvo::{http::cookie::Cookie, Response}; +use tokio::sync::Mutex; + +use crate::models::{BuildLogs, Executable, ExecutableJson, Session}; +use crate::utility::search; + +pub static STORE: LazyLock> = LazyLock::new(|| Mutex::new(State::new())); + +#[derive(Default)] +pub struct State { + pub sessions: HashMap, + pub executables: HashMap, + pub build_logs: Option, + pub build_log_url: Option, +} + +impl State { + pub fn new() -> Self { + Self { + sessions: HashMap::new(), + executables: HashMap::new(), + build_logs: None, + build_log_url: None, + } + } + + pub fn add_executable(&mut self, exe_type: &str, exe_path: &str) { + let data = std::fs::read(exe_path).expect("Unable to read file"); + + let pattern = "a".repeat(1024); + let key_start = search(&data, pattern.as_bytes(), 0).unwrap(); + let key_end = key_start + pattern.len(); + + let path = path::Path::new(&exe_path); + let name = path.file_stem().unwrap().to_str().unwrap(); + let extension = match path.extension() { + Some(s) => s.to_str().unwrap(), + None => "", + }; + + let exe = Executable { + data, + filename: path.file_name().unwrap().to_str().unwrap().to_string(), + name: name.to_string(), + extension: extension.to_string(), + key_start, + key_end, + }; + + self.executables.insert(exe_type.to_string(), exe); + } + + pub async fn new_session(&mut self, res: &mut Response) -> u32 { + let id: u32 = rand::random(); + + let now = chrono::Utc::now(); + self.sessions.insert( + id, + Session { + id, + downloads: Vec::new(), + last_seen: now, + last_request: now, + first_seen: now, + tx: None, + }, + ); + + tracing::info!("New session created: {}", id); + + res.add_cookie( + Cookie::build(("Session", id.to_string())) + .http_only(true) + .partitioned(true) + .secure(cfg!(debug_assertions) == false) + .path("/") + // Use SameSite=None only in development + .same_site(if cfg!(debug_assertions) { + salvo::http::cookie::SameSite::None + } else { + salvo::http::cookie::SameSite::Strict + }) + .permanent() + .build(), + ); + + id + } + + pub fn executable_json(&self) -> Vec { + let mut executables = Vec::new(); + + for (key, exe) in &self.executables { + executables.push(ExecutableJson { + id: key.to_string(), + size: exe.data.len(), + filename: exe.filename.clone(), + }); + } + + executables + } +}