perf: implement batch operations and optimize database indexes

Add batch upsert functionality to reduce database round-trips from N to 1 when inserting courses. Create comprehensive database indexes for common query patterns including term/subject lookups, time-series metrics, and job scheduling. Remove redundant indexes and add monitoring guidance for BRIN index effectiveness.
This commit is contained in:
Ryan Walters
2025-11-03 11:18:42 -06:00
parent 51f8256e61
commit 1c6d2d4b6e
5 changed files with 240 additions and 38 deletions

View File

@@ -0,0 +1,45 @@
-- Performance optimization indexes
-- Index for term-based queries (most common access pattern)
CREATE INDEX IF NOT EXISTS idx_courses_term_code ON courses(term_code);
-- Index for subject-based filtering
CREATE INDEX IF NOT EXISTS idx_courses_subject ON courses(subject);
-- Composite index for subject + term queries
CREATE INDEX IF NOT EXISTS idx_courses_subject_term ON courses(subject, term_code);
-- Index for course number lookups
CREATE INDEX IF NOT EXISTS idx_courses_course_number ON courses(course_number);
-- Index for last scraped timestamp (useful for finding stale data)
CREATE INDEX IF NOT EXISTS idx_courses_last_scraped ON courses(last_scraped_at);
-- Index for course metrics time-series queries
-- BRIN index is optimal for time-series data
CREATE INDEX IF NOT EXISTS idx_course_metrics_timestamp ON course_metrics USING BRIN(timestamp);
-- B-tree index for specific course metric lookups
CREATE INDEX IF NOT EXISTS idx_course_metrics_course_timestamp
ON course_metrics(course_id, timestamp DESC);
-- Partial index for pending scrape jobs (only unlocked jobs)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_pending
ON scrape_jobs(execute_at ASC)
WHERE locked_at IS NULL;
-- Index for high-priority job processing
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_priority_pending
ON scrape_jobs(priority DESC, execute_at ASC)
WHERE locked_at IS NULL;
-- Index for retry tracking
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_retry_count
ON scrape_jobs(retry_count)
WHERE retry_count > 0 AND locked_at IS NULL;
-- Analyze tables to update statistics
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;

View File

@@ -0,0 +1,53 @@
-- Index Optimization Follow-up Migration
-- Reason: Redundant with composite index idx_courses_subject_term
DROP INDEX IF EXISTS idx_courses_subject;
-- Remove: idx_scrape_jobs_retry_count
DROP INDEX IF EXISTS idx_scrape_jobs_retry_count;
-- Purpose: Optimize the scheduler's frequent query (runs every 60 seconds)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_scheduler_lookup
ON scrape_jobs(target_type, target_payload)
WHERE locked_at IS NULL;
-- Note: We use (target_type, target_payload) instead of including locked_at
-- in the index columns because:
-- 1. The WHERE clause filters locked_at IS NULL (partial index optimization)
-- 2. target_payload is JSONB and already large; keeping it as an indexed column
-- allows PostgreSQL to use index-only scans for the SELECT target_payload query
-- 3. This design minimizes index size while maximizing query performance
-- Purpose: Enable efficient audit trail queries by course
CREATE INDEX IF NOT EXISTS idx_course_audits_course_timestamp
ON course_audits(course_id, timestamp DESC);
-- Purpose: Enable queries like "Show all changes in the last 24 hours"
CREATE INDEX IF NOT EXISTS idx_course_audits_timestamp
ON course_audits(timestamp DESC);
-- The BRIN index on course_metrics(timestamp) assumes data is inserted in
-- chronological order. BRIN indexes are only effective when data is physically
-- ordered on disk. If you perform:
-- - Backfills of historical data
-- - Out-of-order inserts
-- - Frequent UPDATEs that move rows
--
-- Then the BRIN index effectiveness will degrade. Monitor with:
-- SELECT * FROM brin_page_items(get_raw_page('idx_course_metrics_timestamp', 1));
--
-- If you see poor selectivity, consider:
-- 1. REINDEX to rebuild after bulk loads
-- 2. Switch to B-tree if inserts are not time-ordered
-- 3. Use CLUSTER to physically reorder the table (requires downtime)
COMMENT ON INDEX idx_course_metrics_timestamp IS
'BRIN index - requires chronologically ordered inserts for efficiency. Monitor selectivity.';
-- Update statistics for query planner
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;

138
src/data/batch.rs Normal file
View File

