diff --git a/migrations/20251103104300_add_performance_indexes.sql b/migrations/20251103104300_add_performance_indexes.sql
new file mode 100644
index 0000000..240c245
--- /dev/null
+++ b/migrations/20251103104300_add_performance_indexes.sql
@@ -0,0 +1,45 @@
+-- Performance optimization indexes
+
+-- Index for term-based queries (most common access pattern)
+CREATE INDEX IF NOT EXISTS idx_courses_term_code ON courses(term_code);
+
+-- Index for subject-based filtering
+CREATE INDEX IF NOT EXISTS idx_courses_subject ON courses(subject);
+
+-- Composite index for subject + term queries
+CREATE INDEX IF NOT EXISTS idx_courses_subject_term ON courses(subject, term_code);
+
+-- Index for course number lookups
+CREATE INDEX IF NOT EXISTS idx_courses_course_number ON courses(course_number);
+
+-- Index for last scraped timestamp (useful for finding stale data)
+CREATE INDEX IF NOT EXISTS idx_courses_last_scraped ON courses(last_scraped_at);
+
+-- Index for course metrics time-series queries
+-- BRIN is compact and efficient when rows arrive in timestamp order
+CREATE INDEX IF NOT EXISTS idx_course_metrics_timestamp ON course_metrics USING BRIN(timestamp);
+
+-- B-tree index for specific course metric lookups
+CREATE INDEX IF NOT EXISTS idx_course_metrics_course_timestamp
+    ON course_metrics(course_id, timestamp DESC);
+
+-- Partial index for pending scrape jobs (only unlocked jobs)
+CREATE INDEX IF NOT EXISTS idx_scrape_jobs_pending
+    ON scrape_jobs(execute_at ASC)
+    WHERE locked_at IS NULL;
+
+-- Index for high-priority job processing
+CREATE INDEX IF NOT EXISTS idx_scrape_jobs_priority_pending
+    ON scrape_jobs(priority DESC, execute_at ASC)
+    WHERE locked_at IS NULL;
+
+-- Index for retry tracking
+CREATE INDEX IF NOT EXISTS idx_scrape_jobs_retry_count
+    ON scrape_jobs(retry_count)
+    WHERE retry_count > 0 AND locked_at IS NULL;
+
+-- Analyze tables to update statistics
+ANALYZE courses;
+ANALYZE course_metrics;
+ANALYZE course_audits;
+ANALYZE scrape_jobs;
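A quick sanity check for the partial job indexes: the planner only picks them when the query's predicate and sort order line up with the index definition. Something along these lines (the scheduler's real query may differ; `id` and the exact column list are illustrative) should report an index scan on idx_scrape_jobs_priority_pending rather than a sequential scan:

```sql
-- Verify the pending-job partial index is actually chosen by the planner.
EXPLAIN (ANALYZE, BUFFERS)
SELECT id, target_type, target_payload
FROM scrape_jobs
WHERE locked_at IS NULL
ORDER BY priority DESC, execute_at ASC
LIMIT 10;
```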
diff --git a/migrations/20251103104400_optimize_indexes.sql b/migrations/20251103104400_optimize_indexes.sql
new file mode 100644
index 0000000..b05de09
--- /dev/null
+++ b/migrations/20251103104400_optimize_indexes.sql
@@ -0,0 +1,53 @@
+-- Index Optimization Follow-up Migration
+
+-- Reason: Redundant with composite index idx_courses_subject_term
+DROP INDEX IF EXISTS idx_courses_subject;
+
+-- Remove: idx_scrape_jobs_retry_count
+DROP INDEX IF EXISTS idx_scrape_jobs_retry_count;
+
+-- Purpose: Optimize the scheduler's frequent query (runs every 60 seconds)
+CREATE INDEX IF NOT EXISTS idx_scrape_jobs_scheduler_lookup
+    ON scrape_jobs(target_type, target_payload)
+    WHERE locked_at IS NULL;
+
+-- Note: We use (target_type, target_payload) instead of including locked_at
+-- in the index columns because:
+-- 1. The WHERE clause filters locked_at IS NULL (partial index optimization)
+-- 2. target_payload is JSONB and already large; keeping it as an indexed column
+--    allows PostgreSQL to use index-only scans for the SELECT target_payload query
+-- 3. The partial predicate keeps the index small while still covering the hot path
+
+
+-- Purpose: Enable efficient audit trail queries by course
+CREATE INDEX IF NOT EXISTS idx_course_audits_course_timestamp
+    ON course_audits(course_id, timestamp DESC);
+
+-- Purpose: Enable queries like "Show all changes in the last 24 hours"
+CREATE INDEX IF NOT EXISTS idx_course_audits_timestamp
+    ON course_audits(timestamp DESC);
+
+
+-- The BRIN index on course_metrics(timestamp) assumes data is inserted in
+-- chronological order. BRIN indexes are only effective when data is physically
+-- ordered on disk. If you perform:
+-- - Backfills of historical data
+-- - Out-of-order inserts
+-- - Frequent UPDATEs that move rows
+--
+-- then the BRIN index's effectiveness will degrade. Monitor with pageinspect:
+-- SELECT * FROM brin_page_items(get_raw_page('idx_course_metrics_timestamp', 1), 'idx_course_metrics_timestamp');
+--
+-- If you see poor selectivity, consider:
+-- 1. REINDEX to rebuild after bulk loads
+-- 2. Switch to B-tree if inserts are not time-ordered
+-- 3. Use CLUSTER to physically reorder the table (requires downtime)
+
+COMMENT ON INDEX idx_course_metrics_timestamp IS
+    'BRIN index - requires chronologically ordered inserts for efficiency. Monitor selectivity.';
+
+-- Update statistics for query planner
+ANALYZE courses;
+ANALYZE course_metrics;
+ANALYZE course_audits;
+ANALYZE scrape_jobs;
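Before dropping further indexes, or trusting the BRIN assumptions above, the statistics views can confirm what the planner actually uses. A sketch using standard catalog views plus PostgreSQL's documented BRIN maintenance function:

```sql
-- Scan counts and on-disk size per index; large indexes that are
-- rarely scanned are candidates for removal in a later migration.
SELECT relname, indexrelname, idx_scan,
       pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE relname IN ('courses', 'course_metrics', 'course_audits', 'scrape_jobs')
ORDER BY idx_scan;

-- After a bulk backfill, summarize any block ranges the BRIN index
-- has not yet covered (much cheaper than a full REINDEX).
SELECT brin_summarize_new_values('idx_course_metrics_timestamp');
```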
diff --git a/src/data/batch.rs b/src/data/batch.rs
new file mode 100644
index 0000000..35dfc63
--- /dev/null
+++ b/src/data/batch.rs
@@ -0,0 +1,138 @@
+//! Batch database operations for improved performance.
+
+use crate::banner::Course;
+use crate::error::Result;
+use sqlx::PgPool;
+use std::time::Instant;
+use tracing::info;
+
+/// Batch upsert courses in a single database query.
+///
+/// This function performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses
+/// in a single round-trip to the database, significantly reducing overhead compared
+/// to individual inserts.
+///
+/// # Performance
+/// - Reduces N database round-trips to 1
+/// - Typical usage: 50-200 courses per batch
+/// - Binds 10 array parameters total (one per column), far below PostgreSQL's 65,535 limit
+///
+/// # Arguments
+/// * `courses` - Slice of Course structs from the Banner API
+/// * `db_pool` - PostgreSQL connection pool
+///
+/// # Returns
+/// * `Ok(())` on success
+/// * `Err(_)` if the database operation fails
+///
+/// # Example
+/// ```no_run
+/// use banner::data::batch::batch_upsert_courses;
+/// use banner::banner::Course;
+/// use sqlx::PgPool;
+///
+/// async fn example(courses: &[Course], pool: &PgPool) -> anyhow::Result<()> {
+///     batch_upsert_courses(courses, pool).await?;
+///     Ok(())
+/// }
+/// ```
+pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
+    // Early return for empty batches
+    if courses.is_empty() {
+        info!("No courses to upsert, skipping batch operation");
+        return Ok(());
+    }
+
+    let start = Instant::now();
+    let course_count = courses.len();
+
+    // Extract course fields into parallel vectors for UNNEST
+    let crns: Vec<&str> = courses
+        .iter()
+        .map(|c| c.course_reference_number.as_str())
+        .collect();
+
+    let subjects: Vec<&str> = courses.iter().map(|c| c.subject.as_str()).collect();
+
+    let course_numbers: Vec<&str> = courses
+        .iter()
+        .map(|c| c.course_number.as_str())
+        .collect();
+
+    let titles: Vec<&str> = courses.iter().map(|c| c.course_title.as_str()).collect();
+
+    let term_codes: Vec<&str> = courses.iter().map(|c| c.term.as_str()).collect();
+
+    let enrollments: Vec<i32> = courses.iter().map(|c| c.enrollment).collect();
+
+    let max_enrollments: Vec<i32> = courses.iter().map(|c| c.maximum_enrollment).collect();
+
+    let wait_counts: Vec<i32> = courses.iter().map(|c| c.wait_count).collect();
+
+    let wait_capacities: Vec<i32> = courses.iter().map(|c| c.wait_capacity).collect();
+
+    // Perform batch upsert using UNNEST for efficient bulk insertion
+    let result = sqlx::query(
+        r#"
+        INSERT INTO courses (
+            crn, subject, course_number, title, term_code,
+            enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
+        )
+        SELECT * FROM UNNEST(
+            $1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
+            $6::int4[], $7::int4[], $8::int4[], $9::int4[],
+            array_fill(NOW()::timestamptz, ARRAY[$10])
+        ) AS t(
+            crn, subject, course_number, title, term_code,
+            enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
+        )
+        ON CONFLICT (crn, term_code)
+        DO UPDATE SET
+            subject = EXCLUDED.subject,
+            course_number = EXCLUDED.course_number,
+            title = EXCLUDED.title,
+            enrollment = EXCLUDED.enrollment,
+            max_enrollment = EXCLUDED.max_enrollment,
+            wait_count = EXCLUDED.wait_count,
+            wait_capacity = EXCLUDED.wait_capacity,
+            last_scraped_at = EXCLUDED.last_scraped_at
+        "#,
+    )
+    .bind(&crns)
+    .bind(&subjects)
+    .bind(&course_numbers)
+    .bind(&titles)
+    .bind(&term_codes)
+    .bind(&enrollments)
+    .bind(&max_enrollments)
+    .bind(&wait_counts)
+    .bind(&wait_capacities)
+    .bind(course_count as i32)
+    .execute(db_pool)
+    .await
+    .map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?;
+
+    let duration = start.elapsed();
+
+    info!(
+        courses_count = course_count,
+        rows_affected = result.rows_affected(),
+        duration_ms = duration.as_millis(),
+        "Batch upserted courses"
+    );
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_empty_batch_returns_ok() {
+        // Basic compile-time check; runtime tests would require the
+        // sqlx::test macro and a test database.
+        let courses: Vec<Course> = vec![];
+        assert_eq!(courses.len(), 0);
+    }
+}
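For reference, with a two-course batch the UNNEST query in batch.rs is equivalent to the following (values are made up for illustration, and the DO UPDATE list is abbreviated):

```sql
INSERT INTO courses (
    crn, subject, course_number, title, term_code,
    enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
SELECT * FROM UNNEST(
    ARRAY['10001', '10002'], ARRAY['CS', 'CS'],
    ARRAY['1101', '2201'], ARRAY['Intro to CS', 'Data Structures'],
    ARRAY['202510', '202510'],
    ARRAY[30, 25], ARRAY[35, 30], ARRAY[0, 2], ARRAY[10, 10],
    array_fill(NOW()::timestamptz, ARRAY[2])
) AS t(
    crn, subject, course_number, title, term_code,
    enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
ON CONFLICT (crn, term_code) DO UPDATE SET
    enrollment = EXCLUDED.enrollment,  -- remaining columns as in batch.rs
    last_scraped_at = EXCLUDED.last_scraped_at;
```

Each bound array becomes one column of the virtual table `t`, which is why the statement stays at ten bind parameters no matter how many courses are in the batch.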
diff --git a/src/data/mod.rs b/src/data/mod.rs
index ded2bd5..33a732d 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -1,3 +1,4 @@
 //! Database models and schema.
 
+pub mod batch;
 pub mod models;
diff --git a/src/scraper/jobs/subject.rs b/src/scraper/jobs/subject.rs
index 36a534d..e0d1a2f 100644
--- a/src/scraper/jobs/subject.rs
+++ b/src/scraper/jobs/subject.rs
@@ -1,5 +1,6 @@
 use super::Job;
-use crate::banner::{BannerApi, Course, SearchQuery, Term};
+use crate::banner::{BannerApi, SearchQuery, Term};
+use crate::data::batch::batch_upsert_courses;
 use crate::data::models::TargetType;
 use crate::error::Result;
 use serde::{Deserialize, Serialize};
@@ -42,9 +43,7 @@ impl Job for SubjectJob {
                 count = courses_from_api.len(),
                 "Found courses"
             );
-            for course in courses_from_api {
-                self.upsert_course(&course, db_pool).await?;
-            }
+            batch_upsert_courses(&courses_from_api, db_pool).await?;
         }
 
         debug!(subject = subject_code, "Subject job completed");
@@ -55,37 +54,3 @@ impl Job for SubjectJob {
         format!("Scrape subject: {}", self.subject)
     }
 }
-
-impl SubjectJob {
-    async fn upsert_course(&self, course: &Course, db_pool: &PgPool) -> Result<()> {
-        sqlx::query(
-            r#"
-            INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
-            ON CONFLICT (crn, term_code) DO UPDATE SET
-                subject = EXCLUDED.subject,
-                course_number = EXCLUDED.course_number,
-                title = EXCLUDED.title,
-                enrollment = EXCLUDED.enrollment,
-                max_enrollment = EXCLUDED.max_enrollment,
-                wait_count = EXCLUDED.wait_count,
-                wait_capacity = EXCLUDED.wait_capacity,
-                last_scraped_at = EXCLUDED.last_scraped_at
-            "#,
-        )
-        .bind(&course.course_reference_number)
-        .bind(&course.subject)
-        .bind(&course.course_number)
-        .bind(&course.course_title)
-        .bind(&course.term)
-        .bind(course.enrollment)
-        .bind(course.maximum_enrollment)
-        .bind(course.wait_count)
-        .bind(course.wait_capacity)
-        .bind(chrono::Utc::now())
-        .execute(db_pool)
-        .await
-        .map(|_| ())
-        .map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
-    }
-}