diff --git a/Cargo.lock b/Cargo.lock index 406b0a8..581efdb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,6 +182,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ "axum-core", + "base64 0.22.1", "bytes", "form_urlencoded", "futures-util", @@ -200,8 +201,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper 1.0.2", "tokio", + "tokio-tungstenite 0.28.0", "tower", "tower-layer", "tower-service", @@ -2916,7 +2919,7 @@ dependencies = [ "static_assertions", "time", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", "tracing", "typemap_rev", "typesize", @@ -3604,10 +3607,22 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls 0.25.0", - "tungstenite", + "tungstenite 0.21.0", "webpki-roots 0.26.11", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.28.0", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -3845,6 +3860,23 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror 2.0.16", + "utf-8", +] + [[package]] name = "typemap_rev" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 5103553..3df8556 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ embed-assets = ["dep:rust-embed", "dep:mime_guess"] [dependencies] anyhow = "1.0.99" async-trait = "0.1" -axum = "0.8.4" +axum = { version = "0.8.4", features = ["ws"] } bitflags = { version = "2.9.4", features = ["serde"] } chrono = { version = "0.4.42", features = ["serde"] } compile-time = "0.2.0" diff --git a/migrations/20260129183411_add_queued_at_to_scrape_jobs.sql b/migrations/20260129183411_add_queued_at_to_scrape_jobs.sql new file mode 100644 index 0000000..ac45c2e --- /dev/null +++ b/migrations/20260129183411_add_queued_at_to_scrape_jobs.sql @@ -0,0 +1,7 @@ +-- Add queued_at column to track when a job last entered the "ready to pick up" state. +-- For fresh jobs this equals execute_at; for retried jobs it is updated to NOW(). +ALTER TABLE scrape_jobs + ADD COLUMN queued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(); + +-- Backfill existing rows: set queued_at = execute_at (best approximation) +UPDATE scrape_jobs SET queued_at = execute_at; diff --git a/src/app.rs b/src/app.rs index 260c436..f3f44fe 100644 --- a/src/app.rs +++ b/src/app.rs @@ -126,6 +126,7 @@ impl App { self.banner_api.clone(), self.app_state.reference_cache.clone(), self.app_state.service_statuses.clone(), + self.app_state.scrape_job_tx.clone(), )); self.service_manager .register_service(ServiceName::Scraper.as_str(), scraper_service); diff --git a/src/data/models.rs b/src/data/models.rs index 32c70e5..5a37b11 100644 --- a/src/data/models.rs +++ b/src/data/models.rs @@ -176,6 +176,20 @@ pub enum TargetType { SingleCrn, } +/// Computed status for a scrape job, derived from existing fields. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ScrapeJobStatus {
+    Processing,
+    StaleLock,
+    Exhausted,
+    Scheduled,
+    Pending,
+}
+
+/// How long a lock can be held before it is considered stale (mirrors `scrape_jobs::LOCK_EXPIRY`).
+const LOCK_EXPIRY_SECS: i64 = 10 * 60;
+
 /// Represents a queryable job from the database.
 #[allow(dead_code)]
 #[derive(sqlx::FromRow, Debug, Clone)]
@@ -191,6 +205,27 @@ pub struct ScrapeJob {
     pub retry_count: i32,
     /// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint)
     pub max_retries: i32,
+    /// When the job last entered the "ready to pick up" state.
+    /// Set to NOW() on creation; updated to NOW() on retry.
+    pub queued_at: DateTime<Utc>,
+}
+
+impl ScrapeJob {
+    /// Compute the current status of this job from its fields.
+    pub fn status(&self) -> ScrapeJobStatus {
+        let now = Utc::now();
+        match self.locked_at {
+            Some(locked) if (now - locked).num_seconds() < LOCK_EXPIRY_SECS => {
+                ScrapeJobStatus::Processing
+            }
+            Some(_) => ScrapeJobStatus::StaleLock,
+            None if self.retry_count >= self.max_retries && self.max_retries > 0 => {
+                ScrapeJobStatus::Exhausted
+            }
+            None if self.execute_at > now => ScrapeJobStatus::Scheduled,
+            None => ScrapeJobStatus::Pending,
+        }
+    }
 }
 
 /// A user authenticated via Discord OAuth.
diff --git a/src/data/scrape_jobs.rs b/src/data/scrape_jobs.rs
index 44c8444..79e4fe6 100644
--- a/src/data/scrape_jobs.rs
+++ b/src/data/scrape_jobs.rs
@@ -13,9 +13,11 @@ use std::collections::HashSet;
 /// # Returns
 /// The number of jobs that were unlocked.
 pub async fn force_unlock_all(db_pool: &PgPool) -> Result<u64> {
-    let result = sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE locked_at IS NOT NULL")
-        .execute(db_pool)
-        .await?;
+    let result = sqlx::query(
+        "UPDATE scrape_jobs SET locked_at = NULL, queued_at = NOW() WHERE locked_at IS NOT NULL",
+    )
+    .execute(db_pool)
+    .await?;
 
     Ok(result.rows_affected())
 }
@@ -97,10 +99,11 @@ pub async fn unlock_job(job_id: i32, db_pool: &PgPool) -> Result<()> {
     Ok(())
 }
 
-/// Atomically unlock a job and increment its retry count.
+/// Atomically unlock a job, increment its retry count, and reset `queued_at`.
 ///
-/// Returns whether the job still has retries remaining. This is determined
-/// atomically in the database to avoid race conditions between workers.
+/// Returns the new `queued_at` timestamp if retries remain, or `None` if
+/// the job has exhausted its retries. This is determined atomically in the
+/// database to avoid race conditions between workers.
 ///
 /// # Arguments
 /// * `job_id` - The database ID of the job
@@ -108,25 +111,25 @@ pub async fn unlock_job(job_id: i32, db_pool: &PgPool) -> Result<()> {
 /// * `db_pool` - PostgreSQL connection pool
 ///
 /// # Returns
-/// * `Ok(true)` if the job was unlocked and retries remain
-/// * `Ok(false)` if the job has exhausted its retries
+/// * `Ok(Some(queued_at))` if the job was unlocked and retries remain
+/// * `Ok(None)` if the job has exhausted its retries
 pub async fn unlock_and_increment_retry(
     job_id: i32,
     max_retries: i32,
     db_pool: &PgPool,
-) -> Result<bool> {
-    let result = sqlx::query_scalar::<_, Option<i32>>(
+) -> Result<Option<DateTime<Utc>>> {
+    let result = sqlx::query_scalar::<_, Option<DateTime<Utc>>>(
         "UPDATE scrape_jobs
-         SET locked_at = NULL, retry_count = retry_count + 1
+         SET locked_at = NULL, retry_count = retry_count + 1, queued_at = NOW()
          WHERE id = $1
-         RETURNING CASE WHEN retry_count <= $2 THEN retry_count ELSE NULL END",
+         RETURNING CASE WHEN retry_count <= $2 THEN queued_at ELSE NULL END",
     )
     .bind(job_id)
     .bind(max_retries)
     .fetch_one(db_pool)
     .await?;
 
-    Ok(result.is_some())
+    Ok(result)
 }
 
 /// Find existing job payloads matching the given target type and candidates.
@@ -173,9 +176,9 @@ pub async fn find_existing_job_payloads(
 pub async fn batch_insert_jobs(
     jobs: &[(serde_json::Value, TargetType, ScrapePriority)],
     db_pool: &PgPool,
-) -> Result<()> {
+) -> Result<Vec<ScrapeJob>> {
     if jobs.is_empty() {
-        return Ok(());
+        return Ok(Vec::new());
     }
 
     let mut target_types: Vec<String> = Vec::with_capacity(jobs.len());
@@ -188,19 +191,20 @@ pub async fn batch_insert_jobs(
         priorities.push(format!("{priority:?}"));
     }
 
-    sqlx::query(
+    let inserted = sqlx::query_as::<_, ScrapeJob>(
         r#"
-        INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at)
-        SELECT v.target_type::target_type, v.payload, v.priority::scrape_priority, NOW()
+        INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at, queued_at)
+        SELECT v.target_type::target_type, v.payload, v.priority::scrape_priority, NOW(), NOW()
         FROM UNNEST($1::text[], $2::jsonb[], $3::text[]) AS v(target_type, payload, priority)
+        RETURNING *
        "#,
     )
     .bind(&target_types)
     .bind(&payloads)
     .bind(&priorities)
-    .execute(db_pool)
+    .fetch_all(db_pool)
     .await?;
 
-    Ok(())
+    Ok(inserted)
 }
diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs
index 1914bf0..6ef100a 100644
--- a/src/scraper/mod.rs
+++ b/src/scraper/mod.rs
@@ -7,6 +7,7 @@ use crate::data::scrape_jobs;
 use crate::services::Service;
 use crate::state::ReferenceCache;
 use crate::status::{ServiceStatus, ServiceStatusRegistry};
+use crate::web::ws::ScrapeJobEvent;
 use sqlx::PgPool;
 use std::sync::Arc;
 use tokio::sync::{RwLock, broadcast};
@@ -25,6 +26,7 @@ pub struct ScraperService {
     banner_api: Arc<BannerApi>,
     reference_cache: Arc<RwLock<ReferenceCache>>,
     service_statuses: ServiceStatusRegistry,
+    job_events_tx: broadcast::Sender<ScrapeJobEvent>,
     scheduler_handle: Option<JoinHandle<()>>,
     worker_handles: Vec<JoinHandle<()>>,
     shutdown_tx: Option<broadcast::Sender<()>>,
 }
@@ -37,12 +39,14 @@ impl ScraperService {
         banner_api: Arc<BannerApi>,
         reference_cache: Arc<RwLock<ReferenceCache>>,
         service_statuses: ServiceStatusRegistry,
+        job_events_tx: broadcast::Sender<ScrapeJobEvent>,
     ) -> Self {
         Self {
             db_pool,
             banner_api,
             reference_cache,
             service_statuses,
+            job_events_tx,
             scheduler_handle: None,
             worker_handles: Vec::new(),
             shutdown_tx: None,
@@ -71,6 +75,7 @@ impl ScraperService {
             self.db_pool.clone(),
             self.banner_api.clone(),
             self.reference_cache.clone(),
+            self.job_events_tx.clone(),
         );
         let shutdown_rx = shutdown_tx.subscribe();
         let scheduler_handle = tokio::spawn(async move {
@@ -81,7 +86,12 @@ impl ScraperService {
         let worker_count = 4; // This could be configurable
         for i in 0..worker_count {
-            let worker = Worker::new(i, self.db_pool.clone(), self.banner_api.clone());
+            let worker = Worker::new(
+                i,
+                self.db_pool.clone(),
+                self.banner_api.clone(),
+                self.job_events_tx.clone(),
+            );
             let shutdown_rx = shutdown_tx.subscribe();
             let worker_handle = tokio::spawn(async move {
                 worker.run(shutdown_rx).await;
diff --git a/src/scraper/scheduler.rs b/src/scraper/scheduler.rs
index 11956de..57a00df 100644
--- a/src/scraper/scheduler.rs
+++ b/src/scraper/scheduler.rs
@@ -5,6 +5,7 @@ use crate::error::Result;
 use crate::rmp::RmpClient;
 use crate::scraper::jobs::subject::SubjectJob;
 use crate::state::ReferenceCache;
+use crate::web::ws::{ScrapeJobDto, ScrapeJobEvent};
 use serde_json::json;
 use sqlx::PgPool;
 use std::sync::Arc;
@@ -25,6 +26,7 @@ pub struct Scheduler {
     db_pool: PgPool,
     banner_api: Arc<BannerApi>,
     reference_cache: Arc<RwLock<ReferenceCache>>,
+    job_events_tx: broadcast::Sender<ScrapeJobEvent>,
 }
 
 impl Scheduler {
@@ -32,11 +34,13 @@ impl Scheduler {
         db_pool: PgPool,
         banner_api: Arc<BannerApi>,
         reference_cache: Arc<RwLock<ReferenceCache>>,
+        job_events_tx: broadcast::Sender<ScrapeJobEvent>,
     ) -> Self {
         Self {
             db_pool,
             banner_api,
             reference_cache,
+            job_events_tx,
         }
     }
@@ -74,6 +78,7 @@ impl Scheduler {
             let banner_api = self.banner_api.clone();
             let cancel_token = cancel_token.clone();
             let reference_cache = self.reference_cache.clone();
+            let job_events_tx = self.job_events_tx.clone();
 
             async move {
                 tokio::select! {
@@ -99,7 +104,7 @@ impl Scheduler {
 
                         tokio::join!(rmp_fut, ref_fut);
 
-                        if let Err(e) = Self::schedule_jobs_impl(&db_pool, &banner_api).await {
+                        if let Err(e) = Self::schedule_jobs_impl(&db_pool, &banner_api, Some(&job_events_tx)).await {
                             error!(error = ?e, "Failed to schedule jobs");
                         }
                     } => {}
@@ -150,7 +155,11 @@ impl Scheduler {
     ///
     /// This is a static method (not &self) to allow it to be called from spawned tasks.
     #[tracing::instrument(skip_all, fields(term))]
-    async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
+    async fn schedule_jobs_impl(
+        db_pool: &PgPool,
+        banner_api: &BannerApi,
+        job_events_tx: Option<&broadcast::Sender<ScrapeJobEvent>>,
+    ) -> Result<()> {
         // For now, we will implement a simple baseline scheduling strategy:
         // 1. Get a list of all subjects from the Banner API.
         // 2. Query existing jobs for all subjects in a single query.
@@ -213,7 +222,16 @@ impl Scheduler {
                 .map(|(payload, _)| (payload, TargetType::Subject, ScrapePriority::Low))
                 .collect();
 
-            scrape_jobs::batch_insert_jobs(&jobs, db_pool).await?;
+            let inserted = scrape_jobs::batch_insert_jobs(&jobs, db_pool).await?;
+
+            if let Some(tx) = job_events_tx {
+                inserted.iter().for_each(|job| {
+                    debug!(job_id = job.id, "Emitting JobCreated event");
+                    let _ = tx.send(ScrapeJobEvent::JobCreated {
+                        job: ScrapeJobDto::from(job),
+                    });
+                });
+            }
         }
 
         debug!("Job scheduling complete");
diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs
index 3fdc1f0..c611a11 100644
--- a/src/scraper/worker.rs
+++ b/src/scraper/worker.rs
@@ -1,8 +1,10 @@
 use crate::banner::{BannerApi, BannerApiError};
-use crate::data::models::ScrapeJob;
+use crate::data::models::{ScrapeJob, ScrapeJobStatus};
 use crate::data::scrape_jobs;
 use crate::error::Result;
 use crate::scraper::jobs::{JobError, JobType};
+use crate::web::ws::ScrapeJobEvent;
+use chrono::Utc;
 use sqlx::PgPool;
 use std::sync::Arc;
 use std::time::Duration;
@@ -21,14 +23,21 @@ pub struct Worker {
     id: usize, // For logging purposes
     db_pool: PgPool,
     banner_api: Arc<BannerApi>,
+    job_events_tx: broadcast::Sender<ScrapeJobEvent>,
 }
 
 impl Worker {
-    pub fn new(id: usize, db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
+    pub fn new(
+        id: usize,
+        db_pool: PgPool,
+        banner_api: Arc<BannerApi>,
+        job_events_tx: broadcast::Sender<ScrapeJobEvent>,
+    ) -> Self {
         Self {
             id,
             db_pool,
             banner_api,
+            job_events_tx,
         }
     }
@@ -65,6 +74,15 @@ impl Worker {
         let max_retries = job.max_retries;
         let start = std::time::Instant::now();
 
+        // Emit JobLocked event
+        let locked_at = Utc::now().to_rfc3339();
+        debug!(job_id, "Emitting JobLocked event");
+        let _ = self.job_events_tx.send(ScrapeJobEvent::JobLocked {
+            id: job_id,
+            locked_at,
+            status: ScrapeJobStatus::Processing,
+        });
+
         // Process the job, racing against shutdown signal and timeout
         let process_result = tokio::select! {
             _ = shutdown_rx.recv() => {
@@ -143,7 +161,11 @@ impl Worker {
         scrape_jobs::unlock_job(job_id, &self.db_pool).await
     }
 
-    async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result<bool> {
+    async fn unlock_and_increment_retry(
+        &self,
+        job_id: i32,
+        max_retries: i32,
+    ) -> Result<Option<chrono::DateTime<Utc>>> {
         scrape_jobs::unlock_and_increment_retry(job_id, max_retries, &self.db_pool).await
     }
 
@@ -188,6 +210,10 @@ impl Worker {
                 if let Err(e) = self.delete_job(job_id).await {
                     error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
                 }
+                debug!(job_id, "Emitting JobCompleted event");
+                let _ = self
+                    .job_events_tx
+                    .send(ScrapeJobEvent::JobCompleted { id: job_id });
             }
             Err(JobError::Recoverable(e)) => {
                 self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration)
@@ -204,6 +230,10 @@ impl Worker {
                 if let Err(e) = self.delete_job(job_id).await {
                     error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
                 }
+                debug!(job_id, "Emitting JobDeleted event");
+                let _ = self
+                    .job_events_tx
+                    .send(ScrapeJobEvent::JobDeleted { id: job_id });
             }
         }
     }
@@ -246,7 +276,7 @@ impl Worker {
 
         // Atomically unlock and increment retry count, checking if retry is allowed
         match self.unlock_and_increment_retry(job_id, max_retries).await {
-            Ok(can_retry) if can_retry => {
+            Ok(Some(queued_at)) => {
                 debug!(
                     worker_id = self.id,
                     job_id,
                     next_attempt,
                     remaining_retries = remaining_retries,
                     "Job unlocked for retry"
                 );
+                debug!(job_id, "Emitting JobRetried event");
+                let _ = self.job_events_tx.send(ScrapeJobEvent::JobRetried {
+                    id: job_id,
+                    retry_count: next_attempt,
+                    queued_at: queued_at.to_rfc3339(),
+                    status: ScrapeJobStatus::Pending,
+                });
             }
-            Ok(_) => {
+            Ok(None) => {
                 // Max retries exceeded (detected atomically)
                 error!(
                     worker_id = self.id,
@@ -269,6 +306,13 @@ impl Worker {
                 if let Err(e) = self.delete_job(job_id).await {
                     error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
                 }
+                debug!(job_id, "Emitting JobExhausted and JobDeleted events");
+                let _ = self
+                    .job_events_tx
+                    .send(ScrapeJobEvent::JobExhausted { id: job_id });
+                let _ = self
+                    .job_events_tx
+                    .send(ScrapeJobEvent::JobDeleted { id: job_id });
             }
             Err(e) => {
                 error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
diff --git a/src/state.rs b/src/state.rs
index 10c14da..e2f546d 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -5,11 +5,12 @@ use crate::banner::Course;
 use crate::data::models::ReferenceData;
 use crate::status::ServiceStatusRegistry;
 use crate::web::session_cache::{OAuthStateStore, SessionCache};
+use crate::web::ws::ScrapeJobEvent;
 use anyhow::Result;
 use sqlx::PgPool;
 use std::collections::HashMap;
 use std::sync::Arc;
-use tokio::sync::RwLock;
+use tokio::sync::{RwLock, broadcast};
 
 /// In-memory cache for reference data (code→description lookups).
 ///
@@ -75,10 +76,12 @@ pub struct AppState {
     pub reference_cache: Arc<RwLock<ReferenceCache>>,
     pub session_cache: SessionCache,
     pub oauth_state_store: OAuthStateStore,
+    pub scrape_job_tx: broadcast::Sender<ScrapeJobEvent>,
 }
 
 impl AppState {
     pub fn new(banner_api: Arc<BannerApi>, db_pool: PgPool) -> Self {
+        let (scrape_job_tx, _) = broadcast::channel(64);
         Self {
             session_cache: SessionCache::new(db_pool.clone()),
             oauth_state_store: OAuthStateStore::new(),
@@ -86,9 +89,15 @@ impl AppState {
             db_pool,
             service_statuses: ServiceStatusRegistry::new(),
             reference_cache: Arc::new(RwLock::new(ReferenceCache::new())),
+            scrape_job_tx,
         }
     }
 
+    /// Subscribe to scrape job lifecycle events.
+    pub fn scrape_job_events(&self) -> broadcast::Receiver<ScrapeJobEvent> {
+        self.scrape_job_tx.subscribe()
+    }
+
     /// Initialize the reference cache from the database.
     pub async fn load_reference_cache(&self) -> Result<()> {
         let entries = crate::data::reference::get_all(&self.db_pool).await?;
diff --git a/src/web/admin.rs b/src/web/admin.rs
index 82068e0..20fd212 100644
--- a/src/web/admin.rs
+++ b/src/web/admin.rs
@@ -163,6 +163,8 @@ pub async fn list_scrape_jobs(
                 "lockedAt": j.locked_at.map(|t| t.to_rfc3339()),
                 "retryCount": j.retry_count,
                 "maxRetries": j.max_retries,
+                "queuedAt": j.queued_at.to_rfc3339(),
+                "status": j.status(),
             })
         })
         .collect();
diff --git a/src/web/mod.rs b/src/web/mod.rs
index 5b3c675..f8d4995 100644
--- a/src/web/mod.rs
+++ b/src/web/mod.rs
@@ -9,5 +9,6 @@ pub mod encoding;
 pub mod extractors;
 pub mod routes;
 pub mod session_cache;
+pub mod ws;
 
 pub use routes::*;
diff --git a/src/web/routes.rs b/src/web/routes.rs
index 8c619dd..d925ab7 100644
--- a/src/web/routes.rs
+++ b/src/web/routes.rs
@@ -11,6 +11,7 @@ use axum::{
 
 use crate::web::admin;
 use crate::web::auth::{self, AuthConfig};
+use crate::web::ws;
 #[cfg(feature = "embed-assets")]
 use axum::{
     http::{HeaderMap, StatusCode, Uri},
@@ -63,6 +64,7 @@ pub fn create_router(app_state: AppState, auth_config: AuthConfig) -> Router {
             put(admin::set_user_admin),
         )
         .route("/admin/scrape-jobs", get(admin::list_scrape_jobs))
+        .route("/admin/scrape-jobs/ws", get(ws::scrape_jobs_ws))
         .route("/admin/audit-log", get(admin::list_audit_log))
         .with_state(app_state);
diff --git a/src/web/ws.rs b/src/web/ws.rs
new file mode 100644
index 0000000..348a194
--- /dev/null
+++ b/src/web/ws.rs
@@ -0,0 +1,205 @@
+//! WebSocket event types and handler for real-time scrape job updates.
+
+use axum::{
+    extract::{
+        State,
+        ws::{Message, WebSocket, WebSocketUpgrade},
+    },
+    response::IntoResponse,
+};
+use futures::{SinkExt, StreamExt};
+use serde::Serialize;
+use sqlx::PgPool;
+use tokio::sync::broadcast;
+use tracing::debug;
+
+use crate::data::models::{ScrapeJob, ScrapeJobStatus};
+use crate::state::AppState;
+use crate::web::extractors::AdminUser;
+
+/// A serializable DTO for `ScrapeJob` with computed `status`.
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ScrapeJobDto {
+    pub id: i32,
+    pub target_type: String,
+    pub target_payload: serde_json::Value,
+    pub priority: String,
+    pub execute_at: String,
+    pub created_at: String,
+    pub locked_at: Option<String>,
+    pub retry_count: i32,
+    pub max_retries: i32,
+    pub queued_at: String,
+    pub status: ScrapeJobStatus,
+}
+
+impl From<&ScrapeJob> for ScrapeJobDto {
+    fn from(job: &ScrapeJob) -> Self {
+        Self {
+            id: job.id,
+            target_type: format!("{:?}", job.target_type),
+            target_payload: job.target_payload.clone(),
+            priority: format!("{:?}", job.priority),
+            execute_at: job.execute_at.to_rfc3339(),
+            created_at: job.created_at.to_rfc3339(),
+            locked_at: job.locked_at.map(|t| t.to_rfc3339()),
+            retry_count: job.retry_count,
+            max_retries: job.max_retries,
+            queued_at: job.queued_at.to_rfc3339(),
+            status: job.status(),
+        }
+    }
+}
+
+/// Events broadcast when scrape job state changes.
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub enum ScrapeJobEvent {
+    Init {
+        jobs: Vec<ScrapeJobDto>,
+    },
+    JobCreated {
+        job: ScrapeJobDto,
+    },
+    JobLocked {
+        id: i32,
+        locked_at: String,
+        status: ScrapeJobStatus,
+    },
+    JobCompleted {
+        id: i32,
+    },
+    JobRetried {
+        id: i32,
+        retry_count: i32,
+        queued_at: String,
+        status: ScrapeJobStatus,
+    },
+    JobExhausted {
+        id: i32,
+    },
+    JobDeleted {
+        id: i32,
+    },
+}
+
+/// Fetch current scrape jobs from the DB and build an `Init` event.
+async fn build_init_event(db_pool: &PgPool) -> Result<ScrapeJobEvent, sqlx::Error> {
+    let rows = sqlx::query_as::<_, ScrapeJob>(
+        "SELECT * FROM scrape_jobs ORDER BY priority DESC, execute_at ASC LIMIT 100",
+    )
+    .fetch_all(db_pool)
+    .await?;
+
+    let jobs = rows.iter().map(ScrapeJobDto::from).collect();
+    Ok(ScrapeJobEvent::Init { jobs })
+}
+
+/// WebSocket endpoint for real-time scrape job updates.
+///
+/// Auth is checked via `AdminUser` before the upgrade occurs — if rejected,
+/// a 401/403 is returned and the upgrade never happens.
+pub async fn scrape_jobs_ws(
+    ws: WebSocketUpgrade,
+    AdminUser(_user): AdminUser,
+    State(state): State<AppState>,
+) -> impl IntoResponse {
+    ws.on_upgrade(|socket| handle_scrape_jobs_ws(socket, state))
+}
+
+/// Serialize an event and send it over the WebSocket sink.
+/// Returns `true` if the message was sent, `false` if the client disconnected.
+async fn send_event(
+    sink: &mut futures::stream::SplitSink<WebSocket, Message>,
+    event: &ScrapeJobEvent,
+) -> bool {
+    let Ok(json) = serde_json::to_string(event) else {
+        return true; // serialization failed, but connection is still alive
+    };
+    sink.send(Message::Text(json.into())).await.is_ok()
+}
+
+async fn handle_scrape_jobs_ws(socket: WebSocket, state: AppState) {
+    debug!("scrape-jobs WebSocket connected");
+
+    let (mut sink, mut stream) = socket.split();
+
+    // Send initial state
+    let init_event = match build_init_event(&state.db_pool).await {
+        Ok(event) => event,
+        Err(e) => {
+            debug!(error = %e, "failed to build init event, closing WebSocket");
+            return;
+        }
+    };
+    if !send_event(&mut sink, &init_event).await {
+        debug!("client disconnected during init send");
+        return;
+    }
+
+    // Subscribe to broadcast events
+    let mut rx = state.scrape_job_events();
+
+    loop {
+        tokio::select! {
+            result = rx.recv() => {
+                match result {
+                    Ok(ref event) => {
+                        if !send_event(&mut sink, event).await {
+                            debug!("client disconnected during event send");
+                            break;
+                        }
+                    }
+                    Err(broadcast::error::RecvError::Lagged(n)) => {
+                        debug!(missed = n, "broadcast lagged, resyncing");
+                        match build_init_event(&state.db_pool).await {
+                            Ok(ref event) => {
+                                if !send_event(&mut sink, event).await {
+                                    debug!("client disconnected during resync send");
+                                    break;
+                                }
+                            }
+                            Err(e) => {
+                                debug!(error = %e, "failed to build resync init event");
+                            }
+                        }
+                    }
+                    Err(broadcast::error::RecvError::Closed) => {
+                        debug!("broadcast channel closed");
+                        break;
+                    }
+                }
+            }
+            msg = stream.next() => {
+                match msg {
+                    Some(Ok(Message::Text(text))) => {
+                        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text)
+                            && parsed.get("type").and_then(|t| t.as_str()) == Some("resync")
+                        {
+                            debug!("client requested resync");
+                            match build_init_event(&state.db_pool).await {
+                                Ok(ref event) => {
+                                    if !send_event(&mut sink, event).await {
+                                        debug!("client disconnected during resync send");
+                                        break;
+                                    }
+                                }
+                                Err(e) => {
+                                    debug!(error = %e, "failed to build resync init event");
+                                }
+                            }
+                        }
+                    }
+                    Some(Ok(Message::Close(_))) | None => {
+                        debug!("client disconnected");
+                        break;
+                    }
+                    _ => {}
+                }
+            }
+        }
+    }
+
+    debug!("scrape-jobs WebSocket disconnected");
+}
diff --git a/tests/db_scrape_jobs.rs b/tests/db_scrape_jobs.rs
index 039b6d5..ae69b7e 100644
--- a/tests/db_scrape_jobs.rs
+++ b/tests/db_scrape_jobs.rs
@@ -217,10 +217,13 @@ async fn unlock_and_increment_retry_has_retries_remaining(pool: PgPool) {
     )
     .await;
 
-    let has_retries = scrape_jobs::unlock_and_increment_retry(id, 3, &pool)
+    let result = scrape_jobs::unlock_and_increment_retry(id, 3, &pool)
         .await
         .unwrap();
-    assert!(has_retries, "should have retries remaining (0→1, max=3)");
+    assert!(
+        result.is_some(),
+        "should have retries remaining (0→1, max=3)"
+    );
 
     // Verify state in DB
     let (retry_count, locked_at): (i32, Option<DateTime<Utc>>) =
@@ -246,11 +249,11 @@ async fn unlock_and_increment_retry_exhausted(pool: PgPool) {
     )
     .await;
 
-    let has_retries = scrape_jobs::unlock_and_increment_retry(id, 3, &pool)
+    let result = scrape_jobs::unlock_and_increment_retry(id, 3, &pool)
         .await
         .unwrap();
     assert!(
-        !has_retries,
+        result.is_none(),
         "should NOT have retries remaining (3→4, max=3)"
     );
 
@@ -276,11 +279,11 @@ async fn unlock_and_increment_retry_already_exceeded(pool: PgPool) {
     )
     .await;
 
-    let has_retries = scrape_jobs::unlock_and_increment_retry(id, 3, &pool)
+    let result = scrape_jobs::unlock_and_increment_retry(id, 3, &pool)
         .await
         .unwrap();
     assert!(
-        !has_retries,
+        result.is_none(),
         "should NOT have retries remaining (5→6, max=3)"
     );
 
diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts
index 9d79dfe..483f72f 100644
--- a/web/src/lib/api.ts
+++ b/web/src/lib/api.ts
@@ -53,6 +53,8 @@ export interface ScrapeJob {
 	lockedAt: string | null;
 	retryCount: number;
 	maxRetries: number;
+	queuedAt: string;
+	status: "processing" | "staleLock" | "exhausted" | "scheduled" | "pending";
 }
 
 export interface ScrapeJobsResponse {
diff --git a/web/src/lib/components/ErrorBoundaryFallback.svelte b/web/src/lib/components/ErrorBoundaryFallback.svelte
new file mode 100644
index 0000000..7938823
--- /dev/null
+++ b/web/src/lib/components/ErrorBoundaryFallback.svelte
@@ -0,0 +1,56 @@
+<script lang="ts">
+import { page } from "$app/state";
+
+let {
+	error,
+	reset,
+	title = "Something went wrong",
+}: { error: unknown; reset: () => void; title?: string } = $props();
+
+const errorName = $derived(error instanceof Error ? error.name : "Error");
+const errorMessage = $derived(error instanceof Error ? error.message : String(error));
+const errorStack = $derived(error instanceof Error ? error.stack : undefined);
+</script>
+
+<div class="flex flex-col items-center gap-3 p-8 text-center">
+	<span class="font-semibold">{title}</span>
+	<code class="text-muted-foreground text-sm">{page.url.pathname}</code>
+
+	<div class="text-sm">
+		<span class="font-mono">{errorName}</span>
+		<p>{errorMessage}</p>
+	</div>
+
+	{#if errorStack}
+		<details>
+			<summary>Stack trace</summary>
+			<pre class="max-h-64 overflow-auto text-left text-xs">{errorStack}</pre>
+		</details>
+	{/if}
+
+	<button type="button" onclick={() => reset()}>Try again</button>
+	<span class="text-muted-foreground text-xs">Retries this section, not the full page</span>
+</div>
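Worth seeing concretely for review: every server-to-client message on the new socket is a single JSON object tagged by `type` (per the `#[serde(tag = "type", rename_all = "camelCase")]` attribute on `ScrapeJobEvent` above), and the only client-to-server message is a resync request, which `web/src/lib/ws.ts` below sends. A sketch with invented values:

```ts
// Illustrative payloads only; the shapes follow ScrapeJobEvent in
// src/web/ws.rs, but every value here is made up for the example.
const serverEvents = [
	{ type: "jobLocked", id: 42, lockedAt: "2026-01-29T18:40:00Z", status: "processing" },
	{ type: "jobRetried", id: 42, retryCount: 1, queuedAt: "2026-01-29T18:41:05Z", status: "pending" },
	{ type: "jobCompleted", id: 42 },
];

// Asking the server to re-send a full `init` snapshot:
const resyncRequest = JSON.stringify({ type: "resync" });
```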
diff --git a/web/src/lib/components/PageTransition.svelte b/web/src/lib/components/PageTransition.svelte index c3c3a19..7a50568 100644 --- a/web/src/lib/components/PageTransition.svelte +++ b/web/src/lib/components/PageTransition.svelte @@ -48,7 +48,7 @@ function inTransition(_node: HTMLElement): TransitionConfig { function outTransition(_node: HTMLElement): TransitionConfig { const dir = navigationStore.direction; - const base = "position: absolute; top: 0; left: 0; width: 100%"; + const base = "position: absolute; top: 0; left: 0; width: 100%; height: 100%"; if (dir === "fade") { return { duration: DURATION, @@ -67,9 +67,9 @@ function outTransition(_node: HTMLElement): TransitionConfig { } -
+
{#key key} -
+
{@render children()}
 	{/key}
diff --git a/web/src/lib/components/ui/data-table/data-table.svelte.ts b/web/src/lib/components/ui/data-table/data-table.svelte.ts
index 5858c90..946f215 100644
--- a/web/src/lib/components/ui/data-table/data-table.svelte.ts
+++ b/web/src/lib/components/ui/data-table/data-table.svelte.ts
@@ -39,7 +39,7 @@ export function createSvelteTable<TData extends RowData>(options: TableOptions<TData>) {
 			onStateChange: (updater) => {
 				if (updater instanceof Function) state = updater(state);
-				else state = mergeObjects(state, updater as Partial<TableState>);
+				else state = { ...state, ...(updater as Partial<TableState>) };
 
 				options.onStateChange?.(updater);
 			},
diff --git a/web/src/lib/date.ts b/web/src/lib/date.ts
index 60a3df9..e8b7a64 100644
--- a/web/src/lib/date.ts
+++ b/web/src/lib/date.ts
@@ -7,7 +7,9 @@ export function formatRelativeDate(date: string | Date): string {
 }
 
 /** Returns a full absolute datetime string for tooltip display, e.g. "Jan 29, 2026, 3:45:12 PM". */
-export function formatAbsoluteDate(date: string | Date): string {
+export function formatAbsoluteDate(date: string | Date | null | undefined): string {
+	if (date == null) return "—";
 	const d = typeof date === "string" ? new Date(date) : date;
+	if (Number.isNaN(d.getTime())) return "—";
 	return format(d, "MMM d, yyyy, h:mm:ss a");
 }
diff --git a/web/src/lib/time.ts b/web/src/lib/time.ts
index cb9759b..796174a 100644
--- a/web/src/lib/time.ts
+++ b/web/src/lib/time.ts
@@ -14,7 +14,7 @@ interface RelativeTimeResult {
 }
 
 /**
- * Compute a compact relative time string and the interval until it next changes.
+ * Format a duration in milliseconds as a compact human-readable string.
  *
  * Format tiers:
  * - < 60s: seconds only ("45s")
@@ -22,6 +22,33 @@ interface RelativeTimeResult {
  * - < 24h: hours + minutes ("1h 23m")
  * - >= 24h: days only ("3d")
  */
+export function formatDuration(ms: number): string {
+	const totalSeconds = Math.floor(Math.abs(ms) / 1000);
+
+	if (totalSeconds < 60) return `${totalSeconds}s`;
+
+	const totalMinutes = Math.floor(totalSeconds / 60);
+	if (totalMinutes < 60) {
+		const secs = totalSeconds % 60;
+		return `${totalMinutes}m ${secs}s`;
+	}
+
+	const totalHours = Math.floor(totalMinutes / 60);
+	if (totalHours < 24) {
+		const mins = totalMinutes % 60;
+		return `${totalHours}h ${mins}m`;
+	}
+
+	const days = Math.floor(totalHours / 24);
+	return `${days}d`;
+}
+
+/**
+ * Compute a compact relative time string and the interval until it next changes.
+ *
+ * Uses {@link formatDuration} for the text, plus computes the optimal refresh
+ * interval so callers can schedule the next update efficiently.
+ */
 export function relativeTime(date: Date, ref: Date): RelativeTimeResult {
 	const diffMs = ref.getTime() - date.getTime();
 	const totalSeconds = Math.floor(diffMs / 1000);
@@ -30,40 +57,22 @@ export function relativeTime(date: Date, ref: Date): RelativeTimeResult {
 		return { text: "now", nextUpdateMs: 1000 - (diffMs % 1000) || 1000 };
 	}
 
-	if (totalSeconds < 60) {
-		const remainder = 1000 - (diffMs % 1000);
-		return {
-			text: `${totalSeconds}s`,
-			nextUpdateMs: remainder || 1000,
-		};
-	}
+	const text = formatDuration(diffMs);
 
+	// Compute optimal next-update interval based on the current tier
 	const totalMinutes = Math.floor(totalSeconds / 60);
-	if (totalMinutes < 60) {
-		const secs = totalSeconds % 60;
-		const remainder = 1000 - (diffMs % 1000);
-		return {
-			text: `${totalMinutes}m ${secs}s`,
-			nextUpdateMs: remainder || 1000,
-		};
-	}
-
 	const totalHours = Math.floor(totalMinutes / 60);
-	if (totalHours < 24) {
-		const mins = totalMinutes % 60;
+
+	let nextUpdateMs: number;
+	if (totalHours >= 24) {
+		const msIntoCurrentDay = diffMs % 86_400_000;
+		nextUpdateMs = 86_400_000 - msIntoCurrentDay || 86_400_000;
+	} else if (totalMinutes >= 60) {
 		const msIntoCurrentMinute = diffMs % 60_000;
-		const msUntilNextMinute = 60_000 - msIntoCurrentMinute;
-		return {
-			text: `${totalHours}h ${mins}m`,
-			nextUpdateMs: msUntilNextMinute || 60_000,
-		};
+		nextUpdateMs = 60_000 - msIntoCurrentMinute || 60_000;
+	} else {
+		nextUpdateMs = 1000 - (diffMs % 1000) || 1000;
 	}
 
-	const days = Math.floor(totalHours / 24);
-	const msIntoCurrentDay = diffMs % 86_400_000;
-	const msUntilNextDay = 86_400_000 - msIntoCurrentDay;
-	return {
-		text: `${days}d`,
-		nextUpdateMs: msUntilNextDay || 86_400_000,
-	};
+	return { text, nextUpdateMs };
 }
diff --git a/web/src/lib/ws.ts b/web/src/lib/ws.ts
new file mode 100644
index 0000000..cdf5ffb
--- /dev/null
+++ b/web/src/lib/ws.ts
@@ -0,0 +1,210 @@
+import type { ScrapeJob } from "$lib/api";
+
+export type ScrapeJobStatus = "processing" | "staleLock" | "exhausted" | "scheduled" | "pending";
+
+export type ScrapeJobEvent =
+	| { type: "init"; jobs: ScrapeJob[] }
+	| { type: "jobCreated"; job: ScrapeJob }
+	| { type: "jobLocked"; id: number; lockedAt: string; status: ScrapeJobStatus }
+	| { type: "jobCompleted"; id: number }
+	| {
+			type: "jobRetried";
+			id: number;
+			retryCount: number;
+			queuedAt: string;
+			status: ScrapeJobStatus;
+	  }
+	| { type: "jobExhausted"; id: number }
+	| { type: "jobDeleted"; id: number };
+
+export type ConnectionState = "connected" | "reconnecting" | "disconnected";
+
+const PRIORITY_ORDER: Record<string, number> = {
+	critical: 0,
+	high: 1,
+	medium: 2,
+	low: 3,
+};
+
+const MAX_RECONNECT_DELAY = 30_000;
+const MAX_RECONNECT_ATTEMPTS = 10;
+
+function sortJobs(jobs: Iterable<ScrapeJob>): ScrapeJob[] {
+	return Array.from(jobs).sort((a, b) => {
+		const pa = PRIORITY_ORDER[a.priority.toLowerCase()] ?? 2;
+		const pb = PRIORITY_ORDER[b.priority.toLowerCase()] ?? 2;
+		if (pa !== pb) return pa - pb;
+		return new Date(a.executeAt).getTime() - new Date(b.executeAt).getTime();
+	});
+}
+
+export class ScrapeJobsStore {
+	private ws: WebSocket | null = null;
+	private jobs = new Map<number, ScrapeJob>();
+	private _connectionState: ConnectionState = "disconnected";
+	private _initialized = false;
+	private onUpdate: () => void;
+	private reconnectAttempts = 0;
+	private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
+	private intentionalClose = false;
+
+	/** Cached sorted array, invalidated on data mutations. */
+	private cachedJobs: ScrapeJob[] = [];
+	private cacheDirty = false;
+
+	constructor(onUpdate: () => void) {
+		this.onUpdate = onUpdate;
+	}
+
+	getJobs(): ScrapeJob[] {
+		if (this.cacheDirty) {
+			this.cachedJobs = sortJobs(this.jobs.values());
+			this.cacheDirty = false;
+		}
+		return this.cachedJobs;
+	}
+
+	getConnectionState(): ConnectionState {
+		return this._connectionState;
+	}
+
+	isInitialized(): boolean {
+		return this._initialized;
+	}
+
+	connect(): void {
+		this.intentionalClose = false;
+		const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
+		const url = `${protocol}//${window.location.host}/api/admin/scrape-jobs/ws`;
+
+		try {
+			this.ws = new WebSocket(url);
+		} catch {
+			this.scheduleReconnect();
+			return;
+		}
+
+		this.ws.onopen = () => {
+			this._connectionState = "connected";
+			this.reconnectAttempts = 0;
+			this.onUpdate();
+		};
+
+		this.ws.onmessage = (event) => {
+			try {
+				const parsed = JSON.parse(event.data as string) as ScrapeJobEvent;
+				this.handleEvent(parsed);
+			} catch {
+				// Ignore malformed messages
+			}
+		};
+
+		this.ws.onclose = () => {
+			this.ws = null;
+			if (!this.intentionalClose) {
+				this.scheduleReconnect();
+			}
+		};
+
+		this.ws.onerror = () => {
+			// onclose will fire after onerror, so reconnect is handled there
+		};
+	}
+
+	handleEvent(event: ScrapeJobEvent): void {
+		switch (event.type) {
+			case "init":
+				this.jobs.clear();
+				for (const job of event.jobs) {
+					this.jobs.set(job.id, job);
+				}
+				this._initialized = true;
+				break;
+			case "jobCreated":
+				this.jobs.set(event.job.id, event.job);
+				break;
+			case "jobLocked": {
+				const job = this.jobs.get(event.id);
+				if (job) {
+					this.jobs.set(event.id, { ...job, lockedAt: event.lockedAt, status: event.status });
+				}
+				break;
+			}
+			case "jobCompleted":
+				this.jobs.delete(event.id);
+				break;
+			case "jobRetried": {
+				const job = this.jobs.get(event.id);
+				if (job) {
+					this.jobs.set(event.id, {
+						...job,
+						retryCount: event.retryCount,
+						queuedAt: event.queuedAt,
+						status: event.status,
+						lockedAt: null,
+					});
+				}
+				break;
+			}
+			case "jobExhausted": {
+				const job = this.jobs.get(event.id);
+				if (job) {
+					this.jobs.set(event.id, { ...job, status: "exhausted" });
+				}
+				break;
+			}
+			case "jobDeleted":
+				this.jobs.delete(event.id);
+				break;
+		}
+		this.cacheDirty = true;
+		this.onUpdate();
+	}
+
+	disconnect(): void {
+		this.intentionalClose = true;
+		if (this.reconnectTimer !== null) {
+			clearTimeout(this.reconnectTimer);
+			this.reconnectTimer = null;
+		}
+		if (this.ws) {
+			this.ws.close();
+			this.ws = null;
+		}
+		this._connectionState = "disconnected";
+		this.onUpdate();
+	}
+
+	resync(): void {
+		if (this.ws && this.ws.readyState === WebSocket.OPEN) {
+			this.ws.send(JSON.stringify({ type: "resync" }));
+		}
+	}
+
+	/** Attempt to reconnect after being disconnected. Resets attempt counter. */
+	retry(): void {
+		this.reconnectAttempts = 0;
+		this._connectionState = "reconnecting";
+		this.onUpdate();
+		this.connect();
+	}
+
+	private scheduleReconnect(): void {
+		if (this.reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
+			this._connectionState = "disconnected";
+			this.onUpdate();
+			return;
+		}
+
+		this._connectionState = "reconnecting";
+		this.onUpdate();
+
+		const delay = Math.min(1000 * 2 ** this.reconnectAttempts, MAX_RECONNECT_DELAY);
+		this.reconnectAttempts++;
+
+		this.reconnectTimer = setTimeout(() => {
+			this.reconnectTimer = null;
+			this.connect();
+		}, delay);
+	}
+}
diff --git a/web/src/routes/(app)/+layout.svelte b/web/src/routes/(app)/+layout.svelte
index 53a15bd..02d61a9 100644
--- a/web/src/routes/(app)/+layout.svelte
+++ b/web/src/routes/(app)/+layout.svelte
@@ -3,6 +3,7 @@ import { goto } from "$app/navigation";
 import { page } from "$app/state";
 import { authStore } from "$lib/auth.svelte";
 import PageTransition from "$lib/components/PageTransition.svelte";
+import ErrorBoundaryFallback from "$lib/components/ErrorBoundaryFallback.svelte";
 import {
 	ClipboardList,
 	FileText,
@@ -12,10 +13,32 @@ import {
 	User,
 	Users,
 } from "@lucide/svelte";
-import { onMount } from "svelte";
+import { onMount, tick } from "svelte";
 
 let { children } = $props();
 
+// Track boundary reset function so navigation can auto-clear errors
+let boundaryReset = $state<(() => void) | null>(null);
+let errorPathname = $state<string | null>(null);
+
+function onBoundaryError(e: unknown, reset: () => void) {
+	console.error("[page boundary]", e);
+	boundaryReset = reset;
+	errorPathname = page.url.pathname;
+}
+
+// Auto-reset the boundary only when the user navigates away from the errored page
+$effect(() => {
+	const currentPath = page.url.pathname;
+
+	if (boundaryReset && errorPathname && currentPath !== errorPathname) {
+		const reset = boundaryReset;
+		boundaryReset = null;
+		errorPathname = null;
+		tick().then(() => reset());
+	}
+});
+
 onMount(async () => {
 	if (authStore.isLoading) {
 		await authStore.init();
@@ -115,9 +138,15 @@ function isActive(href: string): boolean {
-			<PageTransition key={page.url.pathname}>
-				{@render children()}
-			</PageTransition>
+			<svelte:boundary onerror={onBoundaryError}>
+				<PageTransition key={page.url.pathname}>
+					{@render children()}
+				</PageTransition>
+
+				{#snippet failed(error, reset)}
+					<ErrorBoundaryFallback {error} {reset} />
+				{/snippet}
+			</svelte:boundary>
diff --git a/web/src/routes/(app)/admin/jobs/+page.svelte b/web/src/routes/(app)/admin/jobs/+page.svelte index e80fb5f..75f3540 100644 --- a/web/src/routes/(app)/admin/jobs/+page.svelte +++ b/web/src/routes/(app)/admin/jobs/+page.svelte @@ -1,9 +1,9 @@ -

Scrape Jobs

+
+

Scrape Jobs

+
+ {#if connectionState === "connected"} + + + Live + + {:else if connectionState === "reconnecting"} + + + Reconnecting... + + {:else} + + + + Disconnected + + + + {/if} +
+
{#if error}

{error}

{:else}
- + +
{#each table.getHeaderGroups() as headerGroup} {#each headerGroup.headers as header} {/each} - {#if !data} + {#if !initialized} {#each Array(5) as _} {#each columns as col} - {/each} {/each} - {:else if data.jobs.length === 0} + {:else if jobs.length === 0} {:else} - {#each table.getRowModel().rows as row} + {#each table.getRowModel().rows as row (row.id)} {@const job = row.original} - + {@const sc = statusColor(job.status)} + {@const timingDisplay = getTimingDisplay(job, tick)} + {#each row.getVisibleCells() as cell (cell.id)} {@const colId = cell.column.id} {#if colId === "id"} - + + {:else if colId === "status"} + {:else if colId === "targetType"} - + {:else if colId === "details"} + {:else if colId === "priority"} - - {:else if colId === "executeAt"} - - {:else if colId === "createdAt"} - - {:else if colId === "retries"} - - {:else if colId === "status"} - {/if} {/each} @@ -263,4 +510,13 @@ const skeletonWidths: Record = { {/if}
= {
+
@@ -193,68 +445,63 @@ const skeletonWidths: Record = {
{job.id}{job.id} + + + + {formatStatusLabel(job.status)} + {#if job.maxRetries > 0} + + {job.retryCount}/{job.maxRetries} retries + + {/if} + + + + {job.targetType} + {formatJobDetails(job, subjectMap)} + + {job.priority} - - - {formatRelativeDate(job.executeAt)} + {:else if colId === "timing"} + + + + {#if timingDisplay.icon === "warning"} + + {/if} - - - - - {formatRelativeDate(job.createdAt)} - - - - - {job.retryCount}/{job.maxRetries} + {timingDisplay.text} - {#if job.lockedAt} - - - - Locked - - - {:else} - - - Pending - - {/if} -
+ + {#if tooltipText !== null} +
+ {tooltipText} +
+ {/if} {/if} diff --git a/web/src/routes/+layout.svelte b/web/src/routes/+layout.svelte index 2d297f8..b1d5793 100644 --- a/web/src/routes/+layout.svelte +++ b/web/src/routes/+layout.svelte @@ -8,6 +8,7 @@ import { useOverlayScrollbars } from "$lib/composables/useOverlayScrollbars.svel import { initNavigation } from "$lib/stores/navigation.svelte"; import { themeStore } from "$lib/stores/theme.svelte"; import { Tooltip } from "bits-ui"; +import ErrorBoundaryFallback from "$lib/components/ErrorBoundaryFallback.svelte"; import { onMount } from "svelte"; let { children } = $props(); @@ -40,8 +41,14 @@ onMount(() => {
-<Tooltip.Provider>
-	{@render children()}
-</Tooltip.Provider>
+<svelte:boundary onerror={(e) => console.error("[root boundary]", e)}>
+	<Tooltip.Provider>
+		{@render children()}
+	</Tooltip.Provider>
+
+	{#snippet failed(error, reset)}
+		<ErrorBoundaryFallback {error} {reset} />
+	{/snippet}
+</svelte:boundary>
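The `ws: true` flag in the Vite proxy change below is the piece that makes this work in development: a browser WebSocket aimed at the dev-server origin is upgraded and forwarded to the backend on :8080. Since `ScrapeJobsStore` above derives its URL from `window.location`, the same code path works in dev and production; a minimal connection sketch (the dev-server port is whatever Vite uses locally, nothing here pins it):

```ts
// Connects via the current origin; in dev the Vite proxy (ws: true) forwards
// the upgrade to the backend on :8080, in prod it reaches the server directly.
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const socket = new WebSocket(`${protocol}//${window.location.host}/api/admin/scrape-jobs/ws`);
```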
diff --git a/web/vite.config.ts b/web/vite.config.ts index ff9b447..d3fda57 100644 --- a/web/vite.config.ts +++ b/web/vite.config.ts @@ -38,6 +38,7 @@ export default defineConfig({ target: "http://localhost:8080", changeOrigin: true, secure: false, + ws: true, }, }, },
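A closing note on `status`: it is never stored in the database; `ScrapeJob::status()` derives it on the server and embeds it in every DTO and event. If a client ever needed to re-derive it between events (say, aging a `processing` row into `staleLock` without waiting for a message), a TypeScript mirror could look like the sketch below. This helper is hypothetical and not part of the diff; the server-sent value stays authoritative, and the constant mirrors `LOCK_EXPIRY_SECS` in `src/data/models.rs`.

```ts
import type { ScrapeJob } from "$lib/api";
import type { ScrapeJobStatus } from "$lib/ws";

// Hypothetical client-side mirror of ScrapeJob::status() (not in this diff).
const LOCK_EXPIRY_MS = 10 * 60 * 1000; // mirrors LOCK_EXPIRY_SECS on the server

export function deriveStatus(job: ScrapeJob, now: number = Date.now()): ScrapeJobStatus {
	if (job.lockedAt !== null) {
		// A held lock is "processing" until it ages past the expiry window.
		const heldMs = now - new Date(job.lockedAt).getTime();
		return heldMs < LOCK_EXPIRY_MS ? "processing" : "staleLock";
	}
	if (job.maxRetries > 0 && job.retryCount >= job.maxRetries) return "exhausted";
	if (new Date(job.executeAt).getTime() > now) return "scheduled";
	return "pending";
}
```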