@@ -0,0 +1,138 @@
//! Batch database operations for improved performance.
use crate::banner::Course;
use crate::error::Result;
use sqlx::PgPool;
use std::time::Instant;
use tracing::info;
/// Batch upsert courses in a single database query.
///
/// This function performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses
/// in a single round-trip to the database, significantly reducing overhead compared
/// to individual inserts.
///
/// # Performance
/// - Reduces N database round-trips to 1
/// - Typical usage: 50-200 courses per batch
/// - PostgreSQL parameter limit: 65,535 (we use ~10 per course)
///
/// # Arguments
/// * `courses` - Slice of Course structs from the Banner API
/// * `db_pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(())` on success
/// * `Err(_)` if the database operation fails
///
/// # Example
/// ```no_run
/// use banner::data::batch::batch_upsert_courses;
/// use banner::banner::Course;
/// use sqlx::PgPool;
///
/// async fn example(courses: &[Course], pool: &PgPool) -> anyhow::Result<()> {
/// batch_upsert_courses(courses, pool).await?;
/// Ok(())
/// }
/// ```
pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
// Early return for empty batches
if courses.is_empty() {
info!("No courses to upsert, skipping batch operation");
return Ok(());
}
let start = Instant::now();
let course_count = courses.len();
// Extract course fields into vectors for UNNEST
let crns: Vec<&str> = courses
.iter()
.map(|c| c.course_reference_number.as_str())
.collect();
let subjects: Vec<&str> = courses.iter().map(|c| c.subject.as_str()).collect();
let course_numbers: Vec<&str> = courses
.iter()
.map(|c| c.course_number.as_str())
.collect();
let titles: Vec<&str> = courses.iter().map(|c| c.course_title.as_str()).collect();
let term_codes: Vec<&str> = courses.iter().map(|c| c.term.as_str()).collect();
let enrollments: Vec<i32> = courses.iter().map(|c| c.enrollment).collect();
let max_enrollments: Vec<i32> = courses.iter().map(|c| c.maximum_enrollment).collect();
let wait_counts: Vec<i32> = courses.iter().map(|c| c.wait_count).collect();
let wait_capacities: Vec<i32> = courses.iter().map(|c| c.wait_capacity).collect();
// Perform batch upsert using UNNEST for efficient bulk insertion
let result = sqlx::query(
r#"
INSERT INTO courses (
crn, subject, course_number, title, term_code,
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
SELECT * FROM UNNEST(
$1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
$6::int4[], $7::int4[], $8::int4[], $9::int4[],
array_fill(NOW()::timestamptz, ARRAY[$10])
) AS t(
crn, subject, course_number, title, term_code,
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
ON CONFLICT (crn, term_code)
DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&crns)
.bind(&subjects)
.bind(&course_numbers)
.bind(&titles)
.bind(&term_codes)
.bind(&enrollments)
.bind(&max_enrollments)
.bind(&wait_counts)
.bind(&wait_capacities)
.bind(course_count as i32)
.execute(db_pool)
.await
.map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?;
let duration = start.elapsed();
info!(
courses_count = course_count,
rows_affected = result.rows_affected(),
duration_ms = duration.as_millis(),
"Batch upserted courses"
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_batch_returns_ok() {
// This is a basic compile-time test
// Runtime tests would require sqlx::test macro and a test database
let courses: Vec<Course> = vec![];
assert_eq!(courses.len(), 0);
}
}

View File

@@ -1,3 +1,4 @@
//! Database models and schema.
pub mod batch;
pub mod models;

View File

@@ -1,5 +1,6 @@
use super::Job;
use crate::banner::{BannerApi, Course, SearchQuery, Term};
use crate::banner::{BannerApi, SearchQuery, Term};
use crate::data::batch::batch_upsert_courses;
use crate::data::models::TargetType;
use crate::error::Result;
use serde::{Deserialize, Serialize};
@@ -42,9 +43,7 @@ impl Job for SubjectJob {
count = courses_from_api.len(),
"Found courses"
);
for course in courses_from_api {
self.upsert_course(&course, db_pool).await?;
}
batch_upsert_courses(&courses_from_api, db_pool).await?;
}
debug!(subject = subject_code, "Subject job completed");
@@ -55,37 +54,3 @@ impl Job for SubjectJob {
format!("Scrape subject: {}", self.subject)
}
}
impl SubjectJob {
async fn upsert_course(&self, course: &Course, db_pool: &PgPool) -> Result<()> {
sqlx::query(
r#"
INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (crn, term_code) DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&course.course_reference_number)
.bind(&course.subject)
.bind(&course.course_number)
.bind(&course.course_title)
.bind(&course.term)
.bind(course.enrollment)
.bind(course.maximum_enrollment)
.bind(course.wait_count)
.bind(course.wait_capacity)
.bind(chrono::Utc::now())
.execute(db_pool)
.await
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
}
}