mirror of https://github.com/Xevion/banner.git
feat: add scraper analytics dashboard with timeseries and subject monitoring
@@ -257,15 +257,15 @@ pub async fn fetch_subject_stats(db_pool: &PgPool) -> Result<Vec<SubjectResultSt
         FROM filtered GROUP BY subject
     )
     SELECT
-        f.subject::TEXT AS "subject!",
-        COUNT(*)::BIGINT AS "recent_runs!",
+        f.subject::TEXT AS subject,
+        COUNT(*)::BIGINT AS recent_runs,
         COALESCE(AVG(CASE WHEN f.success AND f.courses_fetched > 0
-            THEN f.courses_changed::FLOAT / f.courses_fetched ELSE NULL END), 0.0)::FLOAT8 AS "avg_change_ratio!",
-        COALESCE(zb.first_nonzero_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_changed = 0))::BIGINT AS "consecutive_zero_changes!",
-        COALESCE(zb.first_nonempty_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_fetched = 0))::BIGINT AS "consecutive_empty_fetches!",
-        COUNT(*) FILTER (WHERE NOT f.success)::BIGINT AS "recent_failure_count!",
-        COUNT(*) FILTER (WHERE f.success)::BIGINT AS "recent_success_count!",
-        MAX(f.completed_at) AS "last_completed!"
+            THEN f.courses_changed::FLOAT / f.courses_fetched ELSE NULL END), 0.0)::FLOAT8 AS avg_change_ratio,
+        COALESCE(zb.first_nonzero_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_changed = 0))::BIGINT AS consecutive_zero_changes,
+        COALESCE(zb.first_nonempty_rn - 1, COUNT(*) FILTER (WHERE f.success AND f.courses_fetched = 0))::BIGINT AS consecutive_empty_fetches,
+        COUNT(*) FILTER (WHERE NOT f.success)::BIGINT AS recent_failure_count,
+        COUNT(*) FILTER (WHERE f.success)::BIGINT AS recent_success_count,
+        MAX(f.completed_at) AS last_completed
     FROM filtered f
     LEFT JOIN zero_break zb ON f.subject = zb.subject
     GROUP BY f.subject, zb.first_nonzero_rn, zb.first_nonempty_rn
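Note on the hunk above: in sqlx's compile-time checked macros, a `!` suffix inside a quoted column alias overrides nullability inference and forces a non-optional Rust type; dropping the markers, as this hunk does, leaves the aggregate columns nullable from sqlx's point of view. A minimal sketch of the mechanism, assuming the `query!` macro and a `DATABASE_URL` available at compile time:

    // Postgres does not declare aggregate outputs (even COUNT(*)) as non-null,
    // so sqlx conservatively infers Option<i64> for them. The `!` alias suffix
    // overrides that inference to a plain i64.
    let row = sqlx::query!(
        r#"SELECT COUNT(*)::BIGINT AS "recent_runs!" FROM scrape_job_results"#
    )
    .fetch_one(&db_pool)
    .await?;
    let runs: i64 = row.recent_runs; // i64 rather than Option<i64>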
+116 -24
@@ -9,7 +9,9 @@ use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde_json::json;
 use sqlx::Row;
+use ts_rs::TS;
 
+use crate::banner::models::terms::Term;
 use crate::data::scrape_jobs;
 use crate::scraper::adaptive::{self, SubjectSchedule, SubjectStats};
 use crate::state::AppState;
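For context on the hunks below: the by-name `row.get("...")` calls on untyped rows are provided by the `sqlx::Row` trait, which must be in scope (it already is in this import block). A minimal sketch:

    use sqlx::Row; // brings Row::get into scope for untyped rows

    // Sketch: fetch an untyped row and read a column by name.
    let row = sqlx::query("SELECT COUNT(*)::BIGINT AS total_scrapes FROM scrape_job_results")
        .fetch_one(&db_pool)
        .await?;
    let total_scrapes: i64 = row.get("total_scrapes");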
@@ -85,19 +87,28 @@ fn default_period() -> String {
     "24h".to_string()
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, TS)]
+#[ts(export)]
 #[serde(rename_all = "camelCase")]
 pub struct ScraperStatsResponse {
     period: String,
+    #[ts(type = "number")]
     total_scrapes: i64,
+    #[ts(type = "number")]
     successful_scrapes: i64,
+    #[ts(type = "number")]
     failed_scrapes: i64,
-    success_rate: f64,
-    avg_duration_ms: f64,
+    success_rate: Option<f64>,
+    avg_duration_ms: Option<f64>,
+    #[ts(type = "number")]
     total_courses_changed: i64,
+    #[ts(type = "number")]
     total_courses_fetched: i64,
+    #[ts(type = "number")]
     total_audits_generated: i64,
+    #[ts(type = "number")]
     pending_jobs: i64,
+    #[ts(type = "number")]
     locked_jobs: i64,
 }
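The `TS` derive and `#[ts(export)]` come from the ts-rs crate, which emits a TypeScript binding for the struct (typically written to `bindings/` when the crate's generated export tests run under `cargo test`), honoring serde's `rename_all` via its serde compatibility. `#[ts(type = "...")]` overrides the generated type for a single field. A minimal sketch, with the emitted shape shown in comments (assumed output, exact form depends on ts-rs configuration):

    use ts_rs::TS;

    #[derive(serde::Serialize, TS)]
    #[ts(export)] // running `cargo test` writes bindings/Example.ts
    #[serde(rename_all = "camelCase")]
    pub struct Example {
        #[ts(type = "number")] // force `number` for the i64 field in the binding
        total_scrapes: i64,
        success_rate: Option<f64>,
    }

    // Sketch of the emitted binding:
    //   export interface Example {
    //     totalScrapes: number;
    //     successRate: number | null;
    //   }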
@@ -114,7 +125,7 @@ pub async fn scraper_stats(
         COUNT(*) AS total_scrapes, \
         COUNT(*) FILTER (WHERE success) AS successful_scrapes, \
         COUNT(*) FILTER (WHERE NOT success) AS failed_scrapes, \
-        COALESCE(AVG(duration_ms) FILTER (WHERE success), 0) AS avg_duration_ms, \
+        (AVG(duration_ms) FILTER (WHERE success))::FLOAT8 AS avg_duration_ms, \
         COALESCE(SUM(courses_changed) FILTER (WHERE success), 0) AS total_courses_changed, \
         COALESCE(SUM(courses_fetched) FILTER (WHERE success), 0) AS total_courses_fetched, \
         COALESCE(SUM(audits_generated) FILTER (WHERE success), 0) AS total_audits_generated \
@@ -135,7 +146,7 @@ pub async fn scraper_stats(
     let total_scrapes: i64 = row.get("total_scrapes");
     let successful_scrapes: i64 = row.get("successful_scrapes");
     let failed_scrapes: i64 = row.get("failed_scrapes");
-    let avg_duration_ms: f64 = row.get("avg_duration_ms");
+    let avg_duration_ms: Option<f64> = row.get("avg_duration_ms");
     let total_courses_changed: i64 = row.get("total_courses_changed");
     let total_courses_fetched: i64 = row.get("total_courses_fetched");
     let total_audits_generated: i64 = row.get("total_audits_generated");
@@ -160,9 +171,9 @@ pub async fn scraper_stats(
     let locked_jobs: i64 = queue_row.get("locked_jobs");
 
     let success_rate = if total_scrapes > 0 {
-        successful_scrapes as f64 / total_scrapes as f64
+        Some(successful_scrapes as f64 / total_scrapes as f64)
     } else {
-        0.0
+        None
     };
 
     Ok(Json(ScraperStatsResponse {
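Taken together, the three hunks above replace fake zeros with honest nulls: the SQL no longer coalesces `AVG(duration_ms)`, the row is read as `Option<f64>`, and `success_rate` is only computed when there were scrapes in the window. With serde, the `None` values serialize as JSON `null`; a minimal, self-contained sketch:

    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct Stats {
        success_rate: Option<f64>,
        avg_duration_ms: Option<f64>,
    }

    // With no scrapes in the window, both fields serialize as JSON null:
    let empty = Stats { success_rate: None, avg_duration_ms: None };
    assert_eq!(
        serde_json::to_string(&empty).unwrap(),
        r#"{"successRate":null,"avgDurationMs":null}"#
    );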
@@ -191,7 +202,8 @@ pub struct TimeseriesParams {
     bucket: Option<String>,
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, TS)]
+#[ts(export)]
 #[serde(rename_all = "camelCase")]
 pub struct TimeseriesResponse {
     period: String,
@@ -199,13 +211,18 @@ pub struct TimeseriesResponse {
     points: Vec<TimeseriesPoint>,
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, TS)]
+#[ts(export)]
 #[serde(rename_all = "camelCase")]
 pub struct TimeseriesPoint {
     timestamp: DateTime<Utc>,
+    #[ts(type = "number")]
     scrape_count: i64,
+    #[ts(type = "number")]
     success_count: i64,
+    #[ts(type = "number")]
     error_count: i64,
+    #[ts(type = "number")]
     courses_changed: i64,
     avg_duration_ms: f64,
 }
@@ -229,17 +246,33 @@ pub async fn scraper_timeseries(
     let bucket_interval = parse_bucket(bucket_code)?;
 
     let rows = sqlx::query(
-        "SELECT \
-         date_bin($1::interval, completed_at, '2020-01-01'::timestamptz) AS bucket_start, \
-         COUNT(*)::BIGINT AS scrape_count, \
-         COUNT(*) FILTER (WHERE success)::BIGINT AS success_count, \
-         COUNT(*) FILTER (WHERE NOT success)::BIGINT AS error_count, \
-         COALESCE(SUM(courses_changed) FILTER (WHERE success), 0)::BIGINT AS courses_changed, \
-         COALESCE(AVG(duration_ms) FILTER (WHERE success), 0)::FLOAT8 AS avg_duration_ms \
-         FROM scrape_job_results \
-         WHERE completed_at > NOW() - $2::interval \
-         GROUP BY bucket_start \
-         ORDER BY bucket_start",
+        "WITH buckets AS ( \
+         SELECT generate_series( \
+         date_bin($1::interval, NOW() - $2::interval, '2020-01-01'::timestamptz), \
+         date_bin($1::interval, NOW(), '2020-01-01'::timestamptz), \
+         $1::interval \
+         ) AS bucket_start \
+         ), \
+         raw AS ( \
+         SELECT date_bin($1::interval, completed_at, '2020-01-01'::timestamptz) AS bucket_start, \
+         COUNT(*)::BIGINT AS scrape_count, \
+         COUNT(*) FILTER (WHERE success)::BIGINT AS success_count, \
+         COUNT(*) FILTER (WHERE NOT success)::BIGINT AS error_count, \
+         COALESCE(SUM(courses_changed) FILTER (WHERE success), 0)::BIGINT AS courses_changed, \
+         COALESCE(AVG(duration_ms) FILTER (WHERE success), 0)::FLOAT8 AS avg_duration_ms \
+         FROM scrape_job_results \
+         WHERE completed_at > NOW() - $2::interval \
+         GROUP BY 1 \
+         ) \
+         SELECT b.bucket_start, \
+         COALESCE(r.scrape_count, 0) AS scrape_count, \
+         COALESCE(r.success_count, 0) AS success_count, \
+         COALESCE(r.error_count, 0) AS error_count, \
+         COALESCE(r.courses_changed, 0) AS courses_changed, \
+         COALESCE(r.avg_duration_ms, 0) AS avg_duration_ms \
+         FROM buckets b \
+         LEFT JOIN raw r ON b.bucket_start = r.bucket_start \
+         ORDER BY b.bucket_start",
     )
     .bind(bucket_interval)
    .bind(period_interval)
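The rewritten query zero-fills empty buckets: `generate_series` enumerates every bucket in the window, with its endpoints snapped by `date_bin` to the same origin used for the data, so the `LEFT JOIN` keys match exactly and `COALESCE` turns bucket gaps into explicit zero rows. The same pattern in isolation, against a hypothetical `events(id, at timestamptz)` table:

    // Sketch of the zero-fill pattern (table and column names assumed).
    // Binning both the series endpoints and the data to the same origin
    // guarantees the join keys line up.
    let rows = sqlx::query(
        "WITH buckets AS ( \
             SELECT generate_series( \
                 date_bin('5 minutes'::interval, NOW() - '1 hour'::interval, '2020-01-01'::timestamptz), \
                 date_bin('5 minutes'::interval, NOW(), '2020-01-01'::timestamptz), \
                 '5 minutes'::interval \
             ) AS bucket_start \
         ) \
         SELECT b.bucket_start, COUNT(e.id)::BIGINT AS n \
         FROM buckets b \
         LEFT JOIN events e \
           ON date_bin('5 minutes'::interval, e.at, '2020-01-01'::timestamptz) = b.bucket_start \
         GROUP BY b.bucket_start \
         ORDER BY b.bucket_start",
    )
    .fetch_all(&db_pool)
    .await?;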
@@ -276,23 +309,35 @@ pub async fn scraper_timeseries(
 // Endpoint 3: GET /api/admin/scraper/subjects
 // ---------------------------------------------------------------------------
 
-#[derive(Serialize)]
+#[derive(Serialize, TS)]
+#[ts(export)]
 #[serde(rename_all = "camelCase")]
 pub struct SubjectsResponse {
     subjects: Vec<SubjectSummary>,
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, TS)]
+#[ts(export)]
 #[serde(rename_all = "camelCase")]
 pub struct SubjectSummary {
     subject: String,
+    subject_description: Option<String>,
+    #[ts(type = "number")]
+    tracked_course_count: i64,
     schedule_state: String,
+    #[ts(type = "number")]
     current_interval_secs: u64,
     time_multiplier: u32,
     last_scraped: DateTime<Utc>,
     next_eligible_at: Option<DateTime<Utc>>,
+    #[ts(type = "number | null")]
     cooldown_remaining_secs: Option<u64>,
     avg_change_ratio: f64,
+    #[ts(type = "number")]
     consecutive_zero_changes: i64,
+    #[ts(type = "number")]
     recent_runs: i64,
+    #[ts(type = "number")]
     recent_failures: i64,
 }
@@ -313,6 +358,28 @@ pub async fn scraper_subjects(
     let now = Utc::now();
     let multiplier = adaptive::time_of_day_multiplier(now);
 
+    // Look up subject descriptions from the reference cache
+    let ref_cache = state.reference_cache.read().await;
+
+    // Count tracked courses per subject for the current term
+    let term = Term::get_current().inner().to_string();
+    let course_counts: std::collections::HashMap<String, i64> = sqlx::query_as(
+        "SELECT subject, COUNT(*)::BIGINT AS cnt FROM courses WHERE term_code = $1 GROUP BY subject",
+    )
+    .bind(&term)
+    .fetch_all(&state.db_pool)
+    .await
+    .map_err(|e| {
+        tracing::error!(error = %e, "Failed to fetch course counts");
+        (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": "Failed to fetch course counts"})),
+        )
+    })?
+    .into_iter()
+    .map(|(subject, cnt): (String, i64)| (subject, cnt))
+    .collect();
+
     let subjects = raw_stats
         .into_iter()
         .map(|row| {
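The course-count block relies on sqlx decoding rows directly into tuples (tuples implement `FromRow`), which `collect()` then assembles into a `HashMap`. The trailing identity `.map(|(subject, cnt)| (subject, cnt))` exists only to pin the tuple type; a sketch of the same query with the type named up front instead:

    use std::collections::HashMap;

    // Sketch: a turbofish on query_as makes the identity .map unnecessary.
    let course_counts: HashMap<String, i64> = sqlx::query_as::<_, (String, i64)>(
        "SELECT subject, COUNT(*)::BIGINT FROM courses WHERE term_code = $1 GROUP BY subject",
    )
    .bind(&term)
    .fetch_all(&state.db_pool)
    .await?
    .into_iter()
    .collect();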
@@ -329,12 +396,34 @@ pub async fn scraper_subjects(
 
             let current_interval_secs = base_interval.as_secs() * multiplier as u64;
 
+            let (next_eligible_at, cooldown_remaining_secs) = match &schedule {
+                SubjectSchedule::Eligible(_) => (Some(now), Some(0)),
+                SubjectSchedule::Cooldown(remaining) => {
+                    let remaining_secs = remaining.as_secs();
+                    (
+                        Some(now + chrono::Duration::seconds(remaining_secs as i64)),
+                        Some(remaining_secs),
+                    )
+                }
+                SubjectSchedule::Paused | SubjectSchedule::ReadOnly => (None, None),
+            };
+
+            let subject_description = ref_cache
+                .lookup("subject", &stats.subject)
+                .map(|s| s.to_string());
+
+            let tracked_course_count = course_counts.get(&stats.subject).copied().unwrap_or(0);
+
             SubjectSummary {
                 subject: stats.subject,
+                subject_description,
+                tracked_course_count,
                 schedule_state: schedule_state.to_string(),
                 current_interval_secs,
                 time_multiplier: multiplier,
                 last_scraped: stats.last_completed,
+                next_eligible_at,
+                cooldown_remaining_secs,
                 avg_change_ratio: stats.avg_change_ratio,
                 consecutive_zero_changes: stats.consecutive_zero_changes,
                 recent_runs: stats.recent_runs,
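The cooldown arm converts the scheduler's `std::time::Duration` into an absolute timestamp by adding it to `now`. A minimal sketch of that conversion, with the same whole-second truncation as the handler (helper name hypothetical):

    use chrono::{DateTime, Duration, Utc};

    // Sketch: turn a remaining std::time::Duration into an absolute deadline.
    fn eligible_at(now: DateTime<Utc>, remaining: std::time::Duration) -> DateTime<Utc> {
        // as_secs() truncates any sub-second remainder, matching the handler above.
        now + Duration::seconds(remaining.as_secs() as i64)
    }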
@@ -360,16 +449,19 @@ fn default_detail_limit() -> i32 {
     50
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, TS)]
+#[ts(export)]
 #[serde(rename_all = "camelCase")]
 pub struct SubjectDetailResponse {
     subject: String,
     results: Vec<SubjectResultEntry>,
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, TS)]
+#[ts(export)]
 #[serde(rename_all = "camelCase")]
 pub struct SubjectResultEntry {
+    #[ts(type = "number")]
     id: i64,
     completed_at: DateTime<Utc>,
     duration_ms: i32,