Mirror of https://github.com/Xevion/dynamic-preauth.git (synced 2025-12-17 16:11:45 -06:00)
refactor: convert to Cargo workspace structure
backend/Cargo.toml (Normal file, +25 lines)
@@ -0,0 +1,25 @@
[package]
name = "dynamic-preauth"
version.workspace = true
edition.workspace = true

[[bin]]
name = "dynamic-preauth"
path = "src/main.rs"

[dependencies]
anyhow.workspace = true
chrono.workspace = true
dotenvy.workspace = true
envy.workspace = true
futures-util.workspace = true
rand.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json", "rustls-tls"] }
salvo.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
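Note: the `version.workspace`/`edition.workspace` keys and every `*.workspace = true` dependency above resolve against a workspace root manifest that is not part of this diff. A root Cargo.toml along these lines is implied by the commit (member list and versions here are illustrative placeholders, not from the commit):

[workspace]
members = ["backend"]
resolver = "2"

[workspace.package]
version = "0.1.0"
edition = "2021"

[workspace.dependencies]
anyhow = "1"
tokio = { version = "1", features = ["full"] }
# ...and the rest of the shared dependencies from the [dependencies]
# table above, each declared once with its version here.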
backend/src/config.rs (Normal file, +70 lines)
@@ -0,0 +1,70 @@
use serde::Deserialize;

fn default_port() -> u16 {
    5800
}

/// Railway-specific configuration parsed from environment variables.
#[derive(Deserialize, Debug, Default)]
pub struct RailwayConfig {
    pub railway_token: Option<String>,
    pub railway_project_id: Option<String>,
    pub railway_service_id: Option<String>,
    pub railway_environment_id: Option<String>,
    pub railway_deployment_id: Option<String>,
    pub railway_public_domain: Option<String>,
}

impl RailwayConfig {
    /// Returns true if running on Railway (project ID is set).
    pub fn is_railway(&self) -> bool {
        self.railway_project_id.is_some()
    }

    /// Returns true if Railway API token is configured.
    pub fn has_token(&self) -> bool {
        self.railway_token.is_some()
    }

    /// Build the Railway dashboard URL for viewing build logs.
    pub fn build_logs_url(&self) -> Option<String> {
        let project_id = self.railway_project_id.as_ref()?;
        let service_id = self.railway_service_id.as_ref()?;
        let environment_id = self.railway_environment_id.as_ref()?;
        let deployment_id = self.railway_deployment_id.as_deref().unwrap_or("latest");

        Some(format!(
            "https://railway.com/project/{}/service/{}?environmentId={}&id={}#build",
            project_id, service_id, environment_id, deployment_id
        ))
    }

    /// Returns the CORS origin based on public domain.
    pub fn cors_origin(&self) -> String {
        if cfg!(debug_assertions) {
            return "*".to_string();
        }

        match &self.railway_public_domain {
            Some(domain) => format!("https://{}", domain),
            None => "*".to_string(),
        }
    }
}

/// Main configuration struct parsed from environment variables.
#[derive(Deserialize, Debug)]
pub struct Config {
    #[serde(default = "default_port")]
    pub port: u16,

    #[serde(flatten)]
    pub railway: RailwayConfig,
}

impl Config {
    /// Returns the socket address to bind to.
    pub fn bind_addr(&self) -> String {
        format!("0.0.0.0:{}", self.port)
    }
}
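Because `Config` derives `Deserialize` and flattens `RailwayConfig`, `envy::from_env()` matches uppercase environment variables to the lowercased field names. A minimal sketch of that mapping, assuming the types above are in scope (values are placeholders):

fn sketch_config_parsing() {
    // PORT maps to `port`; RAILWAY_PROJECT_ID maps to the flattened
    // RailwayConfig's `railway_project_id`.
    std::env::set_var("PORT", "8080");
    std::env::set_var("RAILWAY_PROJECT_ID", "example-project-id");

    let config: Config = envy::from_env().expect("Failed to parse environment configuration");
    assert_eq!(config.bind_addr(), "0.0.0.0:8080");
    assert!(config.railway.is_railway()); // project ID present, so treated as Railway
    assert!(!config.railway.has_token()); // RAILWAY_TOKEN was never set
}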
backend/src/main.rs (Normal file, +474 lines)
@@ -0,0 +1,474 @@
use std::sync::LazyLock;

use futures_util::{FutureExt, StreamExt};
use models::{IncomingMessage, OutgoingMessage};
use salvo::cors::Cors;
use salvo::http::{HeaderValue, Method, StatusCode, StatusError};
use salvo::logging::Logger;
use salvo::prelude::{
    handler, CatchPanic, Listener, Request, Response, Router, Server, Service, StaticDir,
    TcpListener, WebSocketUpgrade,
};
use salvo::websocket::WebSocket;
use salvo::writing::Json;
use salvo::Depot;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing_subscriber::EnvFilter;

use crate::config::Config;
use crate::models::State;

static STORE: LazyLock<Mutex<State>> = LazyLock::new(|| Mutex::new(State::new()));

mod config;
mod models;
mod railway;
mod utility;

#[handler]
async fn session_middleware(req: &mut Request, res: &mut Response, depot: &mut Depot) {
    match req.cookie("Session") {
        Some(cookie) => {
            // Check if the session exists
            match cookie.value().parse::<u32>() {
                Ok(session_id) => {
                    let mut store = STORE.lock().await;
                    if !store.sessions.contains_key(&session_id) {
                        let new_session_id = store.new_session(res).await;
                        depot.insert("session_id", new_session_id);
                        tracing::debug!(
                            existing_session_id = session_id,
                            new_session_id = new_session_id,
                            "Session provided in cookie, but does not exist"
                        );
                    } else {
                        store.sessions.get_mut(&session_id).unwrap().seen(false);
                    }
                }
                Err(parse_error) => {
                    tracing::debug!(
                        invalid_session_id = cookie.value(),
                        error = ?parse_error,
                        "Session provided in cookie, but is not a valid number"
                    );
                    let mut store = STORE.lock().await;
                    let id = store.new_session(res).await;

                    depot.insert("session_id", id);
                }
            }
        }
        None => {
            tracing::debug!("Session was not provided in cookie");
            let mut store = STORE.lock().await;
            let id = store.new_session(res).await;

            depot.insert("session_id", id);
        }
    }
}

#[handler]
async fn connect(req: &mut Request, res: &mut Response, depot: &Depot) -> Result<(), StatusError> {
    let session_id = get_session_id(req, depot).unwrap();
    WebSocketUpgrade::new()
        .upgrade(req, res, move |ws| async move {
            handle_socket(session_id, ws).await;
        })
        .await
}

async fn handle_socket(session_id: u32, websocket: WebSocket) {
    // Split the socket into a sender and a receiver of messages.
    let (socket_tx, mut socket_rx) = websocket.split();

    // Use an unbounded channel to handle buffering and flushing of messages to the websocket...
    let (tx_channel, tx_channel_rx) = mpsc::unbounded_channel();
    let transmit = UnboundedReceiverStream::new(tx_channel_rx);
    let fut_handle_tx_buffer = transmit
        .then(|message| async {
            match message {
                Ok(ref message) => {
                    tracing::debug!(message = ?message, "Outgoing Message");
                }
                Err(ref e) => {
                    tracing::error!(error = ?e, "Outgoing Message Error");
                }
            }
            message
        })
        .forward(socket_tx)
        .map(|result| {
            tracing::debug!("WebSocket send result: {:?}", result);
            if let Err(e) = result {
                tracing::error!(error = ?e, "websocket send error");
            }
        });
    tokio::task::spawn(fut_handle_tx_buffer);

    let store = &mut *STORE.lock().await;

    // Create the executables message first, to avoid borrow issues
    let executable_message = OutgoingMessage::Executables {
        executables: store.executable_json(),
        build_log: if store.build_logs.is_some() {
            Some("/build-logs".to_string())
        } else {
            None
        },
    };

    let session = store
        .sessions
        .get_mut(&session_id)
        .expect("Unable to get session");
    session.tx = Some(tx_channel);

    session
        .send_state()
        .expect("Failed to buffer state message");
    session
        .send_message(executable_message)
        .expect("Failed to buffer executables message");

    // Handle incoming messages
    let fut = async move {
        tracing::info!(
            "WebSocket connection established for session_id: {}",
            session_id
        );

        while let Some(result) = socket_rx.next().await {
            let msg = match result {
                Ok(msg) => msg,
                Err(error) => {
                    tracing::error!(
                        "WebSocket Error session_id={} error=({})",
                        session_id,
                        error
                    );
                    break;
                }
            };

            if msg.is_close() {
                tracing::info!("WebSocket closing for Session {}", session_id);
                break;
            }

            if msg.is_text() {
                let text = msg.to_str().unwrap();

                // Deserialize
                match serde_json::from_str::<IncomingMessage>(text) {
                    Ok(message) => {
                        tracing::debug!(message = ?message, "Received message");

                        match message {
                            IncomingMessage::DeleteDownloadToken { id } => {
                                let store = &mut *STORE.lock().await;
                                let session = store
                                    .sessions
                                    .get_mut(&session_id)
                                    .expect("Session not found");

                                if session.delete_download(id) {
                                    session
                                        .send_state()
                                        .expect("Failed to buffer state message");
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::error!("Error deserializing message: {} {}", text, e);
                    }
                }
            }
        }
    };
    tokio::task::spawn(fut);
}

#[handler]
pub async fn get_build_logs(req: &mut Request, res: &mut Response, _depot: &mut Depot) {
    let store = STORE.lock().await;

    if let Some(build_logs) = &store.build_logs {
        // Use pre-computed hash for ETag
        let etag = format!("\"{:x}\"", build_logs.content_hash);

        // Check If-None-Match header
        if let Some(if_none_match) = req.headers().get("If-None-Match") {
            if if_none_match == &etag {
                res.status_code(StatusCode::NOT_MODIFIED);
                return;
            }
        }

        // Check If-Modified-Since header
        if let Some(if_modified_since) = req.headers().get("If-Modified-Since") {
            if let Ok(if_modified_since_str) = if_modified_since.to_str() {
                if let Ok(if_modified_since_time) =
                    chrono::DateTime::parse_from_rfc2822(if_modified_since_str)
                {
                    if build_logs.fetched_at <= if_modified_since_time {
                        res.status_code(StatusCode::NOT_MODIFIED);
                        return;
                    }
                }
            }
        }

        res.headers_mut().insert("ETag", etag.parse().unwrap());
        res.headers_mut()
            .insert("Content-Type", "text/plain; charset=utf-8".parse().unwrap());
        res.headers_mut()
            .insert("Cache-Control", "public, max-age=300".parse().unwrap());
        res.headers_mut().insert(
            "Last-Modified",
            build_logs.fetched_at.to_rfc2822().parse().unwrap(),
        );

        res.render(&build_logs.content);
    } else {
        res.status_code(StatusCode::NOT_FOUND);
        res.render("Build logs not available");
    }
}

#[handler]
pub async fn download(req: &mut Request, res: &mut Response, depot: &mut Depot) {
    let download_id = req
        .param::<String>("id")
        .expect("Download ID required to download file");

    let session_id =
        get_session_id(req, depot).expect("Session ID could not be found via request or depot");

    let store = &mut *STORE.lock().await;

    let session = store
        .sessions
        .get_mut(&session_id)
        .expect("Session not found");
    let executable = store
        .executables
        .get(&download_id as &str)
        .expect("Executable not found");

    // Create a download for the session
    let session_download = session.add_download(executable);
    tracing::info!(
        session_id,
        r#type = download_id,
        dl_token = session_download.token,
        "Download created"
    );
    let data = executable.with_key(session_download.token.to_string().as_bytes());

    if let Err(e) = res.write_body(data) {
        tracing::error!("Error writing body: {}", e);
    }

    res.headers.insert(
        "Content-Disposition",
        HeaderValue::from_str(
            format!("attachment; filename=\"{}\"", session_download.filename).as_str(),
        )
        .expect("Unable to create header"),
    );
    res.headers.insert(
        "Content-Type",
        HeaderValue::from_static("application/octet-stream"),
    );

    // Don't try to send state if somehow the session has not connected
    if session.tx.is_some() {
        session
            .send_state()
            .expect("Failed to buffer state message");
    } else {
        tracing::warn!("Download being made without any connected WebSocket");
    }
}

#[handler]
pub async fn notify(req: &mut Request, res: &mut Response) {
    let key = req.query::<String>("key");

    if key.is_none() {
        res.status_code(StatusCode::BAD_REQUEST);
        return;
    }

    let key = key.unwrap();

    if !key.starts_with("0x") {
        res.status_code(StatusCode::BAD_REQUEST);
        return;
    }

    // Parse key into u32
    let key = match u32::from_str_radix(key.trim_start_matches("0x"), 16) {
        Ok(k) => k,
        Err(e) => {
            tracing::error!("Error parsing key: {}", e);
            res.status_code(StatusCode::BAD_REQUEST);
            return;
        }
    };

    let store = &mut *STORE.lock().await;

    let target_session = store
        .sessions
        .iter_mut()
        .find(|(_, session)| session.downloads.iter().any(|d| d.token == key));

    match target_session {
        Some((_, session)) => {
            let message = OutgoingMessage::TokenAlert { token: key };

            if let Err(e) = session.send_message(message) {
                tracing::warn!(
                    error = e.to_string(),
                    "Session did not have a receiving WebSocket available, notify ignored.",
                );
                res.status_code(StatusCode::NOT_MODIFIED);
                return;
            }

            res.render("Notification sent");
        }
        None => {
            tracing::warn!("Session not found for key while attempting notify: {}", key);
            res.status_code(StatusCode::UNAUTHORIZED);
        }
    }
}

#[handler]
pub async fn get_session(req: &mut Request, res: &mut Response, depot: &mut Depot) {
    let store = STORE.lock().await;

    let session_id = get_session_id(req, depot);
    if session_id.is_none() {
        res.status_code(StatusCode::BAD_REQUEST);
        return;
    }

    match store.sessions.get(&session_id.unwrap()) {
        Some(session) => {
            res.render(Json(&session));
        }
        None => {
            res.status_code(StatusCode::BAD_REQUEST);
        }
    }
}

// Acquires the session id from the request, preferring the depot
fn get_session_id(req: &Request, depot: &Depot) -> Option<u32> {
    if depot.contains_key("session_id") {
        return Some(*depot.get::<u32>("session_id").unwrap());
    }

    // Otherwise, just use whatever the Cookie might have
    match req.cookie("Session") {
        Some(cookie) => cookie.value().parse::<u32>().ok(),
        None => {
            tracing::warn!("Session was not provided in cookie or depot");
            None
        }
    }
}

#[tokio::main]
async fn main() {
    // Load environment variables from .env file (development only)
    #[cfg(debug_assertions)]
    dotenvy::dotenv().ok();

    // Parse configuration from environment
    let config: Config = envy::from_env().expect("Failed to parse environment configuration");

    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new(format!(
            "info,dynamic_preauth={}",
            if cfg!(debug_assertions) {
                "debug"
            } else {
                "info"
            }
        )))
        .init();

    // Add the build log & executables to the store
    let mut store = STORE.lock().await;

    // Check if we are deployed on Railway
    if config.railway.is_railway() {
        if let Some(build_logs_url) = config.railway.build_logs_url() {
            tracing::info!("Build logs available here: {}", build_logs_url);
            store.build_log_url = Some(build_logs_url);
        }

        // Try to fetch actual build logs using the Railway API
        if config.railway.has_token() {
            match crate::railway::fetch_build_logs().await {
                Ok(build_logs) => {
                    tracing::info!(
                        "Successfully fetched build logs ({} bytes)",
                        build_logs.content.len()
                    );
                    store.build_logs = Some(build_logs);
                }
                Err(e) => {
                    tracing::warn!("Failed to fetch build logs from Railway API: {}", e);
                }
            }
        } else {
            tracing::warn!("RAILWAY_TOKEN not set, skipping build log fetch");
        }
    }

    store.add_executable("Windows", "./demo.exe");
    store.add_executable("Linux", "./demo-linux");
    // store.add_executable("MacOS", "./demo-macos");

    drop(store); // critical: drop the lock to avoid deadlock, otherwise the server will hang

    let origin = config.railway.cors_origin();
    let cors = Cors::new()
        .allow_origin(&origin)
        .allow_methods(vec![Method::GET])
        .into_handler();
    tracing::debug!("CORS Allowed Origin: {}", &origin);

    let static_dir = StaticDir::new(["./public"]).defaults("index.html");

    // TODO: Move handlers to a separate file
    // TODO: Improved Token Generation
    // TODO: Advanced HMAC Verification
    // TODO: Session Purging

    let router = Router::new()
        .hoop(CatchPanic::new())
        // /notify does not need a session, nor should it have one
        .push(Router::with_path("notify").post(notify))
        // /build-logs does not need a session
        .push(Router::with_path("build-logs").get(get_build_logs))
        .push(
            Router::new()
                .hoop(session_middleware)
                .push(Router::with_path("download/<id>").get(download))
                .push(Router::with_path("session").get(get_session))
                // websocket /ws
                .push(Router::with_path("ws").goal(connect))
                // static files
                .push(Router::with_path("<**path>").get(static_dir)),
        );

    let service = Service::new(router).hoop(cors).hoop(Logger::new());

    let acceptor = TcpListener::new(config.bind_addr()).bind().await;
    Server::new(acceptor).serve(service).await;
}
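For context on the /notify contract above: the handler expects a 0x-prefixed hex token as the `key` query parameter on a POST. A hypothetical caller sketch (the kind of callback the keyed demo executable could make; the host URL is a placeholder, not from this commit):

async fn phone_home(token: u32) -> anyhow::Result<()> {
    // 0x-prefixed hex, matching the u32::from_str_radix parsing in notify()
    let url = format!("https://demo.example.railway.app/notify?key=0x{:08x}", token);
    let status = reqwest::Client::new().post(&url).send().await?.status();
    // Per the handler above: 200 "Notification sent" when a session owns the token,
    // 304 if that session has no live WebSocket, 401 if no session matches,
    // and 400 for missing or malformed keys.
    println!("notify responded with {}", status);
    Ok(())
}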
backend/src/models.rs (Normal file, +268 lines)
@@ -0,0 +1,268 @@
use salvo::{http::cookie::Cookie, websocket::Message, Response};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path};
use tokio::sync::mpsc::UnboundedSender;

use crate::utility::search;

#[derive(Debug, Clone)]
pub struct BuildLogs {
    pub content: String,
    pub fetched_at: chrono::DateTime<chrono::Utc>,
    pub content_hash: u64,
}

#[derive(Debug, Serialize, Clone)]
pub struct Session {
    pub id: u32,
    pub downloads: Vec<SessionDownload>,

    pub first_seen: chrono::DateTime<chrono::Utc>,
    // The last time a request OR websocket message from/to this session was made
    pub last_seen: chrono::DateTime<chrono::Utc>,
    // The last time a request was made with this session
    pub last_request: chrono::DateTime<chrono::Utc>,

    // The sender for the websocket connection
    #[serde(skip_serializing)]
    pub tx: Option<UnboundedSender<Result<Message, salvo::Error>>>,
}

impl Session {
    // Update the last seen time(s) for the session
    pub fn seen(&mut self, socket: bool) {
        self.last_seen = chrono::Utc::now();
        if !socket {
            self.last_request = chrono::Utc::now();
        }
    }

    // Add a download to the session
    pub fn add_download(&mut self, exe: &Executable) -> &SessionDownload {
        let token: u32 = rand::random();

        let download = SessionDownload {
            token,
            filename: format!(
                "{}-{:08x}{}{}",
                exe.name,
                token,
                if !exe.extension.is_empty() { "." } else { "" },
                exe.extension
            ),
            last_used: chrono::Utc::now(),
            download_time: chrono::Utc::now(),
        };

        self.downloads.push(download);
        self.downloads.last().unwrap()
    }

    // Delete a download from the session.
    // Returns true if the download was deleted, false if it was not found.
    pub fn delete_download(&mut self, token: u32) -> bool {
        if let Some(index) = self.downloads.iter().position(|d| d.token == token) {
            self.downloads.remove(index);
            true
        } else {
            tracing::warn!("Attempted to delete non-existent download token: {}", token);
            false
        }
    }

    // This function's failure is not a failure to transmit the message, but a failure
    // to buffer it into the channel (or any preceding steps).
    pub fn send_message(&mut self, message: OutgoingMessage) -> Result<(), anyhow::Error> {
        if self.tx.is_none() {
            return Err(anyhow::anyhow!("Session {} has no sender", self.id));
        }

        // TODO: Error handling
        let tx = self.tx.as_ref().unwrap();
        let result = tx.send(Ok(Message::text(serde_json::to_string(&message).unwrap())));

        match result {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow::anyhow!("Error sending message: {}", e)),
        }
    }

    pub fn send_state(&mut self) -> Result<(), anyhow::Error> {
        let message = OutgoingMessage::State {
            session: self.clone(),
        };

        self.send_message(message)
    }
}

#[derive(Serialize, Debug, Clone)]
pub struct SessionDownload {
    pub token: u32,
    pub filename: String,
    pub last_used: chrono::DateTime<chrono::Utc>,
    pub download_time: chrono::DateTime<chrono::Utc>,
}

impl SessionDownload {}

#[derive(Default)]
pub struct State {
    pub sessions: HashMap<u32, Session>,
    pub executables: HashMap<String, Executable>,
    pub build_logs: Option<BuildLogs>,
    pub build_log_url: Option<String>,
}

impl State {
    pub fn new() -> Self {
        Self {
            sessions: HashMap::new(),
            executables: HashMap::new(),
            build_logs: None,
            build_log_url: None,
        }
    }

    pub fn add_executable(&mut self, exe_type: &str, exe_path: &str) {
        let data = std::fs::read(exe_path).expect("Unable to read file");

        let pattern = "a".repeat(1024);
        let key_start = search(&data, pattern.as_bytes(), 0).unwrap();
        let key_end = key_start + pattern.len();

        let path = path::Path::new(&exe_path);
        let name = path.file_stem().unwrap().to_str().unwrap();
        let extension = match path.extension() {
            Some(s) => s.to_str().unwrap(),
            None => "",
        };

        let exe = Executable {
            data,
            filename: path.file_name().unwrap().to_str().unwrap().to_string(),
            name: name.to_string(),
            extension: extension.to_string(),
            key_start,
            key_end,
        };

        self.executables.insert(exe_type.to_string(), exe);
    }

    pub async fn new_session(&mut self, res: &mut Response) -> u32 {
        let id: u32 = rand::random();

        let now = chrono::Utc::now();
        self.sessions.insert(
            id,
            Session {
                id,
                downloads: Vec::new(),
                last_seen: now,
                last_request: now,
                first_seen: now,
                tx: None,
            },
        );

        tracing::info!("New session created: {}", id);

        res.add_cookie(
            Cookie::build(("Session", id.to_string()))
                .http_only(true)
                .partitioned(true)
                .secure(!cfg!(debug_assertions))
                .path("/")
                // Use SameSite=None only in development
                .same_site(if cfg!(debug_assertions) {
                    salvo::http::cookie::SameSite::None
                } else {
                    salvo::http::cookie::SameSite::Strict
                })
                .permanent()
                .build(),
        );

        id
    }

    pub fn executable_json(&self) -> Vec<ExecutableJson> {
        let mut executables = Vec::new();

        for (key, exe) in &self.executables {
            executables.push(ExecutableJson {
                id: key.to_string(),
                size: exe.data.len(),
                filename: exe.filename.clone(),
            });
        }

        executables
    }
}

#[derive(Default, Clone, Debug)]
pub struct Executable {
    pub data: Vec<u8>, // the raw data of the executable
    pub filename: String,
    pub name: String,      // the name before the extension
    pub extension: String, // may be an empty string
    pub key_start: usize,  // the index of the byte where the key starts
    pub key_end: usize,    // the index of the byte where the key ends
}

impl Executable {
    pub fn with_key(&self, new_key: &[u8]) -> Vec<u8> {
        let mut data = self.data.clone();

        // Copy the key into the data
        for i in 0..new_key.len() {
            data[self.key_start + i] = new_key[i];
        }

        // If the new key is shorter than the old key, overwrite the remaining bytes with spaces
        if new_key.len() < self.key_end - self.key_start {
            for item in data
                .iter_mut()
                .take(self.key_end)
                .skip(self.key_start + new_key.len())
            {
                *item = b' ';
            }
        }

        data
    }
}

#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum IncomingMessage {
    // A request from the client to delete a download token
    DeleteDownloadToken { id: u32 },
}

#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum OutgoingMessage {
    // An alert to the client that a session download has been used.
    #[serde(rename = "notify")]
    TokenAlert {
        token: u32,
    },
    // A message describing the current session state
    State {
        session: Session,
    },
    Executables {
        build_log: Option<String>,
        executables: Vec<ExecutableJson>,
    },
}

#[derive(Debug, Serialize)]
pub struct ExecutableJson {
    pub id: String,
    pub size: usize,
    pub filename: String,
}
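A test-style sketch of the key-splicing behavior in `Executable::with_key`, assuming the types above are in scope. An 8-byte placeholder stands in for the real 1024-byte run of `b'a'` that `add_executable` locates:

#[test]
fn with_key_pads_shorter_keys_with_spaces() {
    let exe = Executable {
        data: b"HEADaaaaaaaaTAIL".to_vec(),
        filename: "demo".to_string(),
        name: "demo".to_string(),
        extension: String::new(),
        key_start: 4,
        key_end: 12,
    };
    // The 4-byte key overwrites the head of the placeholder; the rest becomes spaces.
    assert_eq!(exe.with_key(b"1234"), b"HEAD1234    TAIL".to_vec());
}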
backend/src/railway.rs (Normal file, +276 lines)
@@ -0,0 +1,276 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::env;

#[derive(Debug, Serialize)]
struct GraphQLRequest {
    query: String,
    variables: serde_json::Value,
}

#[derive(Debug, Deserialize)]
struct GraphQLResponse {
    data: Option<serde_json::Value>,
    errors: Option<Vec<GraphQLError>>,
}

#[derive(Debug, Deserialize)]
struct GraphQLError {
    message: String,
}

#[derive(Debug, Deserialize)]
struct BuildLogEntry {
    message: String,
    severity: String,
    timestamp: String,
}

#[derive(Debug, Deserialize)]
struct DeploymentNode {
    id: String,
}

#[derive(Debug, Deserialize)]
struct DeploymentEdge {
    node: DeploymentNode,
}

#[derive(Debug, Deserialize)]
struct DeploymentsConnection {
    edges: Vec<DeploymentEdge>,
}

fn strip_ansi_codes(text: &str) -> String {
    // Simple regex to remove ANSI escape sequences
    let re = regex::Regex::new(r"\x1b\[[0-9;]*[a-zA-Z]").unwrap();
    re.replace_all(text, "").to_string()
}

fn should_stop_at_message(message: &str) -> bool {
    let clean_message = strip_ansi_codes(message);

    // Check for "Build time: X seconds" pattern (case insensitive)
    let build_time_pattern = regex::Regex::new(r"(?i)Build\s+time:\s+\d+").unwrap();
    if build_time_pattern.is_match(&clean_message) {
        return true;
    }

    // Check for "Starting Container" (case insensitive)
    let starting_container_pattern = regex::Regex::new(r"(?i)Starting\s+Container").unwrap();
    if starting_container_pattern.is_match(&clean_message) {
        return true;
    }

    false
}

async fn fetch_latest_deployment_id() -> Result<String> {
    let token = env::var("RAILWAY_TOKEN")?;
    let service_id = env::var("RAILWAY_SERVICE_ID")?;
    let project_id = env::var("RAILWAY_PROJECT_ID")?;
    let environment_id = env::var("RAILWAY_ENVIRONMENT_ID")?;

    let query = r#"
        query deployments($input: DeploymentListInput!, $first: Int) {
            deployments(input: $input, first: $first) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    "#;

    let variables = serde_json::json!({
        "input": {
            "projectId": project_id,
            "serviceId": service_id,
            "environmentId": environment_id,
            "status": {"in": ["SUCCESS", "DEPLOYING", "SLEEPING", "BUILDING"]}
        },
        "first": 1
    });

    let request = GraphQLRequest {
        query: query.to_string(),
        variables,
    };

    let client = reqwest::Client::new();
    let response = client
        .post("https://backboard.railway.app/graphql/v2")
        .header("Authorization", format!("Bearer {}", token))
        .json(&request)
        .send()
        .await?;

    let response_text = response.text().await?;
    let graphql_response: GraphQLResponse = serde_json::from_str(&response_text)?;

    if let Some(errors) = graphql_response.errors {
        let error_messages: Vec<String> = errors.iter().map(|e| e.message.clone()).collect();
        return Err(anyhow::anyhow!(
            "GraphQL errors: {}",
            error_messages.join(", ")
        ));
    }

    if let Some(data) = graphql_response.data {
        if let Some(deployments_value) = data.get("deployments") {
            if let Ok(deployments) =
                serde_json::from_value::<DeploymentsConnection>(deployments_value.clone())
            {
                if let Some(first_edge) = deployments.edges.first() {
                    return Ok(first_edge.node.id.clone());
                }
            }
        }
    }

    Err(anyhow::anyhow!(
        "No deployments found or unexpected response structure"
    ))
}

pub async fn fetch_build_logs() -> Result<crate::models::BuildLogs> {
    let token = env::var("RAILWAY_TOKEN")?;

    // Get deployment ID - in debug mode, fetch latest if not specified
    let deployment_id = if cfg!(debug_assertions) {
        match env::var("RAILWAY_DEPLOYMENT_ID") {
            Ok(id) => id,
            Err(_) => {
                tracing::debug!(
                    "No RAILWAY_DEPLOYMENT_ID specified in debug mode, fetching latest deployment"
                );
                fetch_latest_deployment_id().await?
            }
        }
    } else {
        env::var("RAILWAY_DEPLOYMENT_ID")?
    };

    let query = r#"
        query buildLogs($deploymentId: String!, $endDate: DateTime, $filter: String, $limit: Int, $startDate: DateTime) {
            buildLogs(
                deploymentId: $deploymentId
                endDate: $endDate
                filter: $filter
                limit: $limit
                startDate: $startDate
            ) {
                message
                severity
                timestamp
            }
        }
    "#;

    let variables = serde_json::json!({
        "deploymentId": deployment_id,
        "limit": 1000
    });

    let request = GraphQLRequest {
        query: query.to_string(),
        variables,
    };

    let client = reqwest::Client::new();
    let response = client
        .post("https://backboard.railway.app/graphql/v2")
        .header("Authorization", format!("Bearer {}", token))
        .json(&request)
        .send()
        .await?;

    let response_text = response.text().await?;
    let graphql_response: GraphQLResponse = serde_json::from_str(&response_text)?;

    if let Some(errors) = graphql_response.errors {
        let error_messages: Vec<String> = errors.iter().map(|e| e.message.clone()).collect();
        return Err(anyhow::anyhow!(
            "GraphQL errors: {}",
            error_messages.join(", ")
        ));
    }

    if let Some(data) = graphql_response.data {
        if let Some(build_logs_value) = data.get("buildLogs") {
            if let Ok(build_logs) =
                serde_json::from_value::<Vec<BuildLogEntry>>(build_logs_value.clone())
            {
                let mut filtered_logs = Vec::new();
                let starting_container_pattern =
                    regex::Regex::new(r"(?i)Starting\s+Container").unwrap();

                for entry in build_logs {
                    // Check if we should stop at this message
                    if should_stop_at_message(&entry.message) {
                        // For "Build time" messages, include them.
                        // For "Starting Container" messages, stop before them.
                        let clean_message = strip_ansi_codes(&entry.message);

                        if starting_container_pattern.is_match(&clean_message) {
                            // Stop before "Starting Container" message
                            break;
                        } else {
                            // Include "Build time" message and stop
                            let formatted_entry = format!(
                                "{} {} {}",
                                entry.timestamp,
                                entry.severity,
                                clean_message.trim()
                            );
                            filtered_logs.push(formatted_entry);
                            break;
                        }
                    }

                    // Include this log entry
                    let clean_message = strip_ansi_codes(&entry.message);
                    let formatted_entry = format!(
                        "{} {} {}",
                        entry.timestamp,
                        entry.severity,
                        clean_message.trim()
                    );
                    filtered_logs.push(formatted_entry);
                }

                // Add Railway URL header to the logs
                let railway_url = format!(
                    "Railway Build Logs: https://railway.com/project/{}/service/{}?environmentId={}&id={}#build\n\n",
                    env::var("RAILWAY_PROJECT_ID").unwrap_or_default(),
                    env::var("RAILWAY_SERVICE_ID").unwrap_or_default(),
                    env::var("RAILWAY_ENVIRONMENT_ID").unwrap_or_default(),
                    deployment_id
                );

                let content = format!("{}{}", railway_url, filtered_logs.join("\n"));
                let fetched_at = chrono::Utc::now();

                // Generate hash for the content
                use std::collections::hash_map::DefaultHasher;
                use std::hash::{Hash, Hasher};
                let mut hasher = DefaultHasher::new();
                content.hash(&mut hasher);
                let content_hash = hasher.finish();

                return Ok(crate::models::BuildLogs {
                    content,
                    fetched_at,
                    content_hash,
                });
            }
        }

        Err(anyhow::anyhow!(
            "Unexpected response structure from Railway API"
        ))
    } else {
        Err(anyhow::anyhow!("No data received from Railway API"))
    }
}
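A quick check of the log-trimming helpers above (they are module-private, so this would live inside railway.rs):

#[test]
fn strips_ansi_and_detects_stop_markers() {
    let colored = "\x1b[32mBuild time: 42 seconds\x1b[0m";
    assert_eq!(strip_ansi_codes(colored), "Build time: 42 seconds");
    assert!(should_stop_at_message(colored)); // matched by the build-time pattern
    assert!(should_stop_at_message("Starting Container"));
    assert!(!should_stop_at_message("Compiling dynamic-preauth v0.1.0"));
}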
backend/src/utility.rs (Normal file, +40 lines)
@@ -0,0 +1,40 @@
pub(crate) fn search(buf: &[u8], pattern: &[u8], start_index: usize) -> Option<usize> {
    let mut i = start_index;

    // If the pattern is longer than the buffer, it can never match
    if pattern.len() > buf.len() {
        return None;
    }

    // If the pattern is empty
    if pattern.is_empty() {
        return None;
    }

    // If the starting index is too high
    if start_index >= buf.len() {
        return None;
    }

    while i < buf.len() {
        for j in 0..pattern.len() {
            // If the pattern is too long to fit in the buffer anymore
            if i + j >= buf.len() {
                return None;
            }

            // If the pattern stops matching
            if buf[i + j] != pattern[j] {
                break;
            }

            // If the pattern is found
            if j == pattern.len() - 1 {
                return Some(i);
            }
        }

        i += 1;
    }
    None
}
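The naive byte scan above returns the first index at or after `start_index` where `pattern` begins, or `None`; empty patterns never match. A few test-style examples, assuming `search` is in scope within the crate:

#[test]
fn search_finds_first_occurrence() {
    assert_eq!(search(b"hello world", b"world", 0), Some(6));
    assert_eq!(search(b"hello world", b"world", 7), None); // window starts past the match
    assert_eq!(search(b"aaaa", b"aa", 1), Some(1)); // overlapping matches are fine
    assert_eq!(search(b"hello", b"", 0), None); // empty patterns never match
}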