//! Batch database operations for improved performance. use crate::banner::Course; use crate::data::models::DbMeetingTime; use crate::error::Result; use sqlx::PgConnection; use sqlx::PgPool; use std::collections::HashSet; use std::time::Instant; use tracing::info; /// Convert a Banner API course's meeting times to the DB JSONB shape. fn to_db_meeting_times(course: &Course) -> serde_json::Value { let meetings: Vec = course .meetings_faculty .iter() .map(|mf| { let mt = &mf.meeting_time; DbMeetingTime { begin_time: mt.begin_time.clone(), end_time: mt.end_time.clone(), start_date: mt.start_date.clone(), end_date: mt.end_date.clone(), monday: mt.monday, tuesday: mt.tuesday, wednesday: mt.wednesday, thursday: mt.thursday, friday: mt.friday, saturday: mt.saturday, sunday: mt.sunday, building: mt.building.clone(), building_description: mt.building_description.clone(), room: mt.room.clone(), campus: mt.campus.clone(), meeting_type: mt.meeting_type.clone(), meeting_schedule_type: mt.meeting_schedule_type.clone(), } }) .collect(); serde_json::to_value(meetings).unwrap_or_default() } /// Convert a Banner API course's section attributes to a JSONB array of code strings. fn to_db_attributes(course: &Course) -> serde_json::Value { let codes: Vec<&str> = course .section_attributes .iter() .map(|a| a.code.as_str()) .collect(); serde_json::to_value(codes).unwrap_or_default() } /// Extract the campus code from the first meeting time (Banner doesn't put it on the course directly). fn extract_campus_code(course: &Course) -> Option { course .meetings_faculty .first() .and_then(|mf| mf.meeting_time.campus.clone()) } // --------------------------------------------------------------------------- // Task 1: UpsertDiffRow — captures pre- and post-upsert state for diffing // --------------------------------------------------------------------------- /// Row returned by the CTE-based upsert query, carrying both old and new values /// for every auditable field. `old_id` is `None` for fresh inserts. #[derive(sqlx::FromRow, Debug)] struct UpsertDiffRow { id: i32, old_id: Option, // enrollment fields old_enrollment: Option, new_enrollment: i32, old_max_enrollment: Option, new_max_enrollment: i32, old_wait_count: Option, new_wait_count: i32, old_wait_capacity: Option, new_wait_capacity: i32, // text fields (non-nullable in DB) old_subject: Option, new_subject: String, old_course_number: Option, new_course_number: String, old_title: Option, new_title: String, // nullable text fields old_sequence_number: Option, new_sequence_number: Option, old_part_of_term: Option, new_part_of_term: Option, old_instructional_method: Option, new_instructional_method: Option, old_campus: Option, new_campus: Option, // nullable int fields old_credit_hours: Option, new_credit_hours: Option, old_credit_hour_low: Option, new_credit_hour_low: Option, old_credit_hour_high: Option, new_credit_hour_high: Option, // cross-list fields old_cross_list: Option, new_cross_list: Option, old_cross_list_capacity: Option, new_cross_list_capacity: Option, old_cross_list_count: Option, new_cross_list_count: Option, // link fields old_link_identifier: Option, new_link_identifier: Option, old_is_section_linked: Option, new_is_section_linked: Option, // JSONB fields old_meeting_times: Option, new_meeting_times: serde_json::Value, old_attributes: Option, new_attributes: serde_json::Value, } // --------------------------------------------------------------------------- // Task 3: Entry types and diff logic // --------------------------------------------------------------------------- struct AuditEntry { course_id: i32, field_changed: &'static str, old_value: String, new_value: String, } struct MetricEntry { course_id: i32, enrollment: i32, wait_count: i32, seats_available: i32, } /// Compare old vs new for a single field, pushing an `AuditEntry` when they differ. /// /// Three variants: /// - `diff_field!(audits, row, field_name, old_field, new_field)` — `Option` old vs `T` new /// - `diff_field!(opt audits, row, field_name, old_field, new_field)` — `Option` old vs `Option` new /// - `diff_field!(json audits, row, field_name, old_field, new_field)` — `Option` old vs `Value` new /// /// All variants skip when `old_id` is None (fresh insert). macro_rules! diff_field { // Standard: Option old vs T new (non-nullable columns) ($audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => { if $row.old_id.is_some() { let old_str = $row .$old .as_ref() .map(|v| v.to_string()) .unwrap_or_default(); let new_str = $row.$new.to_string(); if old_str != new_str { $audits.push(AuditEntry { course_id: $row.id, field_changed: $field, old_value: old_str, new_value: new_str, }); } } }; // Nullable: Option old vs Option new (opt $audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => { if $row.old_id.is_some() { let old_str = $row .$old .as_ref() .map(|v| v.to_string()) .unwrap_or_default(); let new_str = $row .$new .as_ref() .map(|v| v.to_string()) .unwrap_or_default(); if old_str != new_str { $audits.push(AuditEntry { course_id: $row.id, field_changed: $field, old_value: old_str, new_value: new_str, }); } } }; // JSONB: Option old vs Value new (json $audits:ident, $row:ident, $field:expr, $old:ident, $new:ident) => { if $row.old_id.is_some() { let old_val = $row .$old .as_ref() .cloned() .unwrap_or(serde_json::Value::Null); let new_val = &$row.$new; if old_val != *new_val { $audits.push(AuditEntry { course_id: $row.id, field_changed: $field, old_value: old_val.to_string(), new_value: new_val.to_string(), }); } } }; } /// Compute audit entries (field-level diffs) and metric entries from upsert diff rows. fn compute_diffs(rows: &[UpsertDiffRow]) -> (Vec, Vec) { let mut audits = Vec::new(); let mut metrics = Vec::new(); for row in rows { // Non-nullable fields diff_field!(audits, row, "enrollment", old_enrollment, new_enrollment); diff_field!( audits, row, "max_enrollment", old_max_enrollment, new_max_enrollment ); diff_field!(audits, row, "wait_count", old_wait_count, new_wait_count); diff_field!( audits, row, "wait_capacity", old_wait_capacity, new_wait_capacity ); diff_field!(audits, row, "subject", old_subject, new_subject); diff_field!( audits, row, "course_number", old_course_number, new_course_number ); diff_field!(audits, row, "title", old_title, new_title); // Nullable text fields diff_field!(opt audits, row, "sequence_number", old_sequence_number, new_sequence_number); diff_field!(opt audits, row, "part_of_term", old_part_of_term, new_part_of_term); diff_field!(opt audits, row, "instructional_method", old_instructional_method, new_instructional_method); diff_field!(opt audits, row, "campus", old_campus, new_campus); // Nullable int fields diff_field!(opt audits, row, "credit_hours", old_credit_hours, new_credit_hours); diff_field!(opt audits, row, "credit_hour_low", old_credit_hour_low, new_credit_hour_low); diff_field!(opt audits, row, "credit_hour_high", old_credit_hour_high, new_credit_hour_high); // Cross-list fields diff_field!(opt audits, row, "cross_list", old_cross_list, new_cross_list); diff_field!(opt audits, row, "cross_list_capacity", old_cross_list_capacity, new_cross_list_capacity); diff_field!(opt audits, row, "cross_list_count", old_cross_list_count, new_cross_list_count); // Link fields diff_field!(opt audits, row, "link_identifier", old_link_identifier, new_link_identifier); diff_field!(opt audits, row, "is_section_linked", old_is_section_linked, new_is_section_linked); // JSONB fields diff_field!(json audits, row, "meeting_times", old_meeting_times, new_meeting_times); diff_field!(json audits, row, "attributes", old_attributes, new_attributes); // Emit a metric entry when enrollment/wait_count/max_enrollment changed // Skip fresh inserts (no old data to compare against) let enrollment_changed = row.old_id.is_some() && (row.old_enrollment != Some(row.new_enrollment) || row.old_wait_count != Some(row.new_wait_count) || row.old_max_enrollment != Some(row.new_max_enrollment)); if enrollment_changed { metrics.push(MetricEntry { course_id: row.id, enrollment: row.new_enrollment, wait_count: row.new_wait_count, seats_available: row.new_max_enrollment - row.new_enrollment, }); } } (audits, metrics) } // --------------------------------------------------------------------------- // Task 4: Batch insert functions for audits and metrics // --------------------------------------------------------------------------- async fn insert_audits(audits: &[AuditEntry], conn: &mut PgConnection) -> Result<()> { if audits.is_empty() { return Ok(()); } let course_ids: Vec = audits.iter().map(|a| a.course_id).collect(); let fields: Vec<&str> = audits.iter().map(|a| a.field_changed).collect(); let old_values: Vec<&str> = audits.iter().map(|a| a.old_value.as_str()).collect(); let new_values: Vec<&str> = audits.iter().map(|a| a.new_value.as_str()).collect(); sqlx::query( r#" INSERT INTO course_audits (course_id, timestamp, field_changed, old_value, new_value) SELECT v.course_id, NOW(), v.field_changed, v.old_value, v.new_value FROM UNNEST($1::int4[], $2::text[], $3::text[], $4::text[]) AS v(course_id, field_changed, old_value, new_value) "#, ) .bind(&course_ids) .bind(&fields) .bind(&old_values) .bind(&new_values) .execute(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch insert course_audits: {}", e))?; Ok(()) } async fn insert_metrics(metrics: &[MetricEntry], conn: &mut PgConnection) -> Result<()> { if metrics.is_empty() { return Ok(()); } let course_ids: Vec = metrics.iter().map(|m| m.course_id).collect(); let enrollments: Vec = metrics.iter().map(|m| m.enrollment).collect(); let wait_counts: Vec = metrics.iter().map(|m| m.wait_count).collect(); let seats_available: Vec = metrics.iter().map(|m| m.seats_available).collect(); sqlx::query( r#" INSERT INTO course_metrics (course_id, timestamp, enrollment, wait_count, seats_available) SELECT v.course_id, NOW(), v.enrollment, v.wait_count, v.seats_available FROM UNNEST($1::int4[], $2::int4[], $3::int4[], $4::int4[]) AS v(course_id, enrollment, wait_count, seats_available) "#, ) .bind(&course_ids) .bind(&enrollments) .bind(&wait_counts) .bind(&seats_available) .execute(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch insert course_metrics: {}", e))?; Ok(()) } // --------------------------------------------------------------------------- // Core upsert functions (updated to use &mut PgConnection) // --------------------------------------------------------------------------- /// Batch upsert courses in a single database query. /// /// Performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses, including /// new fields (meeting times, attributes, instructor data). Captures pre-update /// state for audit/metric tracking, all within a single transaction. /// /// # Performance /// - Reduces N database round-trips to 5 (old-data CTE + upsert, audits, metrics, instructors, junction) /// - Typical usage: 50-200 courses per batch pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> { if courses.is_empty() { info!("No courses to upsert, skipping batch operation"); return Ok(()); } let start = Instant::now(); let course_count = courses.len(); let mut tx = db_pool.begin().await?; // Step 1: Upsert courses with CTE, returning diff rows let diff_rows = upsert_courses(courses, &mut tx).await?; // Step 2: Extract course IDs for instructor linking let course_ids: Vec = diff_rows.iter().map(|r| r.id).collect(); // Step 3: Compute audit/metric diffs let (audits, metrics) = compute_diffs(&diff_rows); // Step 4: Insert audits and metrics insert_audits(&audits, &mut tx).await?; insert_metrics(&metrics, &mut tx).await?; // Step 5: Upsert instructors (deduplicated across batch) upsert_instructors(courses, &mut tx).await?; // Step 6: Link courses to instructors via junction table upsert_course_instructors(courses, &course_ids, &mut tx).await?; tx.commit().await?; let duration = start.elapsed(); info!( courses_count = course_count, audit_entries = audits.len(), metric_entries = metrics.len(), duration_ms = duration.as_millis(), "Batch upserted courses with instructors, audits, and metrics" ); Ok(()) } // --------------------------------------------------------------------------- // Task 2: CTE-based upsert returning old+new values // --------------------------------------------------------------------------- /// Upsert all courses and return diff rows with old and new values for auditing. async fn upsert_courses(courses: &[Course], conn: &mut PgConnection) -> Result> { let crns: Vec<&str> = courses .iter() .map(|c| c.course_reference_number.as_str()) .collect(); let subjects: Vec<&str> = courses.iter().map(|c| c.subject.as_str()).collect(); let course_numbers: Vec<&str> = courses.iter().map(|c| c.course_number.as_str()).collect(); let titles: Vec<&str> = courses.iter().map(|c| c.course_title.as_str()).collect(); let term_codes: Vec<&str> = courses.iter().map(|c| c.term.as_str()).collect(); let enrollments: Vec = courses.iter().map(|c| c.enrollment).collect(); let max_enrollments: Vec = courses.iter().map(|c| c.maximum_enrollment).collect(); let wait_counts: Vec = courses.iter().map(|c| c.wait_count).collect(); let wait_capacities: Vec = courses.iter().map(|c| c.wait_capacity).collect(); // New scalar fields let sequence_numbers: Vec> = courses .iter() .map(|c| Some(c.sequence_number.as_str())) .collect(); let parts_of_term: Vec> = courses .iter() .map(|c| Some(c.part_of_term.as_str())) .collect(); let instructional_methods: Vec> = courses .iter() .map(|c| Some(c.instructional_method.as_str())) .collect(); let campuses: Vec> = courses.iter().map(extract_campus_code).collect(); let credit_hours: Vec> = courses.iter().map(|c| c.credit_hours).collect(); let credit_hour_lows: Vec> = courses.iter().map(|c| c.credit_hour_low).collect(); let credit_hour_highs: Vec> = courses.iter().map(|c| c.credit_hour_high).collect(); let cross_lists: Vec> = courses.iter().map(|c| c.cross_list.as_deref()).collect(); let cross_list_capacities: Vec> = courses.iter().map(|c| c.cross_list_capacity).collect(); let cross_list_counts: Vec> = courses.iter().map(|c| c.cross_list_count).collect(); let link_identifiers: Vec> = courses .iter() .map(|c| c.link_identifier.as_deref()) .collect(); let is_section_linkeds: Vec> = courses.iter().map(|c| Some(c.is_section_linked)).collect(); // JSONB fields let meeting_times_json: Vec = courses.iter().map(to_db_meeting_times).collect(); let attributes_json: Vec = courses.iter().map(to_db_attributes).collect(); let rows = sqlx::query_as::<_, UpsertDiffRow>( r#" WITH old_data AS ( SELECT id, enrollment, max_enrollment, wait_count, wait_capacity, subject, course_number, title, sequence_number, part_of_term, instructional_method, campus, credit_hours, credit_hour_low, credit_hour_high, cross_list, cross_list_capacity, cross_list_count, link_identifier, is_section_linked, meeting_times, attributes, crn, term_code FROM courses WHERE (crn, term_code) IN (SELECT * FROM UNNEST($1::text[], $5::text[])) ), upserted AS ( INSERT INTO courses ( crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at, sequence_number, part_of_term, instructional_method, campus, credit_hours, credit_hour_low, credit_hour_high, cross_list, cross_list_capacity, cross_list_count, link_identifier, is_section_linked, meeting_times, attributes ) SELECT v.crn, v.subject, v.course_number, v.title, v.term_code, v.enrollment, v.max_enrollment, v.wait_count, v.wait_capacity, NOW(), v.sequence_number, v.part_of_term, v.instructional_method, v.campus, v.credit_hours, v.credit_hour_low, v.credit_hour_high, v.cross_list, v.cross_list_capacity, v.cross_list_count, v.link_identifier, v.is_section_linked, v.meeting_times, v.attributes FROM UNNEST( $1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::int4[], $7::int4[], $8::int4[], $9::int4[], $10::text[], $11::text[], $12::text[], $13::text[], $14::int4[], $15::int4[], $16::int4[], $17::text[], $18::int4[], $19::int4[], $20::text[], $21::bool[], $22::jsonb[], $23::jsonb[] ) AS v( crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, sequence_number, part_of_term, instructional_method, campus, credit_hours, credit_hour_low, credit_hour_high, cross_list, cross_list_capacity, cross_list_count, link_identifier, is_section_linked, meeting_times, attributes ) ON CONFLICT (crn, term_code) DO UPDATE SET subject = EXCLUDED.subject, course_number = EXCLUDED.course_number, title = EXCLUDED.title, enrollment = EXCLUDED.enrollment, max_enrollment = EXCLUDED.max_enrollment, wait_count = EXCLUDED.wait_count, wait_capacity = EXCLUDED.wait_capacity, last_scraped_at = EXCLUDED.last_scraped_at, sequence_number = EXCLUDED.sequence_number, part_of_term = EXCLUDED.part_of_term, instructional_method = EXCLUDED.instructional_method, campus = EXCLUDED.campus, credit_hours = EXCLUDED.credit_hours, credit_hour_low = EXCLUDED.credit_hour_low, credit_hour_high = EXCLUDED.credit_hour_high, cross_list = EXCLUDED.cross_list, cross_list_capacity = EXCLUDED.cross_list_capacity, cross_list_count = EXCLUDED.cross_list_count, link_identifier = EXCLUDED.link_identifier, is_section_linked = EXCLUDED.is_section_linked, meeting_times = EXCLUDED.meeting_times, attributes = EXCLUDED.attributes RETURNING * ) SELECT u.id, o.id AS old_id, o.enrollment AS old_enrollment, u.enrollment AS new_enrollment, o.max_enrollment AS old_max_enrollment, u.max_enrollment AS new_max_enrollment, o.wait_count AS old_wait_count, u.wait_count AS new_wait_count, o.wait_capacity AS old_wait_capacity, u.wait_capacity AS new_wait_capacity, o.subject AS old_subject, u.subject AS new_subject, o.course_number AS old_course_number, u.course_number AS new_course_number, o.title AS old_title, u.title AS new_title, o.sequence_number AS old_sequence_number, u.sequence_number AS new_sequence_number, o.part_of_term AS old_part_of_term, u.part_of_term AS new_part_of_term, o.instructional_method AS old_instructional_method, u.instructional_method AS new_instructional_method, o.campus AS old_campus, u.campus AS new_campus, o.credit_hours AS old_credit_hours, u.credit_hours AS new_credit_hours, o.credit_hour_low AS old_credit_hour_low, u.credit_hour_low AS new_credit_hour_low, o.credit_hour_high AS old_credit_hour_high, u.credit_hour_high AS new_credit_hour_high, o.cross_list AS old_cross_list, u.cross_list AS new_cross_list, o.cross_list_capacity AS old_cross_list_capacity, u.cross_list_capacity AS new_cross_list_capacity, o.cross_list_count AS old_cross_list_count, u.cross_list_count AS new_cross_list_count, o.link_identifier AS old_link_identifier, u.link_identifier AS new_link_identifier, o.is_section_linked AS old_is_section_linked, u.is_section_linked AS new_is_section_linked, o.meeting_times AS old_meeting_times, u.meeting_times AS new_meeting_times, o.attributes AS old_attributes, u.attributes AS new_attributes FROM upserted u LEFT JOIN old_data o ON u.crn = o.crn AND u.term_code = o.term_code "#, ) .bind(&crns) .bind(&subjects) .bind(&course_numbers) .bind(&titles) .bind(&term_codes) .bind(&enrollments) .bind(&max_enrollments) .bind(&wait_counts) .bind(&wait_capacities) .bind(&sequence_numbers) .bind(&parts_of_term) .bind(&instructional_methods) .bind(&campuses) .bind(&credit_hours) .bind(&credit_hour_lows) .bind(&credit_hour_highs) .bind(&cross_lists) .bind(&cross_list_capacities) .bind(&cross_list_counts) .bind(&link_identifiers) .bind(&is_section_linkeds) .bind(&meeting_times_json) .bind(&attributes_json) .fetch_all(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?; Ok(rows) } /// Deduplicate and upsert all instructors from the batch. async fn upsert_instructors(courses: &[Course], conn: &mut PgConnection) -> Result<()> { let mut seen = HashSet::new(); let mut banner_ids = Vec::new(); let mut display_names = Vec::new(); let mut emails: Vec> = Vec::new(); for course in courses { for faculty in &course.faculty { if seen.insert(faculty.banner_id.as_str()) { banner_ids.push(faculty.banner_id.as_str()); display_names.push(faculty.display_name.as_str()); emails.push(faculty.email_address.as_deref()); } } } if banner_ids.is_empty() { return Ok(()); } sqlx::query( r#" INSERT INTO instructors (banner_id, display_name, email) SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[]) ON CONFLICT (banner_id) DO UPDATE SET display_name = EXCLUDED.display_name, email = COALESCE(EXCLUDED.email, instructors.email) "#, ) .bind(&banner_ids) .bind(&display_names) .bind(&emails) .execute(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch upsert instructors: {}", e))?; Ok(()) } /// Link courses to their instructors via the junction table. async fn upsert_course_instructors( courses: &[Course], course_ids: &[i32], conn: &mut PgConnection, ) -> Result<()> { let mut cids = Vec::new(); let mut iids = Vec::new(); let mut primaries = Vec::new(); for (course, &course_id) in courses.iter().zip(course_ids) { for faculty in &course.faculty { cids.push(course_id); iids.push(faculty.banner_id.as_str()); primaries.push(faculty.primary_indicator); } } if cids.is_empty() { return Ok(()); } // Delete existing links for these courses then re-insert. // This handles instructor changes cleanly. sqlx::query("DELETE FROM course_instructors WHERE course_id = ANY($1)") .bind(&cids) .execute(&mut *conn) .await?; sqlx::query( r#" INSERT INTO course_instructors (course_id, instructor_id, is_primary) SELECT * FROM UNNEST($1::int4[], $2::text[], $3::bool[]) ON CONFLICT (course_id, instructor_id) DO UPDATE SET is_primary = EXCLUDED.is_primary "#, ) .bind(&cids) .bind(&iids) .bind(&primaries) .execute(&mut *conn) .await .map_err(|e| anyhow::anyhow!("Failed to batch upsert course_instructors: {}", e))?; Ok(()) }