feat: add websocket support for real-time scrape job monitoring

This commit is contained in:
2026-01-29 19:31:04 -06:00
parent d861888e5e
commit cfe098d193
26 changed files with 1118 additions and 173 deletions
+1
View File
@@ -126,6 +126,7 @@ impl App {
self.banner_api.clone(),
self.app_state.reference_cache.clone(),
self.app_state.service_statuses.clone(),
self.app_state.scrape_job_tx.clone(),
));
self.service_manager
.register_service(ServiceName::Scraper.as_str(), scraper_service);
+35
View File
@@ -176,6 +176,20 @@ pub enum TargetType {
SingleCrn,
}
/// Computed status for a scrape job, derived from existing fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum ScrapeJobStatus {
Processing,
StaleLock,
Exhausted,
Scheduled,
Pending,
}
/// How long a lock can be held before it is considered stale (mirrors `scrape_jobs::LOCK_EXPIRY`).
const LOCK_EXPIRY_SECS: i64 = 10 * 60;
/// Represents a queryable job from the database.
#[allow(dead_code)]
#[derive(sqlx::FromRow, Debug, Clone)]
@@ -191,6 +205,27 @@ pub struct ScrapeJob {
pub retry_count: i32,
/// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint)
pub max_retries: i32,
/// When the job last entered the "ready to pick up" state.
/// Set to NOW() on creation; updated to NOW() on retry.
pub queued_at: DateTime<Utc>,
}
impl ScrapeJob {
/// Compute the current status of this job from its fields.
pub fn status(&self) -> ScrapeJobStatus {
let now = Utc::now();
match self.locked_at {
Some(locked) if (now - locked).num_seconds() < LOCK_EXPIRY_SECS => {
ScrapeJobStatus::Processing
}
Some(_) => ScrapeJobStatus::StaleLock,
None if self.retry_count >= self.max_retries && self.max_retries > 0 => {
ScrapeJobStatus::Exhausted
}
None if self.execute_at > now => ScrapeJobStatus::Scheduled,
None => ScrapeJobStatus::Pending,
}
}
}
/// A user authenticated via Discord OAuth.
+24 -20
View File
@@ -13,9 +13,11 @@ use std::collections::HashSet;
/// # Returns
/// The number of jobs that were unlocked.
pub async fn force_unlock_all(db_pool: &PgPool) -> Result<u64> {
let result = sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE locked_at IS NOT NULL")
.execute(db_pool)
.await?;
let result = sqlx::query(
"UPDATE scrape_jobs SET locked_at = NULL, queued_at = NOW() WHERE locked_at IS NOT NULL",
)
.execute(db_pool)
.await?;
Ok(result.rows_affected())
}
@@ -97,10 +99,11 @@ pub async fn unlock_job(job_id: i32, db_pool: &PgPool) -> Result<()> {
Ok(())
}
/// Atomically unlock a job and increment its retry count.
/// Atomically unlock a job, increment its retry count, and reset `queued_at`.
///
/// Returns whether the job still has retries remaining. This is determined
/// atomically in the database to avoid race conditions between workers.
/// Returns the new `queued_at` timestamp if retries remain, or `None` if
/// the job has exhausted its retries. This is determined atomically in the
/// database to avoid race conditions between workers.
///
/// # Arguments
/// * `job_id` - The database ID of the job
@@ -108,25 +111,25 @@ pub async fn unlock_job(job_id: i32, db_pool: &PgPool) -> Result<()> {
/// * `db_pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(true)` if the job was unlocked and retries remain
/// * `Ok(false)` if the job has exhausted its retries
/// * `Ok(Some(queued_at))` if the job was unlocked and retries remain
/// * `Ok(None)` if the job has exhausted its retries
pub async fn unlock_and_increment_retry(
job_id: i32,
max_retries: i32,
db_pool: &PgPool,
) -> Result<bool> {
let result = sqlx::query_scalar::<_, Option<i32>>(
) -> Result<Option<chrono::DateTime<chrono::Utc>>> {
let result = sqlx::query_scalar::<_, Option<chrono::DateTime<chrono::Utc>>>(
"UPDATE scrape_jobs
SET locked_at = NULL, retry_count = retry_count + 1
SET locked_at = NULL, retry_count = retry_count + 1, queued_at = NOW()
WHERE id = $1
RETURNING CASE WHEN retry_count <= $2 THEN retry_count ELSE NULL END",
RETURNING CASE WHEN retry_count <= $2 THEN queued_at ELSE NULL END",
)
.bind(job_id)
.bind(max_retries)
.fetch_one(db_pool)
.await?;
Ok(result.is_some())
Ok(result)
}
/// Find existing job payloads matching the given target type and candidates.
@@ -173,9 +176,9 @@ pub async fn find_existing_job_payloads(
pub async fn batch_insert_jobs(
jobs: &[(serde_json::Value, TargetType, ScrapePriority)],
db_pool: &PgPool,
) -> Result<()> {
) -> Result<Vec<ScrapeJob>> {
if jobs.is_empty() {
return Ok(());
return Ok(Vec::new());
}
let mut target_types: Vec<String> = Vec::with_capacity(jobs.len());
@@ -188,19 +191,20 @@ pub async fn batch_insert_jobs(
priorities.push(format!("{priority:?}"));
}
sqlx::query(
let inserted = sqlx::query_as::<_, ScrapeJob>(
r#"
INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at)
SELECT v.target_type::target_type, v.payload, v.priority::scrape_priority, NOW()
INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at, queued_at)
SELECT v.target_type::target_type, v.payload, v.priority::scrape_priority, NOW(), NOW()
FROM UNNEST($1::text[], $2::jsonb[], $3::text[])
AS v(target_type, payload, priority)
RETURNING *
"#,
)
.bind(&target_types)
.bind(&payloads)
.bind(&priorities)
.execute(db_pool)
.fetch_all(db_pool)
.await?;
Ok(())
Ok(inserted)
}
+11 -1
View File
@@ -7,6 +7,7 @@ use crate::data::scrape_jobs;
use crate::services::Service;
use crate::state::ReferenceCache;
use crate::status::{ServiceStatus, ServiceStatusRegistry};
use crate::web::ws::ScrapeJobEvent;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
@@ -25,6 +26,7 @@ pub struct ScraperService {
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
service_statuses: ServiceStatusRegistry,
job_events_tx: broadcast::Sender<ScrapeJobEvent>,
scheduler_handle: Option<JoinHandle<()>>,
worker_handles: Vec<JoinHandle<()>>,
shutdown_tx: Option<broadcast::Sender<()>>,
@@ -37,12 +39,14 @@ impl ScraperService {
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
service_statuses: ServiceStatusRegistry,
job_events_tx: broadcast::Sender<ScrapeJobEvent>,
) -> Self {
Self {
db_pool,
banner_api,
reference_cache,
service_statuses,
job_events_tx,
scheduler_handle: None,
worker_handles: Vec::new(),
shutdown_tx: None,
@@ -71,6 +75,7 @@ impl ScraperService {
self.db_pool.clone(),
self.banner_api.clone(),
self.reference_cache.clone(),
self.job_events_tx.clone(),
);
let shutdown_rx = shutdown_tx.subscribe();
let scheduler_handle = tokio::spawn(async move {
@@ -81,7 +86,12 @@ impl ScraperService {
let worker_count = 4; // This could be configurable
for i in 0..worker_count {
let worker = Worker::new(i, self.db_pool.clone(), self.banner_api.clone());
let worker = Worker::new(
i,
self.db_pool.clone(),
self.banner_api.clone(),
self.job_events_tx.clone(),
);
let shutdown_rx = shutdown_tx.subscribe();
let worker_handle = tokio::spawn(async move {
worker.run(shutdown_rx).await;
+21 -3
View File
@@ -5,6 +5,7 @@ use crate::error::Result;
use crate::rmp::RmpClient;
use crate::scraper::jobs::subject::SubjectJob;
use crate::state::ReferenceCache;
use crate::web::ws::{ScrapeJobDto, ScrapeJobEvent};
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
@@ -25,6 +26,7 @@ pub struct Scheduler {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
job_events_tx: broadcast::Sender<ScrapeJobEvent>,
}
impl Scheduler {
@@ -32,11 +34,13 @@ impl Scheduler {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
job_events_tx: broadcast::Sender<ScrapeJobEvent>,
) -> Self {
Self {
db_pool,
banner_api,
reference_cache,
job_events_tx,
}
}
@@ -74,6 +78,7 @@ impl Scheduler {
let banner_api = self.banner_api.clone();
let cancel_token = cancel_token.clone();
let reference_cache = self.reference_cache.clone();
let job_events_tx = self.job_events_tx.clone();
async move {
tokio::select! {
@@ -99,7 +104,7 @@ impl Scheduler {
tokio::join!(rmp_fut, ref_fut);
if let Err(e) = Self::schedule_jobs_impl(&db_pool, &banner_api).await {
if let Err(e) = Self::schedule_jobs_impl(&db_pool, &banner_api, Some(&job_events_tx)).await {
error!(error = ?e, "Failed to schedule jobs");
}
} => {}
@@ -150,7 +155,11 @@ impl Scheduler {
///
/// This is a static method (not &self) to allow it to be called from spawned tasks.
#[tracing::instrument(skip_all, fields(term))]
async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
async fn schedule_jobs_impl(
db_pool: &PgPool,
banner_api: &BannerApi,
job_events_tx: Option<&broadcast::Sender<ScrapeJobEvent>>,
) -> Result<()> {
// For now, we will implement a simple baseline scheduling strategy:
// 1. Get a list of all subjects from the Banner API.
// 2. Query existing jobs for all subjects in a single query.
@@ -213,7 +222,16 @@ impl Scheduler {
.map(|(payload, _)| (payload, TargetType::Subject, ScrapePriority::Low))
.collect();
scrape_jobs::batch_insert_jobs(&jobs, db_pool).await?;
let inserted = scrape_jobs::batch_insert_jobs(&jobs, db_pool).await?;
if let Some(tx) = job_events_tx {
inserted.iter().for_each(|job| {
debug!(job_id = job.id, "Emitting JobCreated event");
let _ = tx.send(ScrapeJobEvent::JobCreated {
job: ScrapeJobDto::from(job),
});
});
}
}
debug!("Job scheduling complete");
+49 -5
View File
@@ -1,8 +1,10 @@
use crate::banner::{BannerApi, BannerApiError};
use crate::data::models::ScrapeJob;
use crate::data::models::{ScrapeJob, ScrapeJobStatus};
use crate::data::scrape_jobs;
use crate::error::Result;
use crate::scraper::jobs::{JobError, JobType};
use crate::web::ws::ScrapeJobEvent;
use chrono::Utc;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
@@ -21,14 +23,21 @@ pub struct Worker {
id: usize, // For logging purposes
db_pool: PgPool,
banner_api: Arc<BannerApi>,
job_events_tx: broadcast::Sender<ScrapeJobEvent>,
}
impl Worker {
pub fn new(id: usize, db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
pub fn new(
id: usize,
db_pool: PgPool,
banner_api: Arc<BannerApi>,
job_events_tx: broadcast::Sender<ScrapeJobEvent>,
) -> Self {
Self {
id,
db_pool,
banner_api,
job_events_tx,
}
}
@@ -65,6 +74,15 @@ impl Worker {
let max_retries = job.max_retries;
let start = std::time::Instant::now();
// Emit JobLocked event
let locked_at = Utc::now().to_rfc3339();
debug!(job_id, "Emitting JobLocked event");
let _ = self.job_events_tx.send(ScrapeJobEvent::JobLocked {
id: job_id,
locked_at,
status: ScrapeJobStatus::Processing,
});
// Process the job, racing against shutdown signal and timeout
let process_result = tokio::select! {
_ = shutdown_rx.recv() => {
@@ -143,7 +161,11 @@ impl Worker {
scrape_jobs::unlock_job(job_id, &self.db_pool).await
}
async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result<bool> {
async fn unlock_and_increment_retry(
&self,
job_id: i32,
max_retries: i32,
) -> Result<Option<chrono::DateTime<chrono::Utc>>> {
scrape_jobs::unlock_and_increment_retry(job_id, max_retries, &self.db_pool).await
}
@@ -188,6 +210,10 @@ impl Worker {
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
}
debug!(job_id, "Emitting JobCompleted event");
let _ = self
.job_events_tx
.send(ScrapeJobEvent::JobCompleted { id: job_id });
}
Err(JobError::Recoverable(e)) => {
self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration)
@@ -204,6 +230,10 @@ impl Worker {
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
}
debug!(job_id, "Emitting JobDeleted event");
let _ = self
.job_events_tx
.send(ScrapeJobEvent::JobDeleted { id: job_id });
}
}
}
@@ -246,7 +276,7 @@ impl Worker {
// Atomically unlock and increment retry count, checking if retry is allowed
match self.unlock_and_increment_retry(job_id, max_retries).await {
Ok(can_retry) if can_retry => {
Ok(Some(queued_at)) => {
debug!(
worker_id = self.id,
job_id,
@@ -254,8 +284,15 @@ impl Worker {
remaining_retries = remaining_retries,
"Job unlocked for retry"
);
debug!(job_id, "Emitting JobRetried event");
let _ = self.job_events_tx.send(ScrapeJobEvent::JobRetried {
id: job_id,
retry_count: next_attempt,
queued_at: queued_at.to_rfc3339(),
status: ScrapeJobStatus::Pending,
});
}
Ok(_) => {
Ok(None) => {
// Max retries exceeded (detected atomically)
error!(
worker_id = self.id,
@@ -269,6 +306,13 @@ impl Worker {
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
}
debug!(job_id, "Emitting JobExhausted and JobDeleted events");
let _ = self
.job_events_tx
.send(ScrapeJobEvent::JobExhausted { id: job_id });
let _ = self
.job_events_tx
.send(ScrapeJobEvent::JobDeleted { id: job_id });
}
Err(e) => {
error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
+10 -1
View File
@@ -5,11 +5,12 @@ use crate::banner::Course;
use crate::data::models::ReferenceData;
use crate::status::ServiceStatusRegistry;
use crate::web::session_cache::{OAuthStateStore, SessionCache};
use crate::web::ws::ScrapeJobEvent;
use anyhow::Result;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, broadcast};
/// In-memory cache for reference data (code→description lookups).
///
@@ -75,10 +76,12 @@ pub struct AppState {
pub reference_cache: Arc<RwLock<ReferenceCache>>,
pub session_cache: SessionCache,
pub oauth_state_store: OAuthStateStore,
pub scrape_job_tx: broadcast::Sender<ScrapeJobEvent>,
}
impl AppState {
pub fn new(banner_api: Arc<BannerApi>, db_pool: PgPool) -> Self {
let (scrape_job_tx, _) = broadcast::channel(64);
Self {
session_cache: SessionCache::new(db_pool.clone()),
oauth_state_store: OAuthStateStore::new(),
@@ -86,9 +89,15 @@ impl AppState {
db_pool,
service_statuses: ServiceStatusRegistry::new(),
reference_cache: Arc::new(RwLock::new(ReferenceCache::new())),
scrape_job_tx,
}
}
/// Subscribe to scrape job lifecycle events.
pub fn scrape_job_events(&self) -> broadcast::Receiver<ScrapeJobEvent> {
self.scrape_job_tx.subscribe()
}
/// Initialize the reference cache from the database.
pub async fn load_reference_cache(&self) -> Result<()> {
let entries = crate::data::reference::get_all(&self.db_pool).await?;
+2
View File
@@ -163,6 +163,8 @@ pub async fn list_scrape_jobs(
"lockedAt": j.locked_at.map(|t| t.to_rfc3339()),
"retryCount": j.retry_count,
"maxRetries": j.max_retries,
"queuedAt": j.queued_at.to_rfc3339(),
"status": j.status(),
})
})
.collect();
+1
View File
@@ -9,5 +9,6 @@ pub mod encoding;
pub mod extractors;
pub mod routes;
pub mod session_cache;
pub mod ws;
pub use routes::*;
+2
View File
@@ -11,6 +11,7 @@ use axum::{
use crate::web::admin;
use crate::web::auth::{self, AuthConfig};
use crate::web::ws;
#[cfg(feature = "embed-assets")]
use axum::{
http::{HeaderMap, StatusCode, Uri},
@@ -63,6 +64,7 @@ pub fn create_router(app_state: AppState, auth_config: AuthConfig) -> Router {
put(admin::set_user_admin),
)
.route("/admin/scrape-jobs", get(admin::list_scrape_jobs))
.route("/admin/scrape-jobs/ws", get(ws::scrape_jobs_ws))
.route("/admin/audit-log", get(admin::list_audit_log))
.with_state(app_state);
+205
View File
@@ -0,0 +1,205 @@
//! WebSocket event types and handler for real-time scrape job updates.
use axum::{
extract::{
State,
ws::{Message, WebSocket, WebSocketUpgrade},
},
response::IntoResponse,
};
use futures::{SinkExt, StreamExt};
use serde::Serialize;
use sqlx::PgPool;
use tokio::sync::broadcast;
use tracing::debug;
use crate::data::models::{ScrapeJob, ScrapeJobStatus};
use crate::state::AppState;
use crate::web::extractors::AdminUser;
/// A serializable DTO for `ScrapeJob` with computed `status`.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ScrapeJobDto {
pub id: i32,
pub target_type: String,
pub target_payload: serde_json::Value,
pub priority: String,
pub execute_at: String,
pub created_at: String,
pub locked_at: Option<String>,
pub retry_count: i32,
pub max_retries: i32,
pub queued_at: String,
pub status: ScrapeJobStatus,
}
impl From<&ScrapeJob> for ScrapeJobDto {
fn from(job: &ScrapeJob) -> Self {
Self {
id: job.id,
target_type: format!("{:?}", job.target_type),
target_payload: job.target_payload.clone(),
priority: format!("{:?}", job.priority),
execute_at: job.execute_at.to_rfc3339(),
created_at: job.created_at.to_rfc3339(),
locked_at: job.locked_at.map(|t| t.to_rfc3339()),
retry_count: job.retry_count,
max_retries: job.max_retries,
queued_at: job.queued_at.to_rfc3339(),
status: job.status(),
}
}
}
/// Events broadcast when scrape job state changes.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ScrapeJobEvent {
Init {
jobs: Vec<ScrapeJobDto>,
},
JobCreated {
job: ScrapeJobDto,
},
JobLocked {
id: i32,
locked_at: String,
status: ScrapeJobStatus,
},
JobCompleted {
id: i32,
},
JobRetried {
id: i32,
retry_count: i32,
queued_at: String,
status: ScrapeJobStatus,
},
JobExhausted {
id: i32,
},
JobDeleted {
id: i32,
},
}
/// Fetch current scrape jobs from the DB and build an `Init` event.
async fn build_init_event(db_pool: &PgPool) -> Result<ScrapeJobEvent, sqlx::Error> {
let rows = sqlx::query_as::<_, ScrapeJob>(
"SELECT * FROM scrape_jobs ORDER BY priority DESC, execute_at ASC LIMIT 100",
)
.fetch_all(db_pool)
.await?;
let jobs = rows.iter().map(ScrapeJobDto::from).collect();
Ok(ScrapeJobEvent::Init { jobs })
}
/// WebSocket endpoint for real-time scrape job updates.
///
/// Auth is checked via `AdminUser` before the upgrade occurs — if rejected,
/// a 401/403 is returned and the upgrade never happens.
pub async fn scrape_jobs_ws(
ws: WebSocketUpgrade,
AdminUser(_user): AdminUser,
State(state): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_scrape_jobs_ws(socket, state))
}
/// Serialize an event and send it over the WebSocket sink.
/// Returns `true` if the message was sent, `false` if the client disconnected.
async fn send_event(
sink: &mut futures::stream::SplitSink<WebSocket, Message>,
event: &ScrapeJobEvent,
) -> bool {
let Ok(json) = serde_json::to_string(event) else {
return true; // serialization failed, but connection is still alive
};
sink.send(Message::Text(json.into())).await.is_ok()
}
async fn handle_scrape_jobs_ws(socket: WebSocket, state: AppState) {
debug!("scrape-jobs WebSocket connected");
let (mut sink, mut stream) = socket.split();
// Send initial state
let init_event = match build_init_event(&state.db_pool).await {
Ok(event) => event,
Err(e) => {
debug!(error = %e, "failed to build init event, closing WebSocket");
return;
}
};
if !send_event(&mut sink, &init_event).await {
debug!("client disconnected during init send");
return;
}
// Subscribe to broadcast events
let mut rx = state.scrape_job_events();
loop {
tokio::select! {
result = rx.recv() => {
match result {
Ok(ref event) => {
if !send_event(&mut sink, event).await {
debug!("client disconnected during event send");
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!(missed = n, "broadcast lagged, resyncing");
match build_init_event(&state.db_pool).await {
Ok(ref event) => {
if !send_event(&mut sink, event).await {
debug!("client disconnected during resync send");
break;
}
}
Err(e) => {
debug!(error = %e, "failed to build resync init event");
}
}
}
Err(broadcast::error::RecvError::Closed) => {
debug!("broadcast channel closed");
break;
}
}
}
msg = stream.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text)
&& parsed.get("type").and_then(|t| t.as_str()) == Some("resync")
{
debug!("client requested resync");
match build_init_event(&state.db_pool).await {
Ok(ref event) => {
if !send_event(&mut sink, event).await {
debug!("client disconnected during resync send");
break;
}
}
Err(e) => {
debug!(error = %e, "failed to build resync init event");
}
}
}
}
Some(Ok(Message::Close(_))) | None => {
debug!("client disconnected");
break;
}
_ => {}
}
}
}
}
debug!("scrape-jobs WebSocket disconnected");
}