mirror of
https://github.com/Xevion/banner.git
synced 2026-01-31 04:23:34 -06:00
feat: implement course change auditing with time-series metrics endpoint
This commit is contained in:
+438
-78
@@ -3,6 +3,7 @@
|
||||
use crate::banner::Course;
|
||||
use crate::data::models::DbMeetingTime;
|
||||
use crate::error::Result;
|
||||
use sqlx::PgConnection;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashSet;
|
||||
use std::time::Instant;
|
||||
@@ -57,15 +58,315 @@ fn extract_campus_code(course: &Course) -> Option<String> {
|
||||
.and_then(|mf| mf.meeting_time.campus.clone())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Task 1: UpsertDiffRow — captures pre- and post-upsert state for diffing
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Row returned by the CTE-based upsert query, carrying both old and new values
|
||||
/// for every auditable field. `old_id` is `None` for fresh inserts.
|
||||
#[derive(sqlx::FromRow, Debug)]
|
||||
struct UpsertDiffRow {
|
||||
id: i32,
|
||||
old_id: Option<i32>,
|
||||
|
||||
// enrollment fields
|
||||
old_enrollment: Option<i32>,
|
||||
new_enrollment: i32,
|
||||
old_max_enrollment: Option<i32>,
|
||||
new_max_enrollment: i32,
|
||||
old_wait_count: Option<i32>,
|
||||
new_wait_count: i32,
|
||||
old_wait_capacity: Option<i32>,
|
||||
new_wait_capacity: i32,
|
||||
|
||||
// text fields (non-nullable in DB)
|
||||
old_subject: Option<String>,
|
||||
new_subject: String,
|
||||
old_course_number: Option<String>,
|
||||
new_course_number: String,
|
||||
old_title: Option<String>,
|
||||
new_title: String,
|
||||
|
||||
// nullable text fields
|
||||
old_sequence_number: Option<String>,
|
||||
new_sequence_number: Option<String>,
|
||||
old_part_of_term: Option<String>,
|
||||
new_part_of_term: Option<String>,
|
||||
old_instructional_method: Option<String>,
|
||||
new_instructional_method: Option<String>,
|
||||
old_campus: Option<String>,
|
||||
new_campus: Option<String>,
|
||||
|
||||
// nullable int fields
|
||||
old_credit_hours: Option<i32>,
|
||||
new_credit_hours: Option<i32>,
|
||||
old_credit_hour_low: Option<i32>,
|
||||
new_credit_hour_low: Option<i32>,
|
||||
old_credit_hour_high: Option<i32>,
|
||||
new_credit_hour_high: Option<i32>,
|
||||
|
||||
// cross-list fields
|
||||
old_cross_list: Option<String>,
|
||||
new_cross_list: Option<String>,
|
||||
old_cross_list_capacity: Option<i32>,
|
||||
new_cross_list_capacity: Option<i32>,
|
||||
old_cross_list_count: Option<i32>,
|
||||
new_cross_list_count: Option<i32>,
|
||||
|
||||
// link fields
|
||||
old_link_identifier: Option<String>,
|
||||
new_link_identifier: Option<String>,
|
||||
old_is_section_linked: Option<bool>,
|
||||
new_is_section_linked: Option<bool>,
|
||||
|
||||
// JSONB fields
|
||||
old_meeting_times: Option<serde_json::Value>,
|
||||
new_meeting_times: serde_json::Value,
|
||||
old_attributes: Option<serde_json::Value>,
|
||||
new_attributes: serde_json::Value,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Task 3: Entry types and diff logic
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
struct AuditEntry {
|
||||
course_id: i32,
|
||||
field_changed: &'static str,
|
||||
old_value: String,
|
||||
new_value: String,
|
||||
}
|
||||
|
||||
struct MetricEntry {
|
||||
course_id: i32,
|
||||
enrollment: i32,
|
||||
wait_count: i32,
|
||||
seats_available: i32,
|
||||
}
|
||||
|
||||
/// Compare old vs new for a single field, pushing an `AuditEntry` when they differ.
|
||||
///
|
||||
/// Three variants:
|
||||
/// - `diff_field!(audits, row, field_name, old_field, new_field)` — `Option<T>` old vs `T` new
|
||||
/// - `diff_field!(opt audits, row, field_name, old_field, new_field)` — `Option<T>` old vs `Option<T>` new
|
||||
/// - `diff_field!(json audits, row, field_name, old_field, new_field)` — `Option<Value>` old vs `Value` new
|
||||
///
|
||||
/// All variants skip when `old_id` is None (fresh insert).
|
||||
macro_rules! diff_field {
|
||||
// Standard: Option<T> old vs T new (non-nullable columns)
|
||||
($audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => {
|
||||
if $row.old_id.is_some() {
|
||||
let old_str = $row
|
||||
.$old
|
||||
.as_ref()
|
||||
.map(|v| v.to_string())
|
||||
.unwrap_or_default();
|
||||
let new_str = $row.$new.to_string();
|
||||
if old_str != new_str {
|
||||
$audits.push(AuditEntry {
|
||||
course_id: $row.id,
|
||||
field_changed: $field,
|
||||
old_value: old_str,
|
||||
new_value: new_str,
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
// Nullable: Option<T> old vs Option<T> new
|
||||
(opt $audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => {
|
||||
if $row.old_id.is_some() {
|
||||
let old_str = $row
|
||||
.$old
|
||||
.as_ref()
|
||||
.map(|v| v.to_string())
|
||||
.unwrap_or_default();
|
||||
let new_str = $row
|
||||
.$new
|
||||
.as_ref()
|
||||
.map(|v| v.to_string())
|
||||
.unwrap_or_default();
|
||||
if old_str != new_str {
|
||||
$audits.push(AuditEntry {
|
||||
course_id: $row.id,
|
||||
field_changed: $field,
|
||||
old_value: old_str,
|
||||
new_value: new_str,
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
// JSONB: Option<Value> old vs Value new
|
||||
(json $audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => {
|
||||
if $row.old_id.is_some() {
|
||||
let old_val = $row
|
||||
.$old
|
||||
.as_ref()
|
||||
.cloned()
|
||||
.unwrap_or(serde_json::Value::Null);
|
||||
let new_val = &$row.$new;
|
||||
if old_val != *new_val {
|
||||
$audits.push(AuditEntry {
|
||||
course_id: $row.id,
|
||||
field_changed: $field,
|
||||
old_value: old_val.to_string(),
|
||||
new_value: new_val.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Compute audit entries (field-level diffs) and metric entries from upsert diff rows.
|
||||
fn compute_diffs(rows: &[UpsertDiffRow]) -> (Vec<AuditEntry>, Vec<MetricEntry>) {
|
||||
let mut audits = Vec::new();
|
||||
let mut metrics = Vec::new();
|
||||
|
||||
for row in rows {
|
||||
// Non-nullable fields
|
||||
diff_field!(audits, row, "enrollment", old_enrollment, new_enrollment);
|
||||
diff_field!(
|
||||
audits,
|
||||
row,
|
||||
"max_enrollment",
|
||||
old_max_enrollment,
|
||||
new_max_enrollment
|
||||
);
|
||||
diff_field!(audits, row, "wait_count", old_wait_count, new_wait_count);
|
||||
diff_field!(
|
||||
audits,
|
||||
row,
|
||||
"wait_capacity",
|
||||
old_wait_capacity,
|
||||
new_wait_capacity
|
||||
);
|
||||
diff_field!(audits, row, "subject", old_subject, new_subject);
|
||||
diff_field!(
|
||||
audits,
|
||||
row,
|
||||
"course_number",
|
||||
old_course_number,
|
||||
new_course_number
|
||||
);
|
||||
diff_field!(audits, row, "title", old_title, new_title);
|
||||
|
||||
// Nullable text fields
|
||||
diff_field!(opt audits, row, "sequence_number", old_sequence_number, new_sequence_number);
|
||||
diff_field!(opt audits, row, "part_of_term", old_part_of_term, new_part_of_term);
|
||||
diff_field!(opt audits, row, "instructional_method", old_instructional_method, new_instructional_method);
|
||||
diff_field!(opt audits, row, "campus", old_campus, new_campus);
|
||||
|
||||
// Nullable int fields
|
||||
diff_field!(opt audits, row, "credit_hours", old_credit_hours, new_credit_hours);
|
||||
diff_field!(opt audits, row, "credit_hour_low", old_credit_hour_low, new_credit_hour_low);
|
||||
diff_field!(opt audits, row, "credit_hour_high", old_credit_hour_high, new_credit_hour_high);
|
||||
|
||||
// Cross-list fields
|
||||
diff_field!(opt audits, row, "cross_list", old_cross_list, new_cross_list);
|
||||
diff_field!(opt audits, row, "cross_list_capacity", old_cross_list_capacity, new_cross_list_capacity);
|
||||
diff_field!(opt audits, row, "cross_list_count", old_cross_list_count, new_cross_list_count);
|
||||
|
||||
// Link fields
|
||||
diff_field!(opt audits, row, "link_identifier", old_link_identifier, new_link_identifier);
|
||||
diff_field!(opt audits, row, "is_section_linked", old_is_section_linked, new_is_section_linked);
|
||||
|
||||
// JSONB fields
|
||||
diff_field!(json audits, row, "meeting_times", old_meeting_times, new_meeting_times);
|
||||
diff_field!(json audits, row, "attributes", old_attributes, new_attributes);
|
||||
|
||||
// Emit a metric entry when enrollment/wait_count/max_enrollment changed
|
||||
// Skip fresh inserts (no old data to compare against)
|
||||
let enrollment_changed = row.old_id.is_some()
|
||||
&& (row.old_enrollment != Some(row.new_enrollment)
|
||||
|| row.old_wait_count != Some(row.new_wait_count)
|
||||
|| row.old_max_enrollment != Some(row.new_max_enrollment));
|
||||
|
||||
if enrollment_changed {
|
||||
metrics.push(MetricEntry {
|
||||
course_id: row.id,
|
||||
enrollment: row.new_enrollment,
|
||||
wait_count: row.new_wait_count,
|
||||
seats_available: row.new_max_enrollment - row.new_enrollment,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
(audits, metrics)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Task 4: Batch insert functions for audits and metrics
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async fn insert_audits(audits: &[AuditEntry], conn: &mut PgConnection) -> Result<()> {
|
||||
if audits.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let course_ids: Vec<i32> = audits.iter().map(|a| a.course_id).collect();
|
||||
let fields: Vec<&str> = audits.iter().map(|a| a.field_changed).collect();
|
||||
let old_values: Vec<&str> = audits.iter().map(|a| a.old_value.as_str()).collect();
|
||||
let new_values: Vec<&str> = audits.iter().map(|a| a.new_value.as_str()).collect();
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO course_audits (course_id, timestamp, field_changed, old_value, new_value)
|
||||
SELECT v.course_id, NOW(), v.field_changed, v.old_value, v.new_value
|
||||
FROM UNNEST($1::int4[], $2::text[], $3::text[], $4::text[])
|
||||
AS v(course_id, field_changed, old_value, new_value)
|
||||
"#,
|
||||
)
|
||||
.bind(&course_ids)
|
||||
.bind(&fields)
|
||||
.bind(&old_values)
|
||||
.bind(&new_values)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to batch insert course_audits: {}", e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_metrics(metrics: &[MetricEntry], conn: &mut PgConnection) -> Result<()> {
|
||||
if metrics.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let course_ids: Vec<i32> = metrics.iter().map(|m| m.course_id).collect();
|
||||
let enrollments: Vec<i32> = metrics.iter().map(|m| m.enrollment).collect();
|
||||
let wait_counts: Vec<i32> = metrics.iter().map(|m| m.wait_count).collect();
|
||||
let seats_available: Vec<i32> = metrics.iter().map(|m| m.seats_available).collect();
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO course_metrics (course_id, timestamp, enrollment, wait_count, seats_available)
|
||||
SELECT v.course_id, NOW(), v.enrollment, v.wait_count, v.seats_available
|
||||
FROM UNNEST($1::int4[], $2::int4[], $3::int4[], $4::int4[])
|
||||
AS v(course_id, enrollment, wait_count, seats_available)
|
||||
"#,
|
||||
)
|
||||
.bind(&course_ids)
|
||||
.bind(&enrollments)
|
||||
.bind(&wait_counts)
|
||||
.bind(&seats_available)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to batch insert course_metrics: {}", e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Core upsert functions (updated to use &mut PgConnection)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Batch upsert courses in a single database query.
|
||||
///
|
||||
/// Performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses, including
|
||||
/// new fields (meeting times, attributes, instructor data). Returns the
|
||||
/// database IDs for all upserted courses (in input order) so instructors
|
||||
/// can be linked.
|
||||
/// new fields (meeting times, attributes, instructor data). Captures pre-update
|
||||
/// state for audit/metric tracking, all within a single transaction.
|
||||
///
|
||||
/// # Performance
|
||||
/// - Reduces N database round-trips to 3 (courses, instructors, junction)
|
||||
/// - Reduces N database round-trips to 5 (old-data CTE + upsert, audits, metrics, instructors, junction)
|
||||
/// - Typical usage: 50-200 courses per batch
|
||||
pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
|
||||
if courses.is_empty() {
|
||||
@@ -76,27 +377,47 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul
|
||||
let start = Instant::now();
|
||||
let course_count = courses.len();
|
||||
|
||||
// Step 1: Upsert courses with all fields, returning IDs
|
||||
let course_ids = upsert_courses(courses, db_pool).await?;
|
||||
let mut tx = db_pool.begin().await?;
|
||||
|
||||
// Step 2: Upsert instructors (deduplicated across batch)
|
||||
upsert_instructors(courses, db_pool).await?;
|
||||
// Step 1: Upsert courses with CTE, returning diff rows
|
||||
let diff_rows = upsert_courses(courses, &mut tx).await?;
|
||||
|
||||
// Step 3: Link courses to instructors via junction table
|
||||
upsert_course_instructors(courses, &course_ids, db_pool).await?;
|
||||
// Step 2: Extract course IDs for instructor linking
|
||||
let course_ids: Vec<i32> = diff_rows.iter().map(|r| r.id).collect();
|
||||
|
||||
// Step 3: Compute audit/metric diffs
|
||||
let (audits, metrics) = compute_diffs(&diff_rows);
|
||||
|
||||
// Step 4: Insert audits and metrics
|
||||
insert_audits(&audits, &mut tx).await?;
|
||||
insert_metrics(&metrics, &mut tx).await?;
|
||||
|
||||
// Step 5: Upsert instructors (deduplicated across batch)
|
||||
upsert_instructors(courses, &mut tx).await?;
|
||||
|
||||
// Step 6: Link courses to instructors via junction table
|
||||
upsert_course_instructors(courses, &course_ids, &mut tx).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
let duration = start.elapsed();
|
||||
info!(
|
||||
courses_count = course_count,
|
||||
audit_entries = audits.len(),
|
||||
metric_entries = metrics.len(),
|
||||
duration_ms = duration.as_millis(),
|
||||
"Batch upserted courses with instructors"
|
||||
"Batch upserted courses with instructors, audits, and metrics"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Upsert all courses and return their database IDs in input order.
|
||||
async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<Vec<i32>> {
|
||||
// ---------------------------------------------------------------------------
|
||||
// Task 2: CTE-based upsert returning old+new values
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Upsert all courses and return diff rows with old and new values for auditing.
|
||||
async fn upsert_courses(courses: &[Course], conn: &mut PgConnection) -> Result<Vec<UpsertDiffRow>> {
|
||||
let crns: Vec<&str> = courses
|
||||
.iter()
|
||||
.map(|c| c.course_reference_number.as_str())
|
||||
@@ -143,67 +464,106 @@ async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<Vec<i32>
|
||||
courses.iter().map(to_db_meeting_times).collect();
|
||||
let attributes_json: Vec<serde_json::Value> = courses.iter().map(to_db_attributes).collect();
|
||||
|
||||
let rows = sqlx::query_scalar::<_, i32>(
|
||||
let rows = sqlx::query_as::<_, UpsertDiffRow>(
|
||||
r#"
|
||||
INSERT INTO courses (
|
||||
crn, subject, course_number, title, term_code,
|
||||
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at,
|
||||
sequence_number, part_of_term, instructional_method, campus,
|
||||
credit_hours, credit_hour_low, credit_hour_high,
|
||||
cross_list, cross_list_capacity, cross_list_count,
|
||||
link_identifier, is_section_linked,
|
||||
meeting_times, attributes
|
||||
WITH old_data AS (
|
||||
SELECT id, enrollment, max_enrollment, wait_count, wait_capacity,
|
||||
subject, course_number, title,
|
||||
sequence_number, part_of_term, instructional_method, campus,
|
||||
credit_hours, credit_hour_low, credit_hour_high,
|
||||
cross_list, cross_list_capacity, cross_list_count,
|
||||
link_identifier, is_section_linked,
|
||||
meeting_times, attributes,
|
||||
crn, term_code
|
||||
FROM courses
|
||||
WHERE (crn, term_code) IN (SELECT * FROM UNNEST($1::text[], $5::text[]))
|
||||
),
|
||||
upserted AS (
|
||||
INSERT INTO courses (
|
||||
crn, subject, course_number, title, term_code,
|
||||
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at,
|
||||
sequence_number, part_of_term, instructional_method, campus,
|
||||
credit_hours, credit_hour_low, credit_hour_high,
|
||||
cross_list, cross_list_capacity, cross_list_count,
|
||||
link_identifier, is_section_linked,
|
||||
meeting_times, attributes
|
||||
)
|
||||
SELECT
|
||||
v.crn, v.subject, v.course_number, v.title, v.term_code,
|
||||
v.enrollment, v.max_enrollment, v.wait_count, v.wait_capacity, NOW(),
|
||||
v.sequence_number, v.part_of_term, v.instructional_method, v.campus,
|
||||
v.credit_hours, v.credit_hour_low, v.credit_hour_high,
|
||||
v.cross_list, v.cross_list_capacity, v.cross_list_count,
|
||||
v.link_identifier, v.is_section_linked,
|
||||
v.meeting_times, v.attributes
|
||||
FROM UNNEST(
|
||||
$1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
|
||||
$6::int4[], $7::int4[], $8::int4[], $9::int4[],
|
||||
$10::text[], $11::text[], $12::text[], $13::text[],
|
||||
$14::int4[], $15::int4[], $16::int4[],
|
||||
$17::text[], $18::int4[], $19::int4[],
|
||||
$20::text[], $21::bool[],
|
||||
$22::jsonb[], $23::jsonb[]
|
||||
) AS v(
|
||||
crn, subject, course_number, title, term_code,
|
||||
enrollment, max_enrollment, wait_count, wait_capacity,
|
||||
sequence_number, part_of_term, instructional_method, campus,
|
||||
credit_hours, credit_hour_low, credit_hour_high,
|
||||
cross_list, cross_list_capacity, cross_list_count,
|
||||
link_identifier, is_section_linked,
|
||||
meeting_times, attributes
|
||||
)
|
||||
ON CONFLICT (crn, term_code)
|
||||
DO UPDATE SET
|
||||
subject = EXCLUDED.subject,
|
||||
course_number = EXCLUDED.course_number,
|
||||
title = EXCLUDED.title,
|
||||
enrollment = EXCLUDED.enrollment,
|
||||
max_enrollment = EXCLUDED.max_enrollment,
|
||||
wait_count = EXCLUDED.wait_count,
|
||||
wait_capacity = EXCLUDED.wait_capacity,
|
||||
last_scraped_at = EXCLUDED.last_scraped_at,
|
||||
sequence_number = EXCLUDED.sequence_number,
|
||||
part_of_term = EXCLUDED.part_of_term,
|
||||
instructional_method = EXCLUDED.instructional_method,
|
||||
campus = EXCLUDED.campus,
|
||||
credit_hours = EXCLUDED.credit_hours,
|
||||
credit_hour_low = EXCLUDED.credit_hour_low,
|
||||
credit_hour_high = EXCLUDED.credit_hour_high,
|
||||
cross_list = EXCLUDED.cross_list,
|
||||
cross_list_capacity = EXCLUDED.cross_list_capacity,
|
||||
cross_list_count = EXCLUDED.cross_list_count,
|
||||
link_identifier = EXCLUDED.link_identifier,
|
||||
is_section_linked = EXCLUDED.is_section_linked,
|
||||
meeting_times = EXCLUDED.meeting_times,
|
||||
attributes = EXCLUDED.attributes
|
||||
RETURNING *
|
||||
)
|
||||
SELECT
|
||||
v.crn, v.subject, v.course_number, v.title, v.term_code,
|
||||
v.enrollment, v.max_enrollment, v.wait_count, v.wait_capacity, NOW(),
|
||||
v.sequence_number, v.part_of_term, v.instructional_method, v.campus,
|
||||
v.credit_hours, v.credit_hour_low, v.credit_hour_high,
|
||||
v.cross_list, v.cross_list_capacity, v.cross_list_count,
|
||||
v.link_identifier, v.is_section_linked,
|
||||
v.meeting_times, v.attributes
|
||||
FROM UNNEST(
|
||||
$1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
|
||||
$6::int4[], $7::int4[], $8::int4[], $9::int4[],
|
||||
$10::text[], $11::text[], $12::text[], $13::text[],
|
||||
$14::int4[], $15::int4[], $16::int4[],
|
||||
$17::text[], $18::int4[], $19::int4[],
|
||||
$20::text[], $21::bool[],
|
||||
$22::jsonb[], $23::jsonb[]
|
||||
) AS v(
|
||||
crn, subject, course_number, title, term_code,
|
||||
enrollment, max_enrollment, wait_count, wait_capacity,
|
||||
sequence_number, part_of_term, instructional_method, campus,
|
||||
credit_hours, credit_hour_low, credit_hour_high,
|
||||
cross_list, cross_list_capacity, cross_list_count,
|
||||
link_identifier, is_section_linked,
|
||||
meeting_times, attributes
|
||||
)
|
||||
ON CONFLICT (crn, term_code)
|
||||
DO UPDATE SET
|
||||
subject = EXCLUDED.subject,
|
||||
course_number = EXCLUDED.course_number,
|
||||
title = EXCLUDED.title,
|
||||
enrollment = EXCLUDED.enrollment,
|
||||
max_enrollment = EXCLUDED.max_enrollment,
|
||||
wait_count = EXCLUDED.wait_count,
|
||||
wait_capacity = EXCLUDED.wait_capacity,
|
||||
last_scraped_at = EXCLUDED.last_scraped_at,
|
||||
sequence_number = EXCLUDED.sequence_number,
|
||||
part_of_term = EXCLUDED.part_of_term,
|
||||
instructional_method = EXCLUDED.instructional_method,
|
||||
campus = EXCLUDED.campus,
|
||||
credit_hours = EXCLUDED.credit_hours,
|
||||
credit_hour_low = EXCLUDED.credit_hour_low,
|
||||
credit_hour_high = EXCLUDED.credit_hour_high,
|
||||
cross_list = EXCLUDED.cross_list,
|
||||
cross_list_capacity = EXCLUDED.cross_list_capacity,
|
||||
cross_list_count = EXCLUDED.cross_list_count,
|
||||
link_identifier = EXCLUDED.link_identifier,
|
||||
is_section_linked = EXCLUDED.is_section_linked,
|
||||
meeting_times = EXCLUDED.meeting_times,
|
||||
attributes = EXCLUDED.attributes
|
||||
RETURNING id
|
||||
SELECT u.id,
|
||||
o.id AS old_id,
|
||||
o.enrollment AS old_enrollment, u.enrollment AS new_enrollment,
|
||||
o.max_enrollment AS old_max_enrollment, u.max_enrollment AS new_max_enrollment,
|
||||
o.wait_count AS old_wait_count, u.wait_count AS new_wait_count,
|
||||
o.wait_capacity AS old_wait_capacity, u.wait_capacity AS new_wait_capacity,
|
||||
o.subject AS old_subject, u.subject AS new_subject,
|
||||
o.course_number AS old_course_number, u.course_number AS new_course_number,
|
||||
o.title AS old_title, u.title AS new_title,
|
||||
o.sequence_number AS old_sequence_number, u.sequence_number AS new_sequence_number,
|
||||
o.part_of_term AS old_part_of_term, u.part_of_term AS new_part_of_term,
|
||||
o.instructional_method AS old_instructional_method, u.instructional_method AS new_instructional_method,
|
||||
o.campus AS old_campus, u.campus AS new_campus,
|
||||
o.credit_hours AS old_credit_hours, u.credit_hours AS new_credit_hours,
|
||||
o.credit_hour_low AS old_credit_hour_low, u.credit_hour_low AS new_credit_hour_low,
|
||||
o.credit_hour_high AS old_credit_hour_high, u.credit_hour_high AS new_credit_hour_high,
|
||||
o.cross_list AS old_cross_list, u.cross_list AS new_cross_list,
|
||||
o.cross_list_capacity AS old_cross_list_capacity, u.cross_list_capacity AS new_cross_list_capacity,
|
||||
o.cross_list_count AS old_cross_list_count, u.cross_list_count AS new_cross_list_count,
|
||||
o.link_identifier AS old_link_identifier, u.link_identifier AS new_link_identifier,
|
||||
o.is_section_linked AS old_is_section_linked, u.is_section_linked AS new_is_section_linked,
|
||||
o.meeting_times AS old_meeting_times, u.meeting_times AS new_meeting_times,
|
||||
o.attributes AS old_attributes, u.attributes AS new_attributes
|
||||
FROM upserted u
|
||||
LEFT JOIN old_data o ON u.crn = o.crn AND u.term_code = o.term_code
|
||||
"#,
|
||||
)
|
||||
.bind(&crns)
|
||||
@@ -229,7 +589,7 @@ async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<Vec<i32>
|
||||
.bind(&is_section_linkeds)
|
||||
.bind(&meeting_times_json)
|
||||
.bind(&attributes_json)
|
||||
.fetch_all(db_pool)
|
||||
.fetch_all(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?;
|
||||
|
||||
@@ -237,7 +597,7 @@ async fn upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<Vec<i32>
|
||||
}
|
||||
|
||||
/// Deduplicate and upsert all instructors from the batch.
|
||||
async fn upsert_instructors(courses: &[Course], db_pool: &PgPool) -> Result<()> {
|
||||
async fn upsert_instructors(courses: &[Course], conn: &mut PgConnection) -> Result<()> {
|
||||
let mut seen = HashSet::new();
|
||||
let mut banner_ids = Vec::new();
|
||||
let mut display_names = Vec::new();
|
||||
@@ -270,7 +630,7 @@ async fn upsert_instructors(courses: &[Course], db_pool: &PgPool) -> Result<()>
|
||||
.bind(&banner_ids)
|
||||
.bind(&display_names)
|
||||
.bind(&emails)
|
||||
.execute(db_pool)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to batch upsert instructors: {}", e))?;
|
||||
|
||||
@@ -281,7 +641,7 @@ async fn upsert_instructors(courses: &[Course], db_pool: &PgPool) -> Result<()>
|
||||
async fn upsert_course_instructors(
|
||||
courses: &[Course],
|
||||
course_ids: &[i32],
|
||||
db_pool: &PgPool,
|
||||
conn: &mut PgConnection,
|
||||
) -> Result<()> {
|
||||
let mut cids = Vec::new();
|
||||
let mut iids = Vec::new();
|
||||
@@ -303,7 +663,7 @@ async fn upsert_course_instructors(
|
||||
// This handles instructor changes cleanly.
|
||||
sqlx::query("DELETE FROM course_instructors WHERE course_id = ANY($1)")
|
||||
.bind(&cids)
|
||||
.execute(db_pool)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
@@ -317,7 +677,7 @@ async fn upsert_course_instructors(
|
||||
.bind(&cids)
|
||||
.bind(&iids)
|
||||
.bind(&primaries)
|
||||
.execute(db_pool)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to batch upsert course_instructors: {}", e))?;
|
||||
|
||||
|
||||
+118
-8
@@ -244,20 +244,130 @@ async fn status(State(state): State<AppState>) -> Json<StatusResponse> {
|
||||
}
|
||||
|
||||
/// Metrics endpoint for monitoring
|
||||
async fn metrics() -> Json<Value> {
|
||||
// For now, return basic metrics structure
|
||||
Json(json!({
|
||||
"banner_api": {
|
||||
"status": "connected"
|
||||
},
|
||||
"timestamp": chrono::Utc::now().to_rfc3339()
|
||||
}))
|
||||
async fn metrics(
|
||||
State(state): State<AppState>,
|
||||
Query(params): Query<MetricsParams>,
|
||||
) -> Result<Json<Value>, (AxumStatusCode, String)> {
|
||||
let limit = params.limit.clamp(1, 5000);
|
||||
|
||||
// Parse range shorthand, defaulting to 24h
|
||||
let range_str = params.range.as_deref().unwrap_or("24h");
|
||||
let duration = match range_str {
|
||||
"1h" => chrono::Duration::hours(1),
|
||||
"6h" => chrono::Duration::hours(6),
|
||||
"24h" => chrono::Duration::hours(24),
|
||||
"7d" => chrono::Duration::days(7),
|
||||
"30d" => chrono::Duration::days(30),
|
||||
_ => {
|
||||
return Err((
|
||||
AxumStatusCode::BAD_REQUEST,
|
||||
format!("Invalid range '{range_str}'. Valid: 1h, 6h, 24h, 7d, 30d"),
|
||||
));
|
||||
}
|
||||
};
|
||||
let since = chrono::Utc::now() - duration;
|
||||
|
||||
// Resolve course_id: explicit param takes priority, then term+crn lookup
|
||||
let course_id = if let Some(id) = params.course_id {
|
||||
Some(id)
|
||||
} else if let (Some(term), Some(crn)) = (params.term.as_deref(), params.crn.as_deref()) {
|
||||
let row: Option<(i32,)> =
|
||||
sqlx::query_as("SELECT id FROM courses WHERE term_code = $1 AND crn = $2")
|
||||
.bind(term)
|
||||
.bind(crn)
|
||||
.fetch_optional(&state.db_pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!(error = %e, "Course lookup for metrics failed");
|
||||
(
|
||||
AxumStatusCode::INTERNAL_SERVER_ERROR,
|
||||
"Course lookup failed".to_string(),
|
||||
)
|
||||
})?;
|
||||
row.map(|(id,)| id)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Build query dynamically based on filters
|
||||
let metrics: Vec<(i32, i32, chrono::DateTime<chrono::Utc>, i32, i32, i32)> =
|
||||
if let Some(cid) = course_id {
|
||||
sqlx::query_as(
|
||||
"SELECT id, course_id, timestamp, enrollment, wait_count, seats_available \
|
||||
FROM course_metrics \
|
||||
WHERE course_id = $1 AND timestamp >= $2 \
|
||||
ORDER BY timestamp DESC \
|
||||
LIMIT $3",
|
||||
)
|
||||
.bind(cid)
|
||||
.bind(since)
|
||||
.bind(limit)
|
||||
.fetch_all(&state.db_pool)
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_as(
|
||||
"SELECT id, course_id, timestamp, enrollment, wait_count, seats_available \
|
||||
FROM course_metrics \
|
||||
WHERE timestamp >= $1 \
|
||||
ORDER BY timestamp DESC \
|
||||
LIMIT $2",
|
||||
)
|
||||
.bind(since)
|
||||
.bind(limit)
|
||||
.fetch_all(&state.db_pool)
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
tracing::error!(error = %e, "Metrics query failed");
|
||||
(
|
||||
AxumStatusCode::INTERNAL_SERVER_ERROR,
|
||||
"Metrics query failed".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let count = metrics.len();
|
||||
let metrics_json: Vec<Value> = metrics
|
||||
.into_iter()
|
||||
.map(
|
||||
|(id, course_id, timestamp, enrollment, wait_count, seats_available)| {
|
||||
json!({
|
||||
"id": id,
|
||||
"courseId": course_id,
|
||||
"timestamp": timestamp.to_rfc3339(),
|
||||
"enrollment": enrollment,
|
||||
"waitCount": wait_count,
|
||||
"seatsAvailable": seats_available,
|
||||
})
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
|
||||
Ok(Json(json!({
|
||||
"metrics": metrics_json,
|
||||
"count": count,
|
||||
"timestamp": chrono::Utc::now().to_rfc3339(),
|
||||
})))
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Course search & detail API
|
||||
// ============================================================
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct MetricsParams {
|
||||
course_id: Option<i32>,
|
||||
term: Option<String>,
|
||||
crn: Option<String>,
|
||||
/// Shorthand durations: "1h", "6h", "24h", "7d", "30d"
|
||||
range: Option<String>,
|
||||
#[serde(default = "default_metrics_limit")]
|
||||
limit: i32,
|
||||
}
|
||||
|
||||
fn default_metrics_limit() -> i32 {
|
||||
500
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct SubjectsParams {
|
||||
term: String,
|
||||
|
||||
Reference in New Issue
Block a user