diff --git a/Cargo.lock b/Cargo.lock
index 581efdb..43e2062 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -280,6 +280,7 @@ dependencies = [
  "axum-extra",
  "bitflags 2.9.4",
  "chrono",
+ "chrono-tz",
  "clap",
  "compile-time",
  "cookie",
@@ -484,6 +485,16 @@ dependencies = [
  "windows-link 0.2.0",
 ]
 
+[[package]]
+name = "chrono-tz"
+version = "0.10.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3"
+dependencies = [
+ "chrono",
+ "phf",
+]
+
 [[package]]
 name = "clap"
 version = "4.5.47"
@@ -2127,6 +2138,24 @@ version = "2.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
 
+[[package]]
+name = "phf"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7"
+dependencies = [
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_shared"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981"
+dependencies = [
+ "siphasher",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.16"
@@ -2983,6 +3012,12 @@ dependencies = [
  "rand_core 0.6.4",
 ]
 
+[[package]]
+name = "siphasher"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
+
 [[package]]
 name = "skeptic"
 version = "0.13.7"
diff --git a/Cargo.toml b/Cargo.toml
index 31f94bf..a261c67 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -59,6 +59,7 @@ ts-rs = { version = "11.1.0", features = ["chrono-impl", "serde-compat", "serde-
 html-escape = "0.2.13"
 axum-extra = { version = "0.12.5", features = ["query"] }
 urlencoding = "2.1.3"
+chrono-tz = "0.10.4"
 
 
 [dev-dependencies]
diff --git a/src/data/scrape_jobs.rs b/src/data/scrape_jobs.rs
index 3f0cf93..52dbbe3 100644
--- a/src/data/scrape_jobs.rs
+++ b/src/data/scrape_jobs.rs
@@ -213,6 +213,70 @@ pub async fn insert_job_result(
     Ok(())
 }
 
+/// Per-subject aggregated stats from recent scrape results.
+///
+/// Populated by [`fetch_subject_stats`] and converted into
+/// [`crate::scraper::adaptive::SubjectStats`] for interval computation.
+#[derive(sqlx::FromRow, Debug, Clone)]
+pub struct SubjectResultStats {
+    pub subject: String,
+    pub recent_runs: i64,
+    pub avg_change_ratio: f64,
+    pub consecutive_zero_changes: i64,
+    pub consecutive_empty_fetches: i64,
+    pub recent_failure_count: i64,
+    pub recent_success_count: i64,
+    pub last_completed: DateTime<Utc>,
+}
+
+/// Fetch aggregated per-subject statistics from the last 24 hours of results.
+///
+/// For each subject, examines the 20 most recent results and computes:
+/// - Average change ratio (courses_changed / courses_fetched)
+/// - Consecutive zero-change runs from the most recent result
+/// - Consecutive empty-fetch runs from the most recent result
+/// - Failure and success counts
+/// - Last completion timestamp
+pub async fn fetch_subject_stats(db_pool: &PgPool) -> Result<Vec<SubjectResultStats>> {
+    let rows = sqlx::query_as::<_, SubjectResultStats>(
+        r#"
+        WITH recent AS (
+            SELECT payload->>'subject' AS subject, success,
+                   COALESCE(courses_fetched, 0) AS courses_fetched,
+                   COALESCE(courses_changed, 0) AS courses_changed,
+                   completed_at,
+                   ROW_NUMBER() OVER (PARTITION BY payload->>'subject' ORDER BY completed_at DESC) AS rn
+            FROM scrape_job_results
+            WHERE target_type = 'Subject' AND completed_at > NOW() - INTERVAL '24 hours'
+        ),
+        filtered AS (SELECT * FROM recent WHERE rn <= 20),
+        zero_break AS (
+            SELECT subject,
+                   MIN(rn) FILTER (WHERE courses_changed > 0 AND success) AS first_nonzero_rn,
+                   MIN(rn) FILTER (WHERE courses_fetched > 0 AND success) AS first_nonempty_rn
+            FROM filtered GROUP BY subject
+        )
+        SELECT
+            f.subject::TEXT AS "subject!",
+            COUNT(*)::BIGINT AS "recent_runs!",
+            COALESCE(AVG(CASE WHEN f.success AND f.courses_fetched > 0
+                THEN f.courses_changed::FLOAT / f.courses_fetched ELSE NULL END), 0.0)::FLOAT8 AS "avg_change_ratio!",
+            COALESCE(zb.first_nonzero_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_changed = 0))::BIGINT AS "consecutive_zero_changes!",
+            COALESCE(zb.first_nonempty_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_fetched = 0))::BIGINT AS "consecutive_empty_fetches!",
+            COUNT(*) FILTER (WHERE NOT f.success)::BIGINT AS "recent_failure_count!",
+            COUNT(*) FILTER (WHERE f.success)::BIGINT AS "recent_success_count!",
+            MAX(f.completed_at) AS "last_completed!"
+        FROM filtered f
+        LEFT JOIN zero_break zb ON f.subject = zb.subject
+        GROUP BY f.subject, zb.first_nonzero_rn, zb.first_nonempty_rn
+        "#,
+    )
+    .fetch_all(db_pool)
+    .await?;
+
+    Ok(rows)
+}
+
 /// Batch insert scrape jobs using UNNEST for a single round-trip.
 ///
 /// All jobs are inserted with `execute_at` set to the current time.
diff --git a/src/scraper/adaptive.rs b/src/scraper/adaptive.rs
new file mode 100644
index 0000000..0bb0ff0
--- /dev/null
+++ b/src/scraper/adaptive.rs
@@ -0,0 +1,326 @@
+//! Adaptive scraping interval computation.
+//!
+//! Assigns per-subject scrape intervals based on recent change rates,
+//! consecutive zero-change runs, failure patterns, and time of day.
+
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use chrono_tz::US::Central;
+use std::time::Duration;
+
+use crate::data::scrape_jobs::SubjectResultStats;
+
+const FLOOR_INTERVAL: Duration = Duration::from_secs(3 * 60);
+const MODERATE_HIGH_INTERVAL: Duration = Duration::from_secs(5 * 60);
+const MODERATE_LOW_INTERVAL: Duration = Duration::from_secs(15 * 60);
+const LOW_CHANGE_INTERVAL: Duration = Duration::from_secs(30 * 60);
+const ZERO_5_INTERVAL: Duration = Duration::from_secs(60 * 60);
+const ZERO_10_INTERVAL: Duration = Duration::from_secs(2 * 60 * 60);
+const CEILING_INTERVAL: Duration = Duration::from_secs(4 * 60 * 60);
+const COLD_START_INTERVAL: Duration = FLOOR_INTERVAL;
+const PAUSE_PROBE_INTERVAL: Duration = Duration::from_secs(6 * 60 * 60);
+const EMPTY_FETCH_PAUSE_THRESHOLD: i64 = 3;
+const FAILURE_PAUSE_THRESHOLD: i64 = 5;
+
+/// Aggregated per-subject statistics derived from recent scrape results.
+#[derive(Debug, Clone)]
+pub struct SubjectStats {
+    pub subject: String,
+    pub recent_runs: i64,
+    pub avg_change_ratio: f64,
+    pub consecutive_zero_changes: i64,
+    pub consecutive_empty_fetches: i64,
+    pub recent_failure_count: i64,
+    pub recent_success_count: i64,
+    pub last_completed: DateTime<Utc>,
+}
+
+/// Scheduling decision for a subject.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum SubjectSchedule {
+    /// Subject is due for scraping, with the computed interval.
+    Eligible(Duration),
+    /// Subject was scraped recently; wait for the remaining cooldown.
+    Cooldown(Duration),
+    /// Subject is paused due to repeated empty fetches or failures.
+    Paused,
+    /// Subject belongs to a past term and should not be scraped.
+    ReadOnly,
+}
+
+impl From<SubjectResultStats> for SubjectStats {
+    fn from(row: SubjectResultStats) -> Self {
+        Self {
+            subject: row.subject,
+            recent_runs: row.recent_runs,
+            avg_change_ratio: row.avg_change_ratio,
+            consecutive_zero_changes: row.consecutive_zero_changes,
+            consecutive_empty_fetches: row.consecutive_empty_fetches,
+            recent_failure_count: row.recent_failure_count,
+            recent_success_count: row.recent_success_count,
+            last_completed: row.last_completed,
+        }
+    }
+}
+
+/// Compute the base interval tier from change-rate statistics.
+pub fn compute_base_interval(stats: &SubjectStats) -> Duration {
+    if stats.recent_runs == 0 {
+        return COLD_START_INTERVAL;
+    }
+
+    // Consecutive-zero tiers take precedence when change ratio is near zero
+    if stats.avg_change_ratio < 0.001 {
+        return match stats.consecutive_zero_changes {
+            0..5 => LOW_CHANGE_INTERVAL,
+            5..10 => ZERO_5_INTERVAL,
+            10..20 => ZERO_10_INTERVAL,
+            _ => CEILING_INTERVAL,
+        };
+    }
+
+    match stats.avg_change_ratio {
+        r if r >= 0.10 => FLOOR_INTERVAL,
+        r if r >= 0.05 => MODERATE_HIGH_INTERVAL,
+        r if r >= 0.01 => MODERATE_LOW_INTERVAL,
+        _ => LOW_CHANGE_INTERVAL,
+    }
+}
+
+/// Return a time-of-day multiplier for the given UTC timestamp.
+///
+/// Peak hours (weekdays 8am-6pm CT) return 1; off-peak (weekdays 6pm-midnight CT)
+/// return 2; night (midnight-8am CT) and weekends return 4.
+pub fn time_of_day_multiplier(now: DateTime<Utc>) -> u32 {
+    let ct = now.with_timezone(&Central);
+    let weekday = ct.weekday();
+    let hour = ct.hour();
+
+    // Weekends get the slowest multiplier
+    if matches!(weekday, chrono::Weekday::Sat | chrono::Weekday::Sun) {
+        return 4;
+    }
+
+    match hour {
+        8..18 => 1,  // peak
+        18..24 => 2, // off-peak
+        _ => 4,      // night (0..8)
+    }
+}
+
+/// Evaluate whether a subject should be scraped now.
+///
+/// Combines base interval, time-of-day multiplier, pause detection (empty
+/// fetches / consecutive failures), and past-term read-only status.
+pub fn evaluate_subject(
+    stats: &SubjectStats,
+    now: DateTime<Utc>,
+    is_past_term: bool,
+) -> SubjectSchedule {
+    if is_past_term {
+        return SubjectSchedule::ReadOnly;
+    }
+
+    let elapsed = (now - stats.last_completed)
+        .to_std()
+        .unwrap_or(Duration::ZERO);
+    let probe_due = elapsed >= PAUSE_PROBE_INTERVAL;
+
+    // Pause on repeated empty fetches
+    if stats.consecutive_empty_fetches >= EMPTY_FETCH_PAUSE_THRESHOLD {
+        return if probe_due {
+            SubjectSchedule::Eligible(PAUSE_PROBE_INTERVAL)
+        } else {
+            SubjectSchedule::Paused
+        };
+    }
+
+    // Pause on all-failures
+    if stats.recent_success_count == 0 && stats.recent_failure_count >= FAILURE_PAUSE_THRESHOLD {
+        return if probe_due {
+            SubjectSchedule::Eligible(PAUSE_PROBE_INTERVAL)
+        } else {
+            SubjectSchedule::Paused
+        };
+    }
+
+    let base = compute_base_interval(stats);
+    let multiplier = time_of_day_multiplier(now);
+    let effective = base * multiplier;
+
+    if elapsed >= effective {
+        SubjectSchedule::Eligible(effective)
+    } else {
+        let remaining = effective - elapsed;
+        SubjectSchedule::Cooldown(remaining)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::TimeZone;
+
+    /// Create a default `SubjectStats` for testing. Callers mutate fields as needed.
+    fn make_stats(subject: &str) -> SubjectStats {
+        SubjectStats {
+            subject: subject.to_string(),
+            recent_runs: 10,
+            avg_change_ratio: 0.0,
+            consecutive_zero_changes: 0,
+            consecutive_empty_fetches: 0,
+            recent_failure_count: 0,
+            recent_success_count: 10,
+            last_completed: Utc::now() - chrono::Duration::hours(1),
+        }
+    }
+
+    // -- compute_base_interval tests --
+
+    #[test]
+    fn test_cold_start_returns_floor() {
+        let mut stats = make_stats("CS");
+        stats.recent_runs = 0;
+        assert_eq!(compute_base_interval(&stats), COLD_START_INTERVAL);
+    }
+
+    #[test]
+    fn test_high_change_rate() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.15;
+        assert_eq!(compute_base_interval(&stats), FLOOR_INTERVAL);
+    }
+
+    #[test]
+    fn test_moderate_high_change() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.07;
+        assert_eq!(compute_base_interval(&stats), MODERATE_HIGH_INTERVAL);
+    }
+
+    #[test]
+    fn test_moderate_low_change() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.03;
+        assert_eq!(compute_base_interval(&stats), MODERATE_LOW_INTERVAL);
+    }
+
+    #[test]
+    fn test_low_change() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.005;
+        assert_eq!(compute_base_interval(&stats), LOW_CHANGE_INTERVAL);
+    }
+
+    #[test]
+    fn test_zero_5_consecutive() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.0;
+        stats.consecutive_zero_changes = 5;
+        assert_eq!(compute_base_interval(&stats), ZERO_5_INTERVAL);
+    }
+
+    #[test]
+    fn test_zero_10_consecutive() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.0;
+        stats.consecutive_zero_changes = 10;
+        assert_eq!(compute_base_interval(&stats), ZERO_10_INTERVAL);
+    }
+
+    #[test]
+    fn test_zero_20_consecutive() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.0;
+        stats.consecutive_zero_changes = 20;
+        assert_eq!(compute_base_interval(&stats), CEILING_INTERVAL);
+    }
+
+    // -- evaluate_subject tests --
+
+    #[test]
+    fn test_pause_empty_fetches() {
+        let mut stats = make_stats("CS");
+        stats.consecutive_empty_fetches = 3;
+        stats.last_completed = Utc::now() - chrono::Duration::minutes(10);
+        let result = evaluate_subject(&stats, Utc::now(), false);
+        assert_eq!(result, SubjectSchedule::Paused);
+    }
+
+    #[test]
+    fn test_pause_all_failures() {
+        let mut stats = make_stats("CS");
+        stats.recent_success_count = 0;
+        stats.recent_failure_count = 5;
+        stats.last_completed = Utc::now() - chrono::Duration::minutes(10);
+        let result = evaluate_subject(&stats, Utc::now(), false);
+        assert_eq!(result, SubjectSchedule::Paused);
+    }
+
+    #[test]
+    fn test_probe_after_pause() {
+        let mut stats = make_stats("CS");
+        stats.consecutive_empty_fetches = 5;
+        stats.last_completed = Utc::now() - chrono::Duration::hours(7);
+        let result = evaluate_subject(&stats, Utc::now(), false);
+        assert_eq!(result, SubjectSchedule::Eligible(PAUSE_PROBE_INTERVAL));
+    }
+
+    #[test]
+    fn test_read_only_past_term() {
+        let stats = make_stats("CS");
+        let result = evaluate_subject(&stats, Utc::now(), true);
+        assert_eq!(result, SubjectSchedule::ReadOnly);
+    }
+
+    #[test]
+    fn test_cooldown_not_elapsed() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.15; // floor = 3 min
+        // Use a peak-hours timestamp so multiplier = 1
+        let peak = Utc.with_ymd_and_hms(2025, 7, 14, 15, 0, 0).unwrap(); // Mon 10am CT
+        stats.last_completed = peak - chrono::Duration::seconds(30);
+        let result = evaluate_subject(&stats, peak, false);
+        assert!(matches!(result, SubjectSchedule::Cooldown(_)));
+    }
+
+    #[test]
+    fn test_eligible_elapsed() {
+        let mut stats = make_stats("CS");
+        stats.avg_change_ratio = 0.15; // floor = 3 min
+        let peak = Utc.with_ymd_and_hms(2025, 7, 14, 15, 0, 0).unwrap(); // Mon 10am CT
+        stats.last_completed = peak - chrono::Duration::minutes(5);
+        let result = evaluate_subject(&stats, peak, false);
+        assert!(matches!(result, SubjectSchedule::Eligible(_)));
+    }
+
+    // -- time_of_day_multiplier tests --
+
+    #[test]
+    fn test_time_multiplier_peak() {
+        // Monday 10am CT = 15:00 UTC
+        let dt = Utc.with_ymd_and_hms(2025, 7, 14, 15, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 1);
+    }
+
+    #[test]
+    fn test_time_multiplier_offpeak() {
+        // Monday 8pm CT = 01:00 UTC Tuesday
+        let dt = Utc.with_ymd_and_hms(2025, 7, 15, 1, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 2);
+    }
+
+    #[test]
+    fn test_time_multiplier_night() {
+        // 3am CT = 08:00 UTC
+        let dt = Utc.with_ymd_and_hms(2025, 7, 14, 8, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 4);
+    }
+
+    #[test]
+    fn test_time_multiplier_weekend() {
+        // Saturday noon CT = 17:00 UTC
+        let dt = Utc.with_ymd_and_hms(2025, 7, 12, 17, 0, 0).unwrap();
+        assert_eq!(time_of_day_multiplier(dt), 4);
+    }
+}
diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs
index 6ef100a..5c2131b 100644
--- a/src/scraper/mod.rs
+++ b/src/scraper/mod.rs
@@ -1,3 +1,4 @@
+pub mod adaptive;
 pub mod jobs;
 pub mod scheduler;
 pub mod worker;
diff --git a/src/scraper/scheduler.rs b/src/scraper/scheduler.rs
index 0e15d5a..c7a9d09 100644
--- a/src/scraper/scheduler.rs
+++ b/src/scraper/scheduler.rs
@@ -3,11 +3,14 @@ use crate::data::models::{ReferenceData, ScrapePriority, TargetType};
 use crate::data::scrape_jobs;
 use crate::error::Result;
 use crate::rmp::RmpClient;
+use crate::scraper::adaptive::{SubjectSchedule, SubjectStats, evaluate_subject};
 use crate::scraper::jobs::subject::SubjectJob;
 use crate::state::ReferenceCache;
 use crate::web::ws::{ScrapeJobDto, ScrapeJobEvent};
+use chrono::{DateTime, Utc};
 use serde_json::json;
 use sqlx::PgPool;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::{RwLock, broadcast};
@@ -148,10 +151,9 @@ impl Scheduler {
 
     /// Core scheduling logic that analyzes data and creates scrape jobs.
     ///
-    /// Strategy:
-    /// 1. Fetch all subjects for the current term from Banner API
-    /// 2. Query existing jobs in a single batch query
-    /// 3. Create jobs only for subjects that don't have pending jobs
+    /// Uses adaptive scheduling to determine per-subject scrape intervals based
+    /// on recent change rates, failure patterns, and time of day. Only subjects
+    /// that are eligible (i.e. their cooldown has elapsed) are enqueued.
     ///
     /// This is a static method (not &self) to allow it to be called from spawned tasks.
     #[tracing::instrument(skip_all, fields(term))]
@@ -160,10 +162,6 @@ impl Scheduler {
         banner_api: &BannerApi,
         job_events_tx: Option<&broadcast::Sender<ScrapeJobEvent>>,
     ) -> Result<()> {
-        // For now, we will implement a simple baseline scheduling strategy:
-        // 1. Get a list of all subjects from the Banner API.
-        // 2. Query existing jobs for all subjects in a single query.
-        // 3. Create new jobs only for subjects that don't have existing jobs.
         let term = Term::get_current().inner().to_string();
         tracing::Span::current().record("term", term.as_str());
 
@@ -175,13 +173,70 @@ impl Scheduler {
             "Retrieved subjects from API"
         );
 
-        // Create payloads for all subjects
-        let subject_payloads: Vec<_> = subjects
-            .iter()
-            .map(|subject| json!({ "subject": subject.code }))
-            .collect();
-
-        // Query existing jobs for all subjects in a single query
+        // Fetch per-subject stats and build a lookup map
+        let stats_rows = scrape_jobs::fetch_subject_stats(db_pool).await?;
+        let stats_map: HashMap<String, SubjectStats> = stats_rows
+            .into_iter()
+            .map(|row| {
+                let subject = row.subject.clone();
+                (subject, SubjectStats::from(row))
+            })
+            .collect();
+
+        // Evaluate each subject using adaptive scheduling
+        let now = Utc::now();
+        let is_past_term = false; // Scheduler currently only fetches current term subjects
+        let mut eligible_subjects: Vec<String> = Vec::new();
+        let mut cooldown_count: usize = 0;
+        let mut paused_count: usize = 0;
+        let mut read_only_count: usize = 0;
+
+        for subject in &subjects {
+            let stats = stats_map.get(&subject.code).cloned().unwrap_or_else(|| {
+                // Cold start: no history for this subject
+                SubjectStats {
+                    subject: subject.code.clone(),
+                    recent_runs: 0,
+                    avg_change_ratio: 0.0,
+                    consecutive_zero_changes: 0,
+                    consecutive_empty_fetches: 0,
+                    recent_failure_count: 0,
+                    recent_success_count: 0,
+                    last_completed: DateTime::<Utc>::MIN_UTC,
+                }
+            });
+
+            match evaluate_subject(&stats, now, is_past_term) {
+                SubjectSchedule::Eligible(_) => {
+                    eligible_subjects.push(subject.code.clone());
+                }
+                SubjectSchedule::Cooldown(_) => cooldown_count += 1,
+                SubjectSchedule::Paused => paused_count += 1,
+                SubjectSchedule::ReadOnly => read_only_count += 1,
+            }
+        }
+
+        info!(
+            total = subjects.len(),
+            eligible = eligible_subjects.len(),
+            cooldown = cooldown_count,
+            paused = paused_count,
+            read_only = read_only_count,
+            "Adaptive scheduling decisions"
+        );
+
+        if eligible_subjects.is_empty() {
+            debug!("No eligible subjects to schedule");
+            return Ok(());
+        }
+
+        // Create payloads only for eligible subjects
+        let subject_payloads: Vec<_> = eligible_subjects
+            .iter()
+            .map(|code| json!({ "subject": code }))
+            .collect();
+
+        // Query existing jobs for eligible subjects only
         let existing_payloads = scrape_jobs::find_existing_job_payloads(
             TargetType::Subject,
             &subject_payloads,
@@ -189,12 +244,12 @@ impl Scheduler {
         )
         .await?;
 
-        // Filter out subjects that already have jobs and prepare new jobs
+        // Filter out subjects that already have pending jobs
         let mut skipped_count = 0;
-        let new_jobs: Vec<_> = subjects
+        let new_jobs: Vec<_> = eligible_subjects
             .into_iter()
-            .filter_map(|subject| {
-                let job = SubjectJob::new(subject.code.clone());
+            .filter_map(|subject_code| {
+                let job = SubjectJob::new(subject_code.clone());
                 let payload = serde_json::to_value(&job).unwrap();
                 let payload_str = payload.to_string();
 
@@ -202,7 +257,7 @@ impl Scheduler {
                     skipped_count += 1;
                     None
                 } else {
-                    Some((payload, subject.code))
+                    Some((payload, subject_code))
                 }
             })
             .collect();
diff --git a/src/web/admin_scraper.rs b/src/web/admin_scraper.rs
new file mode 100644
index 0000000..2ccb9e7
--- /dev/null
+++ b/src/web/admin_scraper.rs
@@ -0,0 +1,431 @@
+//! Admin API handlers for scraper observability.
+//!
+//! All endpoints require the `AdminUser` extractor, returning 401/403 as needed.
+
+use axum::extract::{Path, Query, State};
+use axum::http::StatusCode;
+use axum::response::Json;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use sqlx::Row;
+
+use crate::data::scrape_jobs;
+use crate::scraper::adaptive::{self, SubjectSchedule, SubjectStats};
+use crate::state::AppState;
+use crate::web::extractors::AdminUser;
+
+type ApiError = (StatusCode, Json<serde_json::Value>);
+
+fn parse_period(period: &str) -> Result<chrono::Duration, ApiError> {
+    match period {
+        "1h" => Ok(chrono::Duration::hours(1)),
+        "6h" => Ok(chrono::Duration::hours(6)),
+        "24h" => Ok(chrono::Duration::hours(24)),
+        "7d" => Ok(chrono::Duration::days(7)),
+        "30d" => Ok(chrono::Duration::days(30)),
+        _ => Err((
+            StatusCode::BAD_REQUEST,
+            Json(
+                json!({"error": format!("Invalid period '{period}'. Valid: 1h, 6h, 24h, 7d, 30d")}),
+            ),
+        )),
+    }
+}
+
+fn period_to_interval_str(period: &str) -> &'static str {
+    match period {
+        "1h" => "1 hour",
+        "6h" => "6 hours",
+        "24h" => "24 hours",
+        "7d" => "7 days",
+        "30d" => "30 days",
+        _ => "24 hours",
+    }
+}
+
+fn parse_bucket(bucket: &str) -> Result<&'static str, ApiError> {
+    match bucket {
+        "1m" => Ok("1 minute"),
+        "5m" => Ok("5 minutes"),
+        "15m" => Ok("15 minutes"),
+        "1h" => Ok("1 hour"),
+        "6h" => Ok("6 hours"),
+        _ => Err((
+            StatusCode::BAD_REQUEST,
+            Json(
+                json!({"error": format!("Invalid bucket '{bucket}'. Valid: 1m, 5m, 15m, 1h, 6h")}),
+            ),
+        )),
+    }
+}
+
+fn default_bucket_for_period(period: &str) -> &'static str {
+    match period {
+        "1h" => "1m",
+        "6h" => "5m",
+        "24h" => "15m",
+        "7d" => "1h",
+        "30d" => "6h",
+        _ => "15m",
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Endpoint 1: GET /api/admin/scraper/stats
+// ---------------------------------------------------------------------------
+
+#[derive(Deserialize)]
+pub struct StatsParams {
+    #[serde(default = "default_period")]
+    period: String,
+}
+
+fn default_period() -> String {
+    "24h".to_string()
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ScraperStatsResponse {
+    period: String,
+    total_scrapes: i64,
+    successful_scrapes: i64,
+    failed_scrapes: i64,
+    success_rate: f64,
+    avg_duration_ms: f64,
+    total_courses_changed: i64,
+    total_courses_fetched: i64,
+    total_audits_generated: i64,
+    pending_jobs: i64,
+    locked_jobs: i64,
+}
+
+pub async fn scraper_stats(
+    _admin: AdminUser,
+    State(state): State<AppState>,
+    Query(params): Query<StatsParams>,
+) -> Result<Json<ScraperStatsResponse>, ApiError> {
+    let _duration = parse_period(&params.period)?;
+    let interval_str = period_to_interval_str(&params.period);
+
+    let row = sqlx::query(
+        "SELECT \
+         COUNT(*) AS total_scrapes, \
+         COUNT(*) FILTER (WHERE success) AS successful_scrapes, \
+         COUNT(*) FILTER (WHERE NOT success) AS failed_scrapes, \
+         COALESCE(AVG(duration_ms) FILTER (WHERE success), 0) AS avg_duration_ms, \
+         COALESCE(SUM(courses_changed) FILTER (WHERE success), 0) AS total_courses_changed, \
+         COALESCE(SUM(courses_fetched) FILTER (WHERE success), 0) AS total_courses_fetched, \
+         COALESCE(SUM(audits_generated) FILTER (WHERE success), 0) AS total_audits_generated \
+         FROM scrape_job_results \
+         WHERE completed_at > NOW() - $1::interval",
+    )
+    .bind(interval_str)
+    .fetch_one(&state.db_pool)
+    .await
+    .map_err(|e| {
+        tracing::error!(error = %e, "Failed to fetch scraper stats");
+        (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": "Failed to fetch scraper stats"})),
+        )
+    })?;
+
+    let total_scrapes: i64 = row.get("total_scrapes");
+    let successful_scrapes: i64 = row.get("successful_scrapes");
+    let failed_scrapes: i64 = row.get("failed_scrapes");
+    let avg_duration_ms: f64 = row.get("avg_duration_ms");
+    let total_courses_changed: i64 = row.get("total_courses_changed");
+    let total_courses_fetched: i64 = row.get("total_courses_fetched");
+    let total_audits_generated: i64 = row.get("total_audits_generated");
+
+    let queue_row = sqlx::query(
+        "SELECT \
+         COUNT(*) FILTER (WHERE locked_at IS NULL) AS pending_jobs, \
+         COUNT(*) FILTER (WHERE locked_at IS NOT NULL) AS locked_jobs \
+         FROM scrape_jobs",
+    )
+    .fetch_one(&state.db_pool)
+    .await
+    .map_err(|e| {
+        tracing::error!(error = %e, "Failed to fetch queue stats");
+        (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": "Failed to fetch queue stats"})),
+        )
+    })?;
+
+    let pending_jobs: i64 = queue_row.get("pending_jobs");
+    let locked_jobs: i64 = queue_row.get("locked_jobs");
+
+    let success_rate = if total_scrapes > 0 {
+        successful_scrapes as f64 / total_scrapes as f64
+    } else {
+        0.0
+    };
+
+    Ok(Json(ScraperStatsResponse {
+        period: params.period,
+        total_scrapes,
+        successful_scrapes,
+        failed_scrapes,
+        success_rate,
+        avg_duration_ms,
+        total_courses_changed,
+        total_courses_fetched,
+        total_audits_generated,
+        pending_jobs,
+        locked_jobs,
+    }))
+}
+
+// ---------------------------------------------------------------------------
+// Endpoint 2: GET /api/admin/scraper/timeseries
+// ---------------------------------------------------------------------------
+
+#[derive(Deserialize)]
+pub struct TimeseriesParams {
+    #[serde(default = "default_period")]
+    period: String,
+    bucket: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TimeseriesResponse {
+    period: String,
+    bucket: String,
+    points: Vec<TimeseriesPoint>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TimeseriesPoint {
+    timestamp: DateTime<Utc>,
+    scrape_count: i64,
+    success_count: i64,
+    error_count: i64,
+    courses_changed: i64,
+    avg_duration_ms: f64,
+}
+
+pub async fn scraper_timeseries(
+    _admin: AdminUser,
+    State(state): State<AppState>,
+    Query(params): Query<TimeseriesParams>,
+) -> Result<Json<TimeseriesResponse>, ApiError> {
+    let _duration = parse_period(&params.period)?;
+    let period_interval = period_to_interval_str(&params.period);
+
+    let bucket_code = match &params.bucket {
+        Some(b) => {
+            // Validate the bucket
+            parse_bucket(b)?;
+            b.as_str()
+        }
+        None => default_bucket_for_period(&params.period),
+    };
+    let bucket_interval = parse_bucket(bucket_code)?;
+
+    let rows = sqlx::query(
+        "SELECT \
+         date_bin($1::interval, completed_at, '2020-01-01'::timestamptz) AS bucket_start, \
+         COUNT(*)::BIGINT AS scrape_count, \
+         COUNT(*) FILTER (WHERE success)::BIGINT AS success_count, \
+         COUNT(*) FILTER (WHERE NOT success)::BIGINT AS error_count, \
+         COALESCE(SUM(courses_changed) FILTER (WHERE success), 0)::BIGINT AS courses_changed, \
+         COALESCE(AVG(duration_ms) FILTER (WHERE success), 0)::FLOAT8 AS avg_duration_ms \
+         FROM scrape_job_results \
+         WHERE completed_at > NOW() - $2::interval \
+         GROUP BY bucket_start \
+         ORDER BY bucket_start",
+    )
+    .bind(bucket_interval)
+    .bind(period_interval)
+    .fetch_all(&state.db_pool)
+    .await
+    .map_err(|e| {
+        tracing::error!(error = %e, "Failed to fetch scraper timeseries");
+        (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": "Failed to fetch scraper timeseries"})),
+        )
+    })?;
+
+    let points = rows
+        .iter()
+        .map(|row| TimeseriesPoint {
+            timestamp: row.get("bucket_start"),
+            scrape_count: row.get("scrape_count"),
+            success_count: row.get("success_count"),
+            error_count: row.get("error_count"),
+            courses_changed: row.get("courses_changed"),
+            avg_duration_ms: row.get("avg_duration_ms"),
+        })
+        .collect();
+
+    Ok(Json(TimeseriesResponse {
+        period: params.period,
+        bucket: bucket_code.to_string(),
+        points,
+    }))
+}
+
+// ---------------------------------------------------------------------------
+// Endpoint 3: GET /api/admin/scraper/subjects
+// ---------------------------------------------------------------------------
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectsResponse {
+    subjects: Vec<SubjectSummary>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectSummary {
+    subject: String,
+    schedule_state: String,
+    current_interval_secs: u64,
+    time_multiplier: u32,
+    last_scraped: DateTime<Utc>,
+    avg_change_ratio: f64,
+    consecutive_zero_changes: i64,
+    recent_runs: i64,
+    recent_failures: i64,
+}
+
+pub async fn scraper_subjects(
+    _admin: AdminUser,
+    State(state): State<AppState>,
+) -> Result<Json<SubjectsResponse>, ApiError> {
+    let raw_stats = scrape_jobs::fetch_subject_stats(&state.db_pool)
+        .await
+        .map_err(|e| {
+            tracing::error!(error = %e, "Failed to fetch subject stats");
+            (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                Json(json!({"error": "Failed to fetch subject stats"})),
+            )
+        })?;
+
+    let now = Utc::now();
+    let multiplier = adaptive::time_of_day_multiplier(now);
+
+    let subjects = raw_stats
+        .into_iter()
+        .map(|row| {
+            let stats: SubjectStats = row.into();
+            let schedule = adaptive::evaluate_subject(&stats, now, false);
+            let base_interval = adaptive::compute_base_interval(&stats);
+
+            let schedule_state = match &schedule {
+                SubjectSchedule::Eligible(_) => "eligible",
+                SubjectSchedule::Cooldown(_) => "cooldown",
+                SubjectSchedule::Paused => "paused",
+                SubjectSchedule::ReadOnly => "read_only",
+            };
+
+            let current_interval_secs = base_interval.as_secs() * multiplier as u64;
+
+            SubjectSummary {
+                subject: stats.subject,
+                schedule_state: schedule_state.to_string(),
+                current_interval_secs,
+                time_multiplier: multiplier,
+                last_scraped: stats.last_completed,
+                avg_change_ratio: stats.avg_change_ratio,
+                consecutive_zero_changes: stats.consecutive_zero_changes,
+                recent_runs: stats.recent_runs,
+                recent_failures: stats.recent_failure_count,
+            }
+        })
+        .collect();
+
+    Ok(Json(SubjectsResponse { subjects }))
+}
+
+// ---------------------------------------------------------------------------
+// Endpoint 4: GET /api/admin/scraper/subjects/{subject}
+// ---------------------------------------------------------------------------
+
+#[derive(Deserialize)]
+pub struct SubjectDetailParams {
+    #[serde(default = "default_detail_limit")]
+    limit: i32,
+}
+
+fn default_detail_limit() -> i32 {
+    50
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectDetailResponse {
+    subject: String,
+    results: Vec<SubjectResultEntry>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubjectResultEntry {
+    id: i64,
+    completed_at: DateTime<Utc>,
+    duration_ms: i32,
+    success: bool,
+    error_message: Option<String>,
+    courses_fetched: Option<i32>,
+    courses_changed: Option<i32>,
+    courses_unchanged: Option<i32>,
+    audits_generated: Option<i32>,
+    metrics_generated: Option<i32>,
+}
+
+pub async fn scraper_subject_detail(
+    _admin: AdminUser,
+    State(state): State<AppState>,
+    Path(subject): Path<String>,
+    Query(params): Query<SubjectDetailParams>,
+) -> Result<Json<SubjectDetailResponse>, ApiError> {
+    let limit = params.limit.clamp(1, 200);
+
+    let rows = sqlx::query(
+        "SELECT id, completed_at, duration_ms, success, error_message, \
+         courses_fetched, courses_changed, courses_unchanged, \
+         audits_generated, metrics_generated \
+         FROM scrape_job_results \
+         WHERE target_type = 'Subject' AND payload->>'subject' = $1 \
+         ORDER BY completed_at DESC \
+         LIMIT $2",
+    )
+    .bind(&subject)
+    .bind(limit)
+    .fetch_all(&state.db_pool)
+    .await
+    .map_err(|e| {
+        tracing::error!(error = %e, subject = %subject, "Failed to fetch subject detail");
+        (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": "Failed to fetch subject detail"})),
+        )
+    })?;
+
+    let results = rows
+        .iter()
+        .map(|row| SubjectResultEntry {
+            id: row.get("id"),
+            completed_at: row.get("completed_at"),
+            duration_ms: row.get("duration_ms"),
+            success: row.get("success"),
+            error_message: row.get("error_message"),
+            courses_fetched: row.get("courses_fetched"),
+            courses_changed: row.get("courses_changed"),
+            courses_unchanged: row.get("courses_unchanged"),
+            audits_generated: row.get("audits_generated"),
+            metrics_generated: row.get("metrics_generated"),
+        })
+        .collect();
+
+    Ok(Json(SubjectDetailResponse { subject, results }))
+}
diff --git a/src/web/mod.rs b/src/web/mod.rs
index 2a1e6fc..088843f 100644
--- a/src/web/mod.rs
+++ b/src/web/mod.rs
@@ -2,6 +2,7 @@
 
 pub mod admin;
 pub mod admin_rmp;
+pub mod admin_scraper;
 #[cfg(feature = "embed-assets")]
 pub mod assets;
 pub mod auth;
diff --git a/src/web/routes.rs b/src/web/routes.rs
index 5da990f..582c180 100644
--- a/src/web/routes.rs
+++ b/src/web/routes.rs
@@ -11,6 +11,7 @@ use axum::{
 
 use crate::web::admin;
 use crate::web::admin_rmp;
+use crate::web::admin_scraper;
 use crate::web::auth::{self, AuthConfig};
 use crate::web::ws;
 #[cfg(feature = "embed-assets")]
@@ -86,6 +87,19 @@ pub fn create_router(app_state: AppState, auth_config: AuthConfig) -> Router {
             post(admin_rmp::unmatch_instructor),
         )
         .route("/admin/rmp/rescore", post(admin_rmp::rescore))
+        .route("/admin/scraper/stats", get(admin_scraper::scraper_stats))
+        .route(
+            "/admin/scraper/timeseries",
+            get(admin_scraper::scraper_timeseries),
+        )
+        .route(
+            "/admin/scraper/subjects",
+            get(admin_scraper::scraper_subjects),
+        )
+        .route(
+            "/admin/scraper/subjects/{subject}",
+            get(admin_scraper::scraper_subject_detail),
+        )
         .with_state(app_state);
 
     let mut router = Router::new()