From 98a6d978c6020c9df36b300017f3cb21a99070bc Mon Sep 17 00:00:00 2001 From: Xevion Date: Thu, 29 Jan 2026 14:19:36 -0600 Subject: [PATCH] feat: implement course change auditing with time-series metrics endpoint --- src/data/batch.rs | 516 +++++++++++++++++++++++++++++++++------ src/web/routes.rs | 126 +++++++++- tests/db_batch_upsert.rs | 113 +++++++++ web/src/lib/api.ts | 34 +++ 4 files changed, 703 insertions(+), 86 deletions(-) diff --git a/src/data/batch.rs b/src/data/batch.rs index e9c55f6..a12817e 100644 --- a/src/data/batch.rs +++ b/src/data/batch.rs @@ -3,6 +3,7 @@ use crate::banner::Course; use crate::data::models::DbMeetingTime; use crate::error::Result; +use sqlx::PgConnection; use sqlx::PgPool; use std::collections::HashSet; use std::time::Instant; @@ -57,15 +58,315 @@ fn extract_campus_code(course: &Course) -> Option { .and_then(|mf| mf.meeting_time.campus.clone()) } +// --------------------------------------------------------------------------- +// Task 1: UpsertDiffRow — captures pre- and post-upsert state for diffing +// --------------------------------------------------------------------------- + +/// Row returned by the CTE-based upsert query, carrying both old and new values +/// for every auditable field. `old_id` is `None` for fresh inserts. +#[derive(sqlx::FromRow, Debug)] +struct UpsertDiffRow { + id: i32, + old_id: Option, + + // enrollment fields + old_enrollment: Option, + new_enrollment: i32, + old_max_enrollment: Option, + new_max_enrollment: i32, + old_wait_count: Option, + new_wait_count: i32, + old_wait_capacity: Option, + new_wait_capacity: i32, + + // text fields (non-nullable in DB) + old_subject: Option, + new_subject: String, + old_course_number: Option, + new_course_number: String, + old_title: Option, + new_title: String, + + // nullable text fields + old_sequence_number: Option, + new_sequence_number: Option, + old_part_of_term: Option, + new_part_of_term: Option, + old_instructional_method: Option, + new_instructional_method: Option, + old_campus: Option, + new_campus: Option, + + // nullable int fields + old_credit_hours: Option, + new_credit_hours: Option, + old_credit_hour_low: Option, + new_credit_hour_low: Option, + old_credit_hour_high: Option, + new_credit_hour_high: Option, + + // cross-list fields + old_cross_list: Option, + new_cross_list: Option, + old_cross_list_capacity: Option, + new_cross_list_capacity: Option, + old_cross_list_count: Option, + new_cross_list_count: Option, + + // link fields + old_link_identifier: Option, + new_link_identifier: Option, + old_is_section_linked: Option, + new_is_section_linked: Option, + + // JSONB fields + old_meeting_times: Option, + new_meeting_times: serde_json::Value, + old_attributes: Option, + new_attributes: serde_json::Value, +} + +// --------------------------------------------------------------------------- +// Task 3: Entry types and diff logic +// --------------------------------------------------------------------------- + +struct AuditEntry { + course_id: i32, + field_changed: &'static str, + old_value: String, + new_value: String, +} + +struct MetricEntry { + course_id: i32, + enrollment: i32, + wait_count: i32, + seats_available: i32, +} + +/// Compare old vs new for a single field, pushing an `AuditEntry` when they differ. +/// +/// Three variants: +/// - `diff_field!(audits, row, field_name, old_field, new_field)` — `Option` old vs `T` new +/// - `diff_field!(opt audits, row, field_name, old_field, new_field)` — `Option` old vs `Option` new +/// - `diff_field!(json audits, row, field_name, old_field, new_field)` — `Option` old vs `Value` new +/// +/// All variants skip when `old_id` is None (fresh insert). +macro_rules! diff_field { + // Standard: Option old vs T new (non-nullable columns) + ($audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => { + if $row.old_id.is_some() { + let old_str = $row + .$old + .as_ref() + .map(|v| v.to_string()) + .unwrap_or_default(); + let new_str = $row.$new.to_string(); + if old_str != new_str { + $audits.push(AuditEntry { + course_id: $row.id, + field_changed: $field, + old_value: old_str, + new_value: new_str, + }); + } + } + }; + // Nullable: Option old vs Option new + (opt $audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => { + if $row.old_id.is_some() { + let old_str = $row + .$old + .as_ref() + .map(|v| v.to_string()) + .unwrap_or_default(); + let new_str = $row + .$new + .as_ref() + .map(|v| v.to_string()) + .unwrap_or_default(); + if old_str != new_str { + $audits.push(AuditEntry { + course_id: $row.id, + field_changed: $field, + old_value: old_str, + new_value: new_str, + }); + } + } + }; + // JSONB: Option old vs Value new + (json $audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => { + if $row.old_id.is_some() { + let old_val = $row + .$old + .as_ref() + .cloned() + .unwrap_or(serde_json::Value::Null); + let new_val = &$row.$new; + if old_val != *new_val { + $audits.push(AuditEntry { + course_id: $row.id, + field_changed: $field, + old_value: old_val.to_string(), + new_value: new_val.to_string(), + }); + } + } + }; +} + +/// Compute audit entries (field-level diffs) and metric entries from upsert diff rows. +fn compute_diffs(rows: &[UpsertDiffRow]) -> (Vec, Vec) { + let mut audits = Vec::new(); + let mut metrics = Vec::new(); + + for row in rows { + // Non-nullable fields + diff_field!(audits, row, "enrollment", old_enrollment, new_enrollment); + diff_field!( + audits, + row, + "max_enrollment", + old_max_enrollment, + new_max_enrollment + ); + diff_field!(audits, row, "wait_count", old_wait_count, new_wait_count); + diff_field!( + audits, + row, + "wait_capacity", + old_wait_capacity, + new_wait_capacity + ); + diff_field!(audits, row, "subject", old_subject, new_subject); + diff_field!( + audits, + row, + "course_number", + old_course_number, + new_course_number + ); + diff_field!(audits, row, "title", old_title, new_title); + + // Nullable text fields + diff_field!(opt audits, row, "sequence_number", old_sequence_number, new_sequence_number); + diff_field!(opt audits, row, "part_of_term", old_part_of_term, new_part_of_term); + diff_field!(opt audits, row, "instructional_method", old_instructional_method, new_instructional_method); + diff_field!(opt audits, row, "campus", old_campus, new_campus); + + // Nullable int fields + diff_field!(opt audits, row, "credit_hours", old_credit_hours, new_credit_hours); + diff_field!(opt audits, row, "credit_hour_low", old_credit_hour_low, new_credit_hour_low); + diff_field!(opt audits, row, "credit_hour_high", old_credit_hour_high, new_credit_hour_high); + + // Cross-list fields + diff_field!(opt audits, row, "cross_list", old_cross_list, new_cross_list); + diff_field!(opt audits, row, "cross_list_capacity", old_cross_list_capacity, new_cross_list_capacity); + diff_field!(opt audits, row, "cross_list_count", old_cross_list_count, new_cross_list_count); + + // Link fields + diff_field!(opt audits, row, "link_identifier", old_link_identifier, new_link_identifier); + diff_field!(opt audits, row, "is_section_linked", old_is_section_linked, new_is_section_linked); + + // JSONB fields + diff_field!(json audits, row, "meeting_times", old_meeting_times, new_meeting_times); + diff_field!(json audits, row, "attributes", old_attributes, new_attributes); + + // Emit a metric entry when enrollment/wait_count/max_enrollment changed + // Skip fresh inserts (no old data to compare against) + let enrollment_changed = row.old_id.is_some() + && (row.old_enrollment != Some(row.new_enrollment) + || row.old_wait_count != Some(row.new_wait_count) + || row.old_max_enrollment != Some(row.new_max_enrollment)); + + if enrollment_changed { + metrics.push(MetricEntry { + course_id: row.id, + enrollment: row.new_enrollment, + wait_count: row.new_wait_count, + seats_available: row.new_max_enrollment - row.new_enrollment, + }); + } + } + + (audits, metrics) +} + +// --------------------------------------------------------------------------- +// Task 4: Batch insert functions for audits and metrics +// --------------------------------------------------------------------------- + +async fn insert_audits(audits: &[AuditEntry], conn: &mut PgConnection) -> Result<()> { + if audits.is_empty() { + return Ok(()); + } + + let course_ids: Vec = audits.iter().map(|a| a.course_id).collect(); + let fields: Vec<&str> = audits.iter().map(|a| a.field_changed).collect(); + let old_values: Vec<&str> = audits.iter().map(|a| a.old_value.as_str()).collect(); + let new_values: Vec<&str> = audits.iter().map(|a| a.new_value.as_str()).collect(); + + sqlx::query( + r#" + INSERT INTO course_audits (course_id, timestamp, field_changed, old_value, new_value) + SELECT v.course_id, NOW(), v.field_changed, v.old_value, v.new_value + FROM UNNEST($1::int4[], $2::text[], $3::text[], $4::text[]) + AS v(course_id, field_changed, old_value, new_value) + "#, + ) + .bind(&course_ids) + .bind(&fields) + .bind(&old_values) + .bind(&new_values) + .execute(&mut *conn) + .await + .map_err(|e| anyhow::anyhow!("Failed to batch insert course_audits: {}", e))?; + + Ok(()) +} + +async fn insert_metrics(metrics: &[MetricEntry], conn: &mut PgConnection) -> Result<()> { + if metrics.is_empty() { + return Ok(()); + } + + let course_ids: Vec = metrics.iter().map(|m| m.course_id).collect(); + let enrollments: Vec = metrics.iter().map(|m| m.enrollment).collect(); + let wait_counts: Vec = metrics.iter().map(|m| m.wait_count).collect(); + let seats_available: Vec = metrics.iter().map(|m| m.seats_available).collect(); + + sqlx::query( + r#" + INSERT INTO course_metrics (course_id, timestamp, enrollment, wait_count, seats_available) + SELECT v.course_id, NOW(), v.enrollment, v.wait_count, v.seats_available + FROM UNNEST($1::int4[], $2::int4[], $3::int4[], $4::int4[]) + AS v(course_id, enrollment, wait_count, seats_available) + "#, + ) + .bind(&course_ids) + .bind(&enrollments) + .bind(&wait_counts) + .bind(&seats_available) + .execute(&mut *conn) + .await + .map_err(|e| anyhow::anyhow!("Failed to batch insert course_metrics: {}", e))?; + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Core upsert functions (updated to use &mut PgConnection) +// --------------------------------------------------------------------------- + /// Batch upsert courses in a single database query. /// /// Performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses, including -/// new fields (meeting times, attributes, instructor data). Returns the -/// database IDs for all upserted courses (in input order) so instructors -/// can be linked. +/// new fields (meeting times, attributes, instructor data). Captures pre-update +/// state for audit/metric tracking, all within a single transaction. /// /// # Performance -/// - Reduces N database round-trips to 3 (courses, instructors, junction) +/// - Reduces N database round-trips to 5 (old-data CTE + upsert, audits, metrics, instructors, junction) /// - Typical usage: 50-200 courses per batch pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> { if courses.is_empty() { @@ -76,27 +377,47 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul let start = Instant::now(); let course_count = courses.len(); - // Step 1: Upsert courses with all fields, returning IDs - let course_ids = upsert_courses(courses, db_pool).await?; + let mut tx = db_pool.begin().await?; - // Step 2: Upsert instructors (deduplicated across batch) - upsert_instructors(courses, db_pool).await?; + // Step 1: Upsert courses with CTE, returning diff rows + let diff_rows = upsert_courses(courses, &mut tx).await?; - // Step 3: Link courses to instructors via junction table - upsert_course_instructors(courses, &course_ids, db_pool).await?; + // Step 2: Extract course IDs for instructor linking + let course_ids: Vec = diff_rows.iter().map(|r| r.id).collect(); + + // Step 3: Compute audit/metric diffs + let (audits, metrics) = compute_diffs(&diff_rows); + + // Step 4: Insert audits and metrics + insert_audits(&audits, &mut tx).await?; + insert_metrics(&metrics, &mut tx).await?; + + // Step 5: Upsert instructors (deduplicated across batch) + upsert_instructors(courses, &mut tx).await?; + + // Step 6: Link courses to instructors via junction table + upsert_course_instructors(courses, &course_ids, &mut tx).await?; + + tx.commit().await?; let duration = start.elapsed(); info!( courses_count = course_count, + audit_entries = audits.len(), + metric_entries = metrics.len(), duration_ms = duration.as_millis(), - "Batch upserted courses with instructors" + "Batch upserted courses with instructors, audits, and metrics" ); Ok(()) } -/// Upsert all courses and return their database IDs in input order. -async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result> { +// --------------------------------------------------------------------------- +// Task 2: CTE-based upsert returning old+new values +// --------------------------------------------------------------------------- + +/// Upsert all courses and return diff rows with old and new values for auditing. +async fn upsert_courses(courses: &[Course], conn: &mut PgConnection) -> Result> { let crns: Vec<&str> = courses .iter() .map(|c| c.course_reference_number.as_str()) @@ -143,67 +464,106 @@ async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result courses.iter().map(to_db_meeting_times).collect(); let attributes_json: Vec = courses.iter().map(to_db_attributes).collect(); - let rows = sqlx::query_scalar::<_, i32>( + let rows = sqlx::query_as::<_, UpsertDiffRow>( r#" - INSERT INTO courses ( - crn, subject, course_number, title, term_code, - enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at, - sequence_number, part_of_term, instructional_method, campus, - credit_hours, credit_hour_low, credit_hour_high, - cross_list, cross_list_capacity, cross_list_count, - link_identifier, is_section_linked, - meeting_times, attributes + WITH old_data AS ( + SELECT id, enrollment, max_enrollment, wait_count, wait_capacity, + subject, course_number, title, + sequence_number, part_of_term, instructional_method, campus, + credit_hours, credit_hour_low, credit_hour_high, + cross_list, cross_list_capacity, cross_list_count, + link_identifier, is_section_linked, + meeting_times, attributes, + crn, term_code + FROM courses + WHERE (crn, term_code) IN (SELECT * FROM UNNEST($1::text[], $5::text[])) + ), + upserted AS ( + INSERT INTO courses ( + crn, subject, course_number, title, term_code, + enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at, + sequence_number, part_of_term, instructional_method, campus, + credit_hours, credit_hour_low, credit_hour_high, + cross_list, cross_list_capacity, cross_list_count, + link_identifier, is_section_linked, + meeting_times, attributes + ) + SELECT + v.crn, v.subject, v.course_number, v.title, v.term_code, + v.enrollment, v.max_enrollment, v.wait_count, v.wait_capacity, NOW(), + v.sequence_number, v.part_of_term, v.instructional_method, v.campus, + v.credit_hours, v.credit_hour_low, v.credit_hour_high, + v.cross_list, v.cross_list_capacity, v.cross_list_count, + v.link_identifier, v.is_section_linked, + v.meeting_times, v.attributes + FROM UNNEST( + $1::text[], $2::text[], $3::text[], $4::text[], $5::text[], + $6::int4[], $7::int4[], $8::int4[], $9::int4[], + $10::text[], $11::text[], $12::text[], $13::text[], + $14::int4[], $15::int4[], $16::int4[], + $17::text[], $18::int4[], $19::int4[], + $20::text[], $21::bool[], + $22::jsonb[], $23::jsonb[] + ) AS v( + crn, subject, course_number, title, term_code, + enrollment, max_enrollment, wait_count, wait_capacity, + sequence_number, part_of_term, instructional_method, campus, + credit_hours, credit_hour_low, credit_hour_high, + cross_list, cross_list_capacity, cross_list_count, + link_identifier, is_section_linked, + meeting_times, attributes + ) + ON CONFLICT (crn, term_code) + DO UPDATE SET + subject = EXCLUDED.subject, + course_number = EXCLUDED.course_number, + title = EXCLUDED.title, + enrollment = EXCLUDED.enrollment, + max_enrollment = EXCLUDED.max_enrollment, + wait_count = EXCLUDED.wait_count, + wait_capacity = EXCLUDED.wait_capacity, + last_scraped_at = EXCLUDED.last_scraped_at, + sequence_number = EXCLUDED.sequence_number, + part_of_term = EXCLUDED.part_of_term, + instructional_method = EXCLUDED.instructional_method, + campus = EXCLUDED.campus, + credit_hours = EXCLUDED.credit_hours, + credit_hour_low = EXCLUDED.credit_hour_low, + credit_hour_high = EXCLUDED.credit_hour_high, + cross_list = EXCLUDED.cross_list, + cross_list_capacity = EXCLUDED.cross_list_capacity, + cross_list_count = EXCLUDED.cross_list_count, + link_identifier = EXCLUDED.link_identifier, + is_section_linked = EXCLUDED.is_section_linked, + meeting_times = EXCLUDED.meeting_times, + attributes = EXCLUDED.attributes + RETURNING * ) - SELECT - v.crn, v.subject, v.course_number, v.title, v.term_code, - v.enrollment, v.max_enrollment, v.wait_count, v.wait_capacity, NOW(), - v.sequence_number, v.part_of_term, v.instructional_method, v.campus, - v.credit_hours, v.credit_hour_low, v.credit_hour_high, - v.cross_list, v.cross_list_capacity, v.cross_list_count, - v.link_identifier, v.is_section_linked, - v.meeting_times, v.attributes - FROM UNNEST( - $1::text[], $2::text[], $3::text[], $4::text[], $5::text[], - $6::int4[], $7::int4[], $8::int4[], $9::int4[], - $10::text[], $11::text[], $12::text[], $13::text[], - $14::int4[], $15::int4[], $16::int4[], - $17::text[], $18::int4[], $19::int4[], - $20::text[], $21::bool[], - $22::jsonb[], $23::jsonb[] - ) AS v( - crn, subject, course_number, title, term_code, - enrollment, max_enrollment, wait_count, wait_capacity, - sequence_number, part_of_term, instructional_method, campus, - credit_hours, credit_hour_low, credit_hour_high, - cross_list, cross_list_capacity, cross_list_count, - link_identifier, is_section_linked, - meeting_times, attributes - ) - ON CONFLICT (crn, term_code) - DO UPDATE SET - subject = EXCLUDED.subject, - course_number = EXCLUDED.course_number, - title = EXCLUDED.title, - enrollment = EXCLUDED.enrollment, - max_enrollment = EXCLUDED.max_enrollment, - wait_count = EXCLUDED.wait_count, - wait_capacity = EXCLUDED.wait_capacity, - last_scraped_at = EXCLUDED.last_scraped_at, - sequence_number = EXCLUDED.sequence_number, - part_of_term = EXCLUDED.part_of_term, - instructional_method = EXCLUDED.instructional_method, - campus = EXCLUDED.campus, - credit_hours = EXCLUDED.credit_hours, - credit_hour_low = EXCLUDED.credit_hour_low, - credit_hour_high = EXCLUDED.credit_hour_high, - cross_list = EXCLUDED.cross_list, - cross_list_capacity = EXCLUDED.cross_list_capacity, - cross_list_count = EXCLUDED.cross_list_count, - link_identifier = EXCLUDED.link_identifier, - is_section_linked = EXCLUDED.is_section_linked, - meeting_times = EXCLUDED.meeting_times, - attributes = EXCLUDED.attributes - RETURNING id + SELECT u.id, + o.id AS old_id, + o.enrollment AS old_enrollment, u.enrollment AS new_enrollment, + o.max_enrollment AS old_max_enrollment, u.max_enrollment AS new_max_enrollment, + o.wait_count AS old_wait_count, u.wait_count AS new_wait_count, + o.wait_capacity AS old_wait_capacity, u.wait_capacity AS new_wait_capacity, + o.subject AS old_subject, u.subject AS new_subject, + o.course_number AS old_course_number, u.course_number AS new_course_number, + o.title AS old_title, u.title AS new_title, + o.sequence_number AS old_sequence_number, u.sequence_number AS new_sequence_number, + o.part_of_term AS old_part_of_term, u.part_of_term AS new_part_of_term, + o.instructional_method AS old_instructional_method, u.instructional_method AS new_instructional_method, + o.campus AS old_campus, u.campus AS new_campus, + o.credit_hours AS old_credit_hours, u.credit_hours AS new_credit_hours, + o.credit_hour_low AS old_credit_hour_low, u.credit_hour_low AS new_credit_hour_low, + o.credit_hour_high AS old_credit_hour_high, u.credit_hour_high AS new_credit_hour_high, + o.cross_list AS old_cross_list, u.cross_list AS new_cross_list, + o.cross_list_capacity AS old_cross_list_capacity, u.cross_list_capacity AS new_cross_list_capacity, + o.cross_list_count AS old_cross_list_count, u.cross_list_count AS new_cross_list_count, + o.link_identifier AS old_link_identifier, u.link_identifier AS new_link_identifier, + o.is_section_linked AS old_is_section_linked, u.is_section_linked AS new_is_section_linked, + o.meeting_times AS old_meeting_times, u.meeting_times AS new_meeting_times, + o.attributes AS old_attributes, u.attributes AS new_attributes + FROM upserted u + LEFT JOIN old_data o ON u.crn = o.crn AND u.term_code = o.term_code "#, ) .bind(&crns) @@ -229,7 +589,7 @@ async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result .bind(&is_section_linkeds) .bind(&meeting_times_json) .bind(&attributes_json) - .fetch_all(db_pool) + .fetch_all(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?; @@ -237,7 +597,7 @@ async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result } /// Deduplicate and upsert all instructors from the batch. -async fn upsert_instructors(courses: &[Course], db_pool: &PgPool) -> Result<()> { +async fn upsert_instructors(courses: &[Course], conn: &mut PgConnection) -> Result<()> { let mut seen = HashSet::new(); let mut banner_ids = Vec::new(); let mut display_names = Vec::new(); @@ -270,7 +630,7 @@ async fn upsert_instructors(courses: &[Course], db_pool: &PgPool) -> Result<()> .bind(&banner_ids) .bind(&display_names) .bind(&emails) - .execute(db_pool) + .execute(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch upsert instructors: {}", e))?; @@ -281,7 +641,7 @@ async fn upsert_instructors(courses: &[Course], db_pool: &PgPool) -> Result<()> async fn upsert_course_instructors( courses: &[Course], course_ids: &[i32], - db_pool: &PgPool, + conn: &mut PgConnection, ) -> Result<()> { let mut cids = Vec::new(); let mut iids = Vec::new(); @@ -303,7 +663,7 @@ async fn upsert_course_instructors( // This handles instructor changes cleanly. sqlx::query("DELETE FROM course_instructors WHERE course_id = ANY($1)") .bind(&cids) - .execute(db_pool) + .execute(&mut *conn) .await?; sqlx::query( @@ -317,7 +677,7 @@ async fn upsert_course_instructors( .bind(&cids) .bind(&iids) .bind(&primaries) - .execute(db_pool) + .execute(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch upsert course_instructors: {}", e))?; diff --git a/src/web/routes.rs b/src/web/routes.rs index e385152..9646ca8 100644 --- a/src/web/routes.rs +++ b/src/web/routes.rs @@ -244,20 +244,130 @@ async fn status(State(state): State) -> Json { } /// Metrics endpoint for monitoring -async fn metrics() -> Json { - // For now, return basic metrics structure - Json(json!({ - "banner_api": { - "status": "connected" - }, - "timestamp": chrono::Utc::now().to_rfc3339() - })) +async fn metrics( + State(state): State, + Query(params): Query, +) -> Result, (AxumStatusCode, String)> { + let limit = params.limit.clamp(1, 5000); + + // Parse range shorthand, defaulting to 24h + let range_str = params.range.as_deref().unwrap_or("24h"); + let duration = match range_str { + "1h" => chrono::Duration::hours(1), + "6h" => chrono::Duration::hours(6), + "24h" => chrono::Duration::hours(24), + "7d" => chrono::Duration::days(7), + "30d" => chrono::Duration::days(30), + _ => { + return Err(( + AxumStatusCode::BAD_REQUEST, + format!("Invalid range '{range_str}'. Valid: 1h, 6h, 24h, 7d, 30d"), + )); + } + }; + let since = chrono::Utc::now() - duration; + + // Resolve course_id: explicit param takes priority, then term+crn lookup + let course_id = if let Some(id) = params.course_id { + Some(id) + } else if let (Some(term), Some(crn)) = (params.term.as_deref(), params.crn.as_deref()) { + let row: Option<(i32,)> = + sqlx::query_as("SELECT id FROM courses WHERE term_code = $1 AND crn = $2") + .bind(term) + .bind(crn) + .fetch_optional(&state.db_pool) + .await + .map_err(|e| { + tracing::error!(error = %e, "Course lookup for metrics failed"); + ( + AxumStatusCode::INTERNAL_SERVER_ERROR, + "Course lookup failed".to_string(), + ) + })?; + row.map(|(id,)| id) + } else { + None + }; + + // Build query dynamically based on filters + let metrics: Vec<(i32, i32, chrono::DateTime, i32, i32, i32)> = + if let Some(cid) = course_id { + sqlx::query_as( + "SELECT id, course_id, timestamp, enrollment, wait_count, seats_available \ + FROM course_metrics \ + WHERE course_id = $1 AND timestamp >= $2 \ + ORDER BY timestamp DESC \ + LIMIT $3", + ) + .bind(cid) + .bind(since) + .bind(limit) + .fetch_all(&state.db_pool) + .await + } else { + sqlx::query_as( + "SELECT id, course_id, timestamp, enrollment, wait_count, seats_available \ + FROM course_metrics \ + WHERE timestamp >= $1 \ + ORDER BY timestamp DESC \ + LIMIT $2", + ) + .bind(since) + .bind(limit) + .fetch_all(&state.db_pool) + .await + } + .map_err(|e| { + tracing::error!(error = %e, "Metrics query failed"); + ( + AxumStatusCode::INTERNAL_SERVER_ERROR, + "Metrics query failed".to_string(), + ) + })?; + + let count = metrics.len(); + let metrics_json: Vec = metrics + .into_iter() + .map( + |(id, course_id, timestamp, enrollment, wait_count, seats_available)| { + json!({ + "id": id, + "courseId": course_id, + "timestamp": timestamp.to_rfc3339(), + "enrollment": enrollment, + "waitCount": wait_count, + "seatsAvailable": seats_available, + }) + }, + ) + .collect(); + + Ok(Json(json!({ + "metrics": metrics_json, + "count": count, + "timestamp": chrono::Utc::now().to_rfc3339(), + }))) } // ============================================================ // Course search & detail API // ============================================================ +#[derive(Deserialize)] +struct MetricsParams { + course_id: Option, + term: Option, + crn: Option, + /// Shorthand durations: "1h", "6h", "24h", "7d", "30d" + range: Option, + #[serde(default = "default_metrics_limit")] + limit: i32, +} + +fn default_metrics_limit() -> i32 { + 500 +} + #[derive(Deserialize)] struct SubjectsParams { term: String, diff --git a/tests/db_batch_upsert.rs b/tests/db_batch_upsert.rs index 771ab40..1c85b19 100644 --- a/tests/db_batch_upsert.rs +++ b/tests/db_batch_upsert.rs @@ -210,3 +210,116 @@ async fn test_batch_upsert_unique_constraint_crn_term(pool: PgPool) { assert_eq!(rows[1].0, "202520"); assert_eq!(rows[1].1, 10); } + +#[sqlx::test] +async fn test_batch_upsert_creates_audit_and_metric_entries(pool: PgPool) { + // Insert initial data — should NOT create audits/metrics (it's a fresh insert) + let initial = vec![helpers::make_course( + "50001", + "202510", + "CS", + "3443", + "App Programming", + 10, + 35, + 0, + 5, + )]; + batch_upsert_courses(&initial, &pool).await.unwrap(); + + let (audit_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM course_audits") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + audit_count, 0, + "initial insert should not create audit entries" + ); + + let (metric_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM course_metrics") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + metric_count, 0, + "initial insert should not create metric entries" + ); + + // Update enrollment and wait_count + let updated = vec![helpers::make_course( + "50001", + "202510", + "CS", + "3443", + "App Programming", + 20, + 35, + 2, + 5, + )]; + batch_upsert_courses(&updated, &pool).await.unwrap(); + + // Should have audit entries for enrollment and wait_count changes + let (audit_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM course_audits") + .fetch_one(&pool) + .await + .unwrap(); + assert!( + audit_count >= 2, + "should have audit entries for enrollment and wait_count changes, got {audit_count}" + ); + + // Should have exactly 1 metric entry + let (metric_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM course_metrics") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(metric_count, 1, "should have 1 metric snapshot"); + + // Verify metric values + let (enrollment, wait_count, seats): (i32, i32, i32) = sqlx::query_as( + "SELECT enrollment, wait_count, seats_available FROM course_metrics LIMIT 1", + ) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(enrollment, 20); + assert_eq!(wait_count, 2); + assert_eq!(seats, 15); // 35 - 20 +} + +#[sqlx::test] +async fn test_batch_upsert_no_change_no_audit(pool: PgPool) { + // Insert then re-insert identical data — should produce zero audits/metrics + let course = vec![helpers::make_course( + "60001", + "202510", + "CS", + "1083", + "Intro to CS", + 25, + 30, + 0, + 5, + )]; + batch_upsert_courses(&course, &pool).await.unwrap(); + batch_upsert_courses(&course, &pool).await.unwrap(); + + let (audit_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM course_audits") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + audit_count, 0, + "identical re-upsert should not create audit entries" + ); + + let (metric_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM course_metrics") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + metric_count, 0, + "identical re-upsert should not create metric entries" + ); +} diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index a015dc8..6f12fb9 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -72,6 +72,29 @@ export interface AuditLogResponse { entries: AuditLogEntry[]; } +export interface MetricEntry { + id: number; + courseId: number; + timestamp: string; + enrollment: number; + waitCount: number; + seatsAvailable: number; +} + +export interface MetricsResponse { + metrics: MetricEntry[]; + count: number; + timestamp: string; +} + +export interface MetricsParams { + course_id?: number; + term?: string; + crn?: string; + range?: "1h" | "6h" | "24h" | "7d" | "30d"; + limit?: number; +} + export interface SearchParams { term: string; subjects?: string[]; @@ -161,6 +184,17 @@ export class BannerApiClient { async getAdminAuditLog(): Promise { return this.request("/admin/audit-log"); } + + async getMetrics(params?: MetricsParams): Promise { + const query = new URLSearchParams(); + if (params?.course_id !== undefined) query.set("course_id", String(params.course_id)); + if (params?.term) query.set("term", params.term); + if (params?.crn) query.set("crn", params.crn); + if (params?.range) query.set("range", params.range); + if (params?.limit !== undefined) query.set("limit", String(params.limit)); + const qs = query.toString(); + return this.request(`/metrics${qs ? `?${qs}` : ""}`); + } } export const client = new BannerApiClient();