From 9fed6516415bb11bd065fcbdab8721bd51eb9eaa Mon Sep 17 00:00:00 2001 From: Xevion Date: Fri, 30 Jan 2026 02:14:37 -0600 Subject: [PATCH] feat: add adaptive scheduling and scraper admin endpoints Subjects now have individually calculated scrape intervals based on their historical change ratio, consecutive zero-change runs, failure counts, and the current time of day. This reduces unnecessary scrapes during inactive periods while maintaining responsiveness during peak hours. Includes four new admin endpoints for monitoring scraper health and scheduling decisions. --- Cargo.lock | 35 ++++ Cargo.toml | 1 + src/data/scrape_jobs.rs | 64 ++++++ src/scraper/adaptive.rs | 326 +++++++++++++++++++++++++++++ src/scraper/mod.rs | 1 + src/scraper/scheduler.rs | 91 +++++++-- src/web/admin_scraper.rs | 431 +++++++++++++++++++++++++++++++++++++++ src/web/mod.rs | 1 + src/web/routes.rs | 14 ++ 9 files changed, 946 insertions(+), 18 deletions(-) create mode 100644 src/scraper/adaptive.rs create mode 100644 src/web/admin_scraper.rs diff --git a/Cargo.lock b/Cargo.lock index 581efdb..43e2062 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,6 +280,7 @@ dependencies = [ "axum-extra", "bitflags 2.9.4", "chrono", + "chrono-tz", "clap", "compile-time", "cookie", @@ -484,6 +485,16 @@ dependencies = [ "windows-link 0.2.0", ] +[[package]] +name = "chrono-tz" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3" +dependencies = [ + "chrono", + "phf", +] + [[package]] name = "clap" version = "4.5.47" @@ -2127,6 +2138,24 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "phf" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2983,6 +3012,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "siphasher" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" + [[package]] name = "skeptic" version = "0.13.7" diff --git a/Cargo.toml b/Cargo.toml index 31f94bf..a261c67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ ts-rs = { version = "11.1.0", features = ["chrono-impl", "serde-compat", "serde- html-escape = "0.2.13" axum-extra = { version = "0.12.5", features = ["query"] } urlencoding = "2.1.3" +chrono-tz = "0.10.4" [dev-dependencies] diff --git a/src/data/scrape_jobs.rs b/src/data/scrape_jobs.rs index 3f0cf93..52dbbe3 100644 --- a/src/data/scrape_jobs.rs +++ b/src/data/scrape_jobs.rs @@ -213,6 +213,70 @@ pub async fn insert_job_result( Ok(()) } +/// Per-subject aggregated stats from recent scrape results. +/// +/// Populated by [`fetch_subject_stats`] and converted into +/// [`crate::scraper::adaptive::SubjectStats`] for interval computation. 
+#[derive(sqlx::FromRow, Debug, Clone)]
+pub struct SubjectResultStats {
+    pub subject: String,
+    pub recent_runs: i64,
+    pub avg_change_ratio: f64,
+    pub consecutive_zero_changes: i64,
+    pub consecutive_empty_fetches: i64,
+    pub recent_failure_count: i64,
+    pub recent_success_count: i64,
+    pub last_completed: DateTime<Utc>,
+}
+
+/// Fetch aggregated per-subject statistics from the last 24 hours of results.
+///
+/// For each subject, examines the 20 most recent results and computes:
+/// - Average change ratio (courses_changed / courses_fetched)
+/// - Consecutive zero-change runs from the most recent result
+/// - Consecutive empty-fetch runs from the most recent result
+/// - Failure and success counts
+/// - Last completion timestamp
+pub async fn fetch_subject_stats(db_pool: &PgPool) -> Result<Vec<SubjectResultStats>> {
+    let rows = sqlx::query_as::<_, SubjectResultStats>(
+        r#"
+        WITH recent AS (
+            SELECT payload->>'subject' AS subject, success,
+                   COALESCE(courses_fetched, 0) AS courses_fetched,
+                   COALESCE(courses_changed, 0) AS courses_changed,
+                   completed_at,
+                   ROW_NUMBER() OVER (PARTITION BY payload->>'subject' ORDER BY completed_at DESC) AS rn
+            FROM scrape_job_results
+            WHERE target_type = 'Subject' AND completed_at > NOW() - INTERVAL '24 hours'
+        ),
+        filtered AS (SELECT * FROM recent WHERE rn <= 20),
+        zero_break AS (
+            SELECT subject,
+                   MIN(rn) FILTER (WHERE courses_changed > 0 AND success) AS first_nonzero_rn,
+                   MIN(rn) FILTER (WHERE courses_fetched > 0 AND success) AS first_nonempty_rn
+            FROM filtered GROUP BY subject
+        )
+        SELECT
+            f.subject::TEXT AS "subject!",
+            COUNT(*)::BIGINT AS "recent_runs!",
+            COALESCE(AVG(CASE WHEN f.success AND f.courses_fetched > 0
+                THEN f.courses_changed::FLOAT / f.courses_fetched ELSE NULL END), 0.0)::FLOAT8 AS "avg_change_ratio!",
+            COALESCE(zb.first_nonzero_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_changed = 0))::BIGINT AS "consecutive_zero_changes!",
+            COALESCE(zb.first_nonempty_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_fetched = 0))::BIGINT AS "consecutive_empty_fetches!",
+            COUNT(*) FILTER (WHERE NOT f.success)::BIGINT AS "recent_failure_count!",
+            COUNT(*) FILTER (WHERE f.success)::BIGINT AS "recent_success_count!",
+            MAX(f.completed_at) AS "last_completed!"
+        FROM filtered f
+        LEFT JOIN zero_break zb ON f.subject = zb.subject
+        GROUP BY f.subject, zb.first_nonzero_rn, zb.first_nonempty_rn
+        "#,
+    )
+    .fetch_all(db_pool)
+    .await?;
+
+    Ok(rows)
+}
+
 /// Batch insert scrape jobs using UNNEST for a single round-trip.
 ///
 /// All jobs are inserted with `execute_at` set to the current time.
diff --git a/src/scraper/adaptive.rs b/src/scraper/adaptive.rs
new file mode 100644
index 0000000..0bb0ff0
--- /dev/null
+++ b/src/scraper/adaptive.rs
@@ -0,0 +1,326 @@
+//! Adaptive scraping interval computation.
+//!
+//! Assigns per-subject scrape intervals based on recent change rates,
+//! consecutive zero-change runs, failure patterns, and time of day.
+
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use chrono_tz::US::Central;
+use std::time::Duration;
+
+use crate::data::scrape_jobs::SubjectResultStats;
+
+const FLOOR_INTERVAL: Duration = Duration::from_secs(3 * 60);
+const MODERATE_HIGH_INTERVAL: Duration = Duration::from_secs(5 * 60);
+const MODERATE_LOW_INTERVAL: Duration = Duration::from_secs(15 * 60);
+const LOW_CHANGE_INTERVAL: Duration = Duration::from_secs(30 * 60);
+const ZERO_5_INTERVAL: Duration = Duration::from_secs(60 * 60);
+const ZERO_10_INTERVAL: Duration = Duration::from_secs(2 * 60 * 60);
+const CEILING_INTERVAL: Duration = Duration::from_secs(4 * 60 * 60);
+const COLD_START_INTERVAL: Duration = FLOOR_INTERVAL;
+const PAUSE_PROBE_INTERVAL: Duration = Duration::from_secs(6 * 60 * 60);
+const EMPTY_FETCH_PAUSE_THRESHOLD: i64 = 3;
+const FAILURE_PAUSE_THRESHOLD: i64 = 5;
+
+/// Aggregated per-subject statistics derived from recent scrape results.
+#[derive(Debug, Clone)]
+pub struct SubjectStats {
+    pub subject: String,
+    pub recent_runs: i64,
+    pub avg_change_ratio: f64,
+    pub consecutive_zero_changes: i64,
+    pub consecutive_empty_fetches: i64,
+    pub recent_failure_count: i64,
+    pub recent_success_count: i64,
+    pub last_completed: DateTime<Utc>,
+}
+
+/// Scheduling decision for a subject.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum SubjectSchedule {
+    /// Subject is due for scraping, with the computed interval.
+    Eligible(Duration),
+    /// Subject was scraped recently; wait for the remaining cooldown.
+    Cooldown(Duration),
+    /// Subject is paused due to repeated empty fetches or failures.
+    Paused,
+    /// Subject belongs to a past term and should not be scraped.
+    ReadOnly,
+}
+
+impl From<SubjectResultStats> for SubjectStats {
+    fn from(row: SubjectResultStats) -> Self {
+        Self {
+            subject: row.subject,
+            recent_runs: row.recent_runs,
+            avg_change_ratio: row.avg_change_ratio,
+            consecutive_zero_changes: row.consecutive_zero_changes,
+            consecutive_empty_fetches: row.consecutive_empty_fetches,
+            recent_failure_count: row.recent_failure_count,
+            recent_success_count: row.recent_success_count,
+            last_completed: row.last_completed,
+        }
+    }
+}
+
+/// Compute the base interval tier from change-rate statistics.
+pub fn compute_base_interval(stats: &SubjectStats) -> Duration {
+    if stats.recent_runs == 0 {
+        return COLD_START_INTERVAL;
+    }
+
+    // Consecutive-zero tiers take precedence when change ratio is near zero
+    if stats.avg_change_ratio < 0.001 {
+        return match stats.consecutive_zero_changes {
+            0..5 => LOW_CHANGE_INTERVAL,
+            5..10 => ZERO_5_INTERVAL,
+            10..20 => ZERO_10_INTERVAL,
+            _ => CEILING_INTERVAL,
+        };
+    }
+
+    match stats.avg_change_ratio {
+        r if r >= 0.10 => FLOOR_INTERVAL,
+        r if r >= 0.05 => MODERATE_HIGH_INTERVAL,
+        r if r >= 0.01 => MODERATE_LOW_INTERVAL,
+        _ => LOW_CHANGE_INTERVAL,
+    }
+}
+
+/// Return a time-of-day multiplier for the given UTC timestamp.
+///
+/// Peak hours (weekdays 8am-6pm CT) return 1; off-peak (weekdays 6pm-midnight CT)
+/// return 2; night (midnight-8am CT) and weekends return 4.
+pub fn time_of_day_multiplier(now: DateTime<Utc>) -> u32 {
+    let ct = now.with_timezone(&Central);
+    let weekday = ct.weekday();
+    let hour = ct.hour();
+
+    // Weekends get the slowest multiplier
+    if matches!(weekday, chrono::Weekday::Sat | chrono::Weekday::Sun) {
+        return 4;
+    }
+
+    match hour {
+        8..18 => 1,  // peak
+        18..24 => 2, // off-peak
+        _ => 4,      // night (0..8)
+    }
+}
+
+/// Evaluate whether a subject should be scraped now.
+///
+/// Combines base interval, time-of-day multiplier, pause detection (empty
+/// fetches / consecutive failures), and past-term read-only status.
+pub fn evaluate_subject(
+    stats: &SubjectStats,
+    now: DateTime<Utc>,
+    is_past_term: bool,
+) -> SubjectSchedule {
+    if is_past_term {
+        return SubjectSchedule::ReadOnly;
+    }
+
+    let elapsed = (now - stats.last_completed)
+        .to_std()
+        .unwrap_or(Duration::ZERO);
+    let probe_due = elapsed >= PAUSE_PROBE_INTERVAL;
+
+    // Pause on repeated empty fetches
+    if stats.consecutive_empty_fetches >= EMPTY_FETCH_PAUSE_THRESHOLD {
+        return if probe_due {
+            SubjectSchedule::Eligible(PAUSE_PROBE_INTERVAL)
+        } else {
+            SubjectSchedule::Paused
+        };
+    }
+
+    // Pause on all-failures
+    if stats.recent_success_count == 0 && stats.recent_failure_count >= FAILURE_PAUSE_THRESHOLD {
+        return if probe_due {
+            SubjectSchedule::Eligible(PAUSE_PROBE_INTERVAL)
+        } else {
+            SubjectSchedule::Paused
+        };
+    }
+
+    let base = compute_base_interval(stats);
+    let multiplier = time_of_day_multiplier(now);
+    let effective = base * multiplier;
+
+    if elapsed >= effective {
+        SubjectSchedule::Eligible(effective)
+    } else {
+        let remaining = effective - elapsed;
+        SubjectSchedule::Cooldown(remaining)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::TimeZone;
+
+    /// Create a default `SubjectStats` for testing. Callers mutate fields as needed.
+    fn make_stats(subject: &str) -> SubjectStats {
+        SubjectStats {
+            subject: subject.to_string(),
+            recent_runs: 10,
+            avg_change_ratio: 0.0,
+            consecutive_zero_changes: 0,
+            consecutive_empty_fetches: 0,
+            recent_failure_count: 0,
+            recent_success_count: 10,
+            last_completed: Utc::now() - chrono::Duration::hours(1),
+        }
+    }
+
+    // -- compute_base_interval tests --
+
+    #[test]
+    fn test_cold_start_returns_floor() {
+        let mut stats = make_stats("CS");
+        stats.recent_runs = 0;
+        assert_eq!(compute_base_interval(&stats), COLD_START_INTERVAL);
+    }
+
+    #[test]
+    fn test_high_change_rate() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.15;
+        assert_eq!(compute_base_interval(&stats), FLOOR_INTERVAL);
+    }
+
+    #[test]
+    fn test_moderate_high_change() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.07;
+        assert_eq!(compute_base_interval(&stats), MODERATE_HIGH_INTERVAL);
+    }
+
+    #[test]
+    fn test_moderate_low_change() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.03;
+        assert_eq!(compute_base_interval(&stats), MODERATE_LOW_INTERVAL);
+    }
+
+    #[test]
+    fn test_low_change() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.005;
+        assert_eq!(compute_base_interval(&stats), LOW_CHANGE_INTERVAL);
+    }
+
+    #[test]
+    fn test_zero_5_consecutive() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.0;
+        stats.consecutive_zero_changes = 5;
+        assert_eq!(compute_base_interval(&stats), ZERO_5_INTERVAL);
+    }
+
+    #[test]
+    fn test_zero_10_consecutive() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.0;
+        stats.consecutive_zero_changes = 10;
+        assert_eq!(compute_base_interval(&stats), ZERO_10_INTERVAL);
+    }
+
+    #[test]
+    fn test_zero_20_consecutive() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.0;
+        stats.consecutive_zero_changes = 20;
+        assert_eq!(compute_base_interval(&stats), CEILING_INTERVAL);
+    }
+
+    // -- evaluate_subject tests --
+
+    #[test]
+    fn test_pause_empty_fetches() {
+        let mut stats = make_stats("CS");
+        stats.consecutive_empty_fetches = 3;
+        stats.last_completed = Utc::now() - chrono::Duration::minutes(10);
+        let result = evaluate_subject(&stats, Utc::now(), false);
+        assert_eq!(result, SubjectSchedule::Paused);
+    }
+
+    #[test]
+    fn test_pause_all_failures() {
+        let mut stats = make_stats("CS");
+        stats.recent_success_count = 0;
+        stats.recent_failure_count = 5;
+        stats.last_completed = Utc::now() - chrono::Duration::minutes(10);
+        let result = evaluate_subject(&stats, Utc::now(), false);
+        assert_eq!(result, SubjectSchedule::Paused);
+    }
+
+    #[test]
+    fn test_probe_after_pause() {
+        let mut stats = make_stats("CS");
+        stats.consecutive_empty_fetches = 5;
+        stats.last_completed = Utc::now() - chrono::Duration::hours(7);
+        let result = evaluate_subject(&stats, Utc::now(), false);
+        assert_eq!(result, SubjectSchedule::Eligible(PAUSE_PROBE_INTERVAL));
+    }
+
+    #[test]
+    fn test_read_only_past_term() {
+        let stats = make_stats("CS");
+        let result = evaluate_subject(&stats, Utc::now(), true);
+        assert_eq!(result, SubjectSchedule::ReadOnly);
+    }
+
+    #[test]
+    fn test_cooldown_not_elapsed() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.15; // floor = 3 min
+        stats.last_completed = Utc::now() - chrono::Duration::seconds(30);
+        // Use a peak-hours timestamp so multiplier = 1
+        let peak = Utc.with_ymd_and_hms(2025, 7, 14, 15, 0, 0).unwrap(); // Mon 10am CT
+        stats.last_completed = peak - chrono::Duration::seconds(30);
+        let result = evaluate_subject(&stats, peak, false);
+        assert!(matches!(result, SubjectSchedule::Cooldown(_)));
+    }
+
+    #[test]
+    fn test_eligible_elapsed() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.15; // floor = 3 min
+        let peak = Utc.with_ymd_and_hms(2025, 7, 14, 15, 0, 0).unwrap(); // Mon 10am CT
+        stats.last_completed = peak - chrono::Duration::minutes(5);
+        let result = evaluate_subject(&stats, peak, false);
+        assert!(matches!(result, SubjectSchedule::Eligible(_)));
+    }
+
+    // -- time_of_day_multiplier tests --
+
+    #[test]
+    fn test_time_multiplier_peak() {
+        // Monday 10am CT = 15:00 UTC
+        let dt = Utc.with_ymd_and_hms(2025, 7, 14, 15, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 1);
+    }
+
+    #[test]
+    fn test_time_multiplier_offpeak() {
+        // Monday 8pm CT = Tuesday 01:00 UTC
+        let dt = Utc.with_ymd_and_hms(2025, 7, 15, 1, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 2);
+    }
+
+    #[test]
+    fn test_time_multiplier_night() {
+        // 3am CT = 08:00 UTC
+        let dt = Utc.with_ymd_and_hms(2025, 7, 14, 8, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 4);
+    }
+
+    #[test]
+    fn test_time_multiplier_weekend() {
+        // Saturday noon CT = 17:00 UTC
+        let dt = Utc.with_ymd_and_hms(2025, 7, 12, 17, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 4);
+    }
+}
diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs
index 6ef100a..5c2131b 100644
--- a/src/scraper/mod.rs
+++ b/src/scraper/mod.rs
@@ -1,3 +1,4 @@
+pub mod adaptive;
 pub mod jobs;
 pub mod scheduler;
 pub mod worker;
diff --git a/src/scraper/scheduler.rs b/src/scraper/scheduler.rs
index 0e15d5a..c7a9d09 100644
--- a/src/scraper/scheduler.rs
+++ b/src/scraper/scheduler.rs
@@ -3,11 +3,14 @@ use crate::data::models::{ReferenceData, ScrapePriority, TargetType};
 use crate::data::scrape_jobs;
 use crate::error::Result;
 use crate::rmp::RmpClient;
+use crate::scraper::adaptive::{SubjectSchedule, SubjectStats, evaluate_subject};
 use crate::scraper::jobs::subject::SubjectJob;
 use crate::state::ReferenceCache;
 use crate::web::ws::{ScrapeJobDto, ScrapeJobEvent};
+use chrono::{DateTime, Utc};
 use serde_json::json;
 use sqlx::PgPool;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::{RwLock, broadcast};
@@ -148,10 +151,9 @@ impl Scheduler {
     /// Core scheduling logic that analyzes data and creates scrape jobs.
     ///
-    /// Strategy:
-    /// 1. Fetch all subjects for the current term from Banner API
-    /// 2. Query existing jobs in a single batch query
-    /// 3. Create jobs only for subjects that don't have pending jobs
+    /// Uses adaptive scheduling to determine per-subject scrape intervals based
+    /// on recent change rates, failure patterns, and time of day. Only subjects
+    /// that are eligible (i.e. their cooldown has elapsed) are enqueued.
     ///
     /// This is a static method (not &self) to allow it to be called from spawned tasks.
     #[tracing::instrument(skip_all, fields(term))]
@@ -160,10 +162,6 @@ impl Scheduler {
         banner_api: &BannerApi,
         job_events_tx: Option<&broadcast::Sender<ScrapeJobEvent>>,
     ) -> Result<()> {
-        // For now, we will implement a simple baseline scheduling strategy:
-        // 1. Get a list of all subjects from the Banner API.
-        // 2. Query existing jobs for all subjects in a single query.
-        // 3. Create new jobs only for subjects that don't have existing jobs.
         let term = Term::get_current().inner().to_string();
         tracing::Span::current().record("term", term.as_str());
@@ -175,13 +173,70 @@ impl Scheduler {
             "Retrieved subjects from API"
         );
 
-        // Create payloads for all subjects
-        let subject_payloads: Vec<_> = subjects
-            .iter()
-            .map(|subject| json!({ "subject": subject.code }))
-            .collect();
-
-        // Query existing jobs for all subjects in a single query
+        // Fetch per-subject stats and build a lookup map
+        let stats_rows = scrape_jobs::fetch_subject_stats(db_pool).await?;
+        let stats_map: HashMap<String, SubjectStats> = stats_rows
+            .into_iter()
+            .map(|row| {
+                let subject = row.subject.clone();
+                (subject, SubjectStats::from(row))
+            })
+            .collect();
+
+        // Evaluate each subject using adaptive scheduling
+        let now = Utc::now();
+        let is_past_term = false; // Scheduler currently only fetches current term subjects
+        let mut eligible_subjects: Vec<String> = Vec::new();
+        let mut cooldown_count: usize = 0;
+        let mut paused_count: usize = 0;
+        let mut read_only_count: usize = 0;
+
+        for subject in &subjects {
+            let stats = stats_map.get(&subject.code).cloned().unwrap_or_else(|| {
+                // Cold start: no history for this subject
+                SubjectStats {
+                    subject: subject.code.clone(),
+                    recent_runs: 0,
+                    avg_change_ratio: 0.0,
+                    consecutive_zero_changes: 0,
+                    consecutive_empty_fetches: 0,
+                    recent_failure_count: 0,
+                    recent_success_count: 0,
+                    last_completed: DateTime::<Utc>::MIN_UTC,
+                }
+            });
+
+            match evaluate_subject(&stats, now, is_past_term) {
+                SubjectSchedule::Eligible(_) => {
+                    eligible_subjects.push(subject.code.clone());
+                }
+                SubjectSchedule::Cooldown(_) => cooldown_count += 1,
+                SubjectSchedule::Paused => paused_count += 1,
+                SubjectSchedule::ReadOnly => read_only_count += 1,
+            }
+        }
+
+        info!(
+            total = subjects.len(),
+            eligible = eligible_subjects.len(),
+            cooldown = cooldown_count,
+            paused = paused_count,
+            read_only = read_only_count,
+            "Adaptive scheduling decisions"
+        );
+
+        if eligible_subjects.is_empty() {
+            debug!("No eligible subjects to schedule");
+            return Ok(());
+        }
+
+        // Create payloads only for eligible subjects
+        let subject_payloads: Vec<_> = eligible_subjects
+            .iter()
+            .map(|code| json!({ "subject": code }))
+            .collect();
+
+        // Query existing jobs for eligible subjects only
         let existing_payloads = scrape_jobs::find_existing_job_payloads(
             TargetType::Subject,
             &subject_payloads,
@@ -189,12 +244,12 @@ impl Scheduler {
         )
         .await?;
 
-        // Filter out subjects that already have jobs and prepare new jobs
+        // Filter out subjects that already have pending jobs
         let mut skipped_count = 0;
-        let new_jobs: Vec<_> = subjects
+        let new_jobs: Vec<_> = eligible_subjects
             .into_iter()
-            .filter_map(|subject| {
-                let job = SubjectJob::new(subject.code.clone());
+            .filter_map(|subject_code| {
+                let job = SubjectJob::new(subject_code.clone());
                 let payload = serde_json::to_value(&job).unwrap();
                 let payload_str = payload.to_string();
@@ -202,7 +257,7 @@ impl Scheduler {
                     skipped_count += 1;
                     None
                 } else {
-                    Some((payload, subject.code))
+                    Some((payload, subject_code))
                 }
             })
             .collect();
diff --git a/src/web/admin_scraper.rs b/src/web/admin_scraper.rs
new file mode 100644
index 0000000..2ccb9e7
--- /dev/null
+++ b/src/web/admin_scraper.rs
@@ -0,0 +1,431 @@
+//! Admin API handlers for scraper observability.
+//!
+//! All endpoints require the `AdminUser` extractor, returning 401/403 as needed.
+
+use axum::extract::{Path, Query, State};
+use axum::http::StatusCode;
+use axum::response::Json;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use sqlx::Row;
+
+use crate::data::scrape_jobs;
+use crate::scraper::adaptive::{self, SubjectSchedule, SubjectStats};
+use crate::state::AppState;
+use crate::web::extractors::AdminUser;
+
+type ApiError = (StatusCode, Json<serde_json::Value>);
+
+fn parse_period(period: &str) -> Result<chrono::Duration, ApiError> {
+    match period {
+        "1h" => Ok(chrono::Duration::hours(1)),
+        "6h" => Ok(chrono::Duration::hours(6)),
+        "24h" => Ok(chrono::Duration::hours(24)),
+        "7d" => Ok(chrono::Duration::days(7)),
+        "30d" => Ok(chrono::Duration::days(30)),
+        _ => Err((
+            StatusCode::BAD_REQUEST,
+            Json(
+                json!({"error": format!("Invalid period '{period}'. Valid: 1h, 6h, 24h, 7d, 30d")}),
+            ),
+        )),
+    }
+}
+
+fn period_to_interval_str(period: &str) -> &'static str {
+    match period {
+        "1h" => "1 hour",
+        "6h" => "6 hours",
+        "24h" => "24 hours",
+        "7d" => "7 days",
+        "30d" => "30 days",
+        _ => "24 hours",
+    }
+}
+
+fn parse_bucket(bucket: &str) -> Result<&'static str, ApiError> {
+    match bucket {
+        "1m" => Ok("1 minute"),
+        "5m" => Ok("5 minutes"),
+        "15m" => Ok("15 minutes"),
+        "1h" => Ok("1 hour"),
+        "6h" => Ok("6 hours"),
+        _ => Err((
+            StatusCode::BAD_REQUEST,
+            Json(
+                json!({"error": format!("Invalid bucket '{bucket}'. Valid: 1m, 5m, 15m, 1h, 6h")}),
Valid: 1m, 5m, 15m, 1h, 6h")}), + ), + )), + } +} + +fn default_bucket_for_period(period: &str) -> &'static str { + match period { + "1h" => "1m", + "6h" => "5m", + "24h" => "15m", + "7d" => "1h", + "30d" => "6h", + _ => "15m", + } +} + +// --------------------------------------------------------------------------- +// Endpoint 1: GET /api/admin/scraper/stats +// --------------------------------------------------------------------------- + +#[derive(Deserialize)] +pub struct StatsParams { + #[serde(default = "default_period")] + period: String, +} + +fn default_period() -> String { + "24h".to_string() +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ScraperStatsResponse { + period: String, + total_scrapes: i64, + successful_scrapes: i64, + failed_scrapes: i64, + success_rate: f64, + avg_duration_ms: f64, + total_courses_changed: i64, + total_courses_fetched: i64, + total_audits_generated: i64, + pending_jobs: i64, + locked_jobs: i64, +} + +pub async fn scraper_stats( + _admin: AdminUser, + State(state): State, + Query(params): Query, +) -> Result, ApiError> { + let _duration = parse_period(¶ms.period)?; + let interval_str = period_to_interval_str(¶ms.period); + + let row = sqlx::query( + "SELECT \ + COUNT(*) AS total_scrapes, \ + COUNT(*) FILTER (WHERE success) AS successful_scrapes, \ + COUNT(*) FILTER (WHERE NOT success) AS failed_scrapes, \ + COALESCE(AVG(duration_ms) FILTER (WHERE success), 0) AS avg_duration_ms, \ + COALESCE(SUM(courses_changed) FILTER (WHERE success), 0) AS total_courses_changed, \ + COALESCE(SUM(courses_fetched) FILTER (WHERE success), 0) AS total_courses_fetched, \ + COALESCE(SUM(audits_generated) FILTER (WHERE success), 0) AS total_audits_generated \ + FROM scrape_job_results \ + WHERE completed_at > NOW() - $1::interval", + ) + .bind(interval_str) + .fetch_one(&state.db_pool) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to fetch scraper stats"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "Failed to fetch scraper stats"})), + ) + })?; + + let total_scrapes: i64 = row.get("total_scrapes"); + let successful_scrapes: i64 = row.get("successful_scrapes"); + let failed_scrapes: i64 = row.get("failed_scrapes"); + let avg_duration_ms: f64 = row.get("avg_duration_ms"); + let total_courses_changed: i64 = row.get("total_courses_changed"); + let total_courses_fetched: i64 = row.get("total_courses_fetched"); + let total_audits_generated: i64 = row.get("total_audits_generated"); + + let queue_row = sqlx::query( + "SELECT \ + COUNT(*) FILTER (WHERE locked_at IS NULL) AS pending_jobs, \ + COUNT(*) FILTER (WHERE locked_at IS NOT NULL) AS locked_jobs \ + FROM scrape_jobs", + ) + .fetch_one(&state.db_pool) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to fetch queue stats"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "Failed to fetch queue stats"})), + ) + })?; + + let pending_jobs: i64 = queue_row.get("pending_jobs"); + let locked_jobs: i64 = queue_row.get("locked_jobs"); + + let success_rate = if total_scrapes > 0 { + successful_scrapes as f64 / total_scrapes as f64 + } else { + 0.0 + }; + + Ok(Json(ScraperStatsResponse { + period: params.period, + total_scrapes, + successful_scrapes, + failed_scrapes, + success_rate, + avg_duration_ms, + total_courses_changed, + total_courses_fetched, + total_audits_generated, + pending_jobs, + locked_jobs, + })) +} + +// --------------------------------------------------------------------------- +// Endpoint 2: GET 
+// ---------------------------------------------------------------------------
+
+#[derive(Deserialize)]
+pub struct TimeseriesParams {
+    #[serde(default = "default_period")]
+    period: String,
+    bucket: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TimeseriesResponse {
+    period: String,
+    bucket: String,
+    points: Vec<TimeseriesPoint>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TimeseriesPoint {
+    timestamp: DateTime<Utc>,
+    scrape_count: i64,
+    success_count: i64,
+    error_count: i64,
+    courses_changed: i64,
+    avg_duration_ms: f64,
+}
+
+pub async fn scraper_timeseries(
+    _admin: AdminUser,
+    State(state): State<AppState>,
+    Query(params): Query<TimeseriesParams>,
+) -> Result<Json<TimeseriesResponse>, ApiError> {
+    let _duration = parse_period(&params.period)?;
+    let period_interval = period_to_interval_str(&params.period);
+
+    let bucket_code = match &params.bucket {
+        Some(b) => {
+            // Validate the bucket
+            parse_bucket(b)?;
+            b.as_str()
+        }
+        None => default_bucket_for_period(&params.period),
+    };
+    let bucket_interval = parse_bucket(bucket_code)?;
+
+    let rows = sqlx::query(
+        "SELECT \
+         date_bin($1::interval, completed_at, '2020-01-01'::timestamptz) AS bucket_start, \
+         COUNT(*)::BIGINT AS scrape_count, \
+         COUNT(*) FILTER (WHERE success)::BIGINT AS success_count, \
+         COUNT(*) FILTER (WHERE NOT success)::BIGINT AS error_count, \
+         COALESCE(SUM(courses_changed) FILTER (WHERE success), 0)::BIGINT AS courses_changed, \
+         COALESCE(AVG(duration_ms) FILTER (WHERE success), 0)::FLOAT8 AS avg_duration_ms \
+         FROM scrape_job_results \
+         WHERE completed_at > NOW() - $2::interval \
+         GROUP BY bucket_start \
+         ORDER BY bucket_start",
+    )
+    .bind(bucket_interval)
+    .bind(period_interval)
+    .fetch_all(&state.db_pool)
+    .await
+    .map_err(|e| {
+        tracing::error!(error = %e, "Failed to fetch scraper timeseries");
+        (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": "Failed to fetch scraper timeseries"})),
+        )
+    })?;
+
+    let points = rows
+        .iter()
+        .map(|row| TimeseriesPoint {
+            timestamp: row.get("bucket_start"),
+            scrape_count: row.get("scrape_count"),
+            success_count: row.get("success_count"),
+            error_count: row.get("error_count"),
+            courses_changed: row.get("courses_changed"),
+            avg_duration_ms: row.get("avg_duration_ms"),
+        })
+        .collect();
+
+    Ok(Json(TimeseriesResponse {
+        period: params.period,
+        bucket: bucket_code.to_string(),
+        points,
+    }))
+}
+
+// ---------------------------------------------------------------------------
+// Endpoint 3: GET /api/admin/scraper/subjects
+// ---------------------------------------------------------------------------
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectsResponse {
+    subjects: Vec<SubjectSummary>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectSummary {
+    subject: String,
+    schedule_state: String,
+    current_interval_secs: u64,
+    time_multiplier: u32,
+    last_scraped: DateTime<Utc>,
+    avg_change_ratio: f64,
+    consecutive_zero_changes: i64,
+    recent_runs: i64,
+    recent_failures: i64,
+}
+
+pub async fn scraper_subjects(
+    _admin: AdminUser,
+    State(state): State<AppState>,
+) -> Result<Json<SubjectsResponse>, ApiError> {
+    let raw_stats = scrape_jobs::fetch_subject_stats(&state.db_pool)
+        .await
+        .map_err(|e| {
+            tracing::error!(error = %e, "Failed to fetch subject stats");
+            (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                Json(json!({"error": "Failed to fetch subject stats"})),
+            )
+        })?;
+
+    let now = Utc::now();
+    let multiplier = adaptive::time_of_day_multiplier(now);
+
+    let subjects = raw_stats
+        .into_iter()
+        .map(|row| {
+            let stats: SubjectStats = row.into();
+            let schedule = adaptive::evaluate_subject(&stats, now, false);
+            let base_interval = adaptive::compute_base_interval(&stats);
+
+            let schedule_state = match &schedule {
+                SubjectSchedule::Eligible(_) => "eligible",
+                SubjectSchedule::Cooldown(_) => "cooldown",
+                SubjectSchedule::Paused => "paused",
+                SubjectSchedule::ReadOnly => "read_only",
+            };
+
+            let current_interval_secs = base_interval.as_secs() * multiplier as u64;
+
+            SubjectSummary {
+                subject: stats.subject,
+                schedule_state: schedule_state.to_string(),
+                current_interval_secs,
+                time_multiplier: multiplier,
+                last_scraped: stats.last_completed,
+                avg_change_ratio: stats.avg_change_ratio,
+                consecutive_zero_changes: stats.consecutive_zero_changes,
+                recent_runs: stats.recent_runs,
+                recent_failures: stats.recent_failure_count,
+            }
+        })
+        .collect();
+
+    Ok(Json(SubjectsResponse { subjects }))
+}
+
+// ---------------------------------------------------------------------------
+// Endpoint 4: GET /api/admin/scraper/subjects/{subject}
+// ---------------------------------------------------------------------------
+
+#[derive(Deserialize)]
+pub struct SubjectDetailParams {
+    #[serde(default = "default_detail_limit")]
+    limit: i32,
+}
+
+fn default_detail_limit() -> i32 {
+    50
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectDetailResponse {
+    subject: String,
+    results: Vec<SubjectResultEntry>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectResultEntry {
+    id: i64,
+    completed_at: DateTime<Utc>,
+    duration_ms: i32,
+    success: bool,
+    error_message: Option<String>,
+    courses_fetched: Option<i32>,
+    courses_changed: Option<i32>,
+    courses_unchanged: Option<i32>,
+    audits_generated: Option<i32>,
+    metrics_generated: Option<i32>,
+}
+
+pub async fn scraper_subject_detail(
+    _admin: AdminUser,
+    State(state): State<AppState>,
+    Path(subject): Path<String>,
+    Query(params): Query<SubjectDetailParams>,
+) -> Result<Json<SubjectDetailResponse>, ApiError> {
+    let limit = params.limit.clamp(1, 200);
+
+    let rows = sqlx::query(
+        "SELECT id, completed_at, duration_ms, success, error_message, \
+         courses_fetched, courses_changed, courses_unchanged, \
+         audits_generated, metrics_generated \
+         FROM scrape_job_results \
+         WHERE target_type = 'Subject' AND payload->>'subject' = $1 \
+         ORDER BY completed_at DESC \
+         LIMIT $2",
+    )
+    .bind(&subject)
+    .bind(limit)
+    .fetch_all(&state.db_pool)
+    .await
+    .map_err(|e| {
+        tracing::error!(error = %e, subject = %subject, "Failed to fetch subject detail");
+        (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": "Failed to fetch subject detail"})),
+        )
+    })?;
+
+    let results = rows
+        .iter()
+        .map(|row| SubjectResultEntry {
+            id: row.get("id"),
+            completed_at: row.get("completed_at"),
+            duration_ms: row.get("duration_ms"),
+            success: row.get("success"),
+            error_message: row.get("error_message"),
+            courses_fetched: row.get("courses_fetched"),
+            courses_changed: row.get("courses_changed"),
+            courses_unchanged: row.get("courses_unchanged"),
+            audits_generated: row.get("audits_generated"),
+            metrics_generated: row.get("metrics_generated"),
+        })
+        .collect();
+
+    Ok(Json(SubjectDetailResponse { subject, results }))
+}
diff --git a/src/web/mod.rs b/src/web/mod.rs
index 2a1e6fc..088843f 100644
--- a/src/web/mod.rs
+++ b/src/web/mod.rs
@@ -2,6 +2,7 @@ pub mod admin;
 pub mod admin_rmp;
+pub mod admin_scraper;
 #[cfg(feature = "embed-assets")]
 pub mod assets;
 pub mod auth;
diff --git a/src/web/routes.rs b/src/web/routes.rs
index 5da990f..582c180 100644
--- a/src/web/routes.rs
+++ b/src/web/routes.rs
@@ -11,6 +11,7 @@ use axum::{
 use crate::web::admin;
 use crate::web::admin_rmp;
+use crate::web::admin_scraper;
 use crate::web::auth::{self, AuthConfig};
 use crate::web::ws;
 #[cfg(feature = "embed-assets")]
@@ -86,6 +87,19 @@ pub fn create_router(app_state: AppState, auth_config: AuthConfig) -> Router {
             post(admin_rmp::unmatch_instructor),
         )
         .route("/admin/rmp/rescore", post(admin_rmp::rescore))
+        .route("/admin/scraper/stats", get(admin_scraper::scraper_stats))
+        .route(
+            "/admin/scraper/timeseries",
+            get(admin_scraper::scraper_timeseries),
+        )
+        .route(
+            "/admin/scraper/subjects",
+            get(admin_scraper::scraper_subjects),
+        )
+        .route(
+            "/admin/scraper/subjects/{subject}",
+            get(admin_scraper::scraper_subject_detail),
+        )
         .with_state(app_state);
 
     let mut router = Router::new()