From 1733ee5f86f30347e38e6f9dafa12a52a5ddcb65 Mon Sep 17 00:00:00 2001 From: Xevion Date: Wed, 28 Jan 2026 17:32:27 -0600 Subject: [PATCH] feat: extract database operations module and add extensive test suite --- Cargo.toml | 1 + src/banner/models/terms.rs | 206 ++++++++++++++++++ src/banner/rate_limiter.rs | 123 +++++++++++ src/banner/session.rs | 99 +++++++++ src/bot/commands/search.rs | 107 +++++++++ src/data/batch.rs | 13 -- src/data/mod.rs | 1 + src/data/scrape_jobs.rs | 170 +++++++++++++++ src/scraper/jobs/mod.rs | 80 +++++++ src/scraper/scheduler.rs | 39 +--- src/scraper/worker.rs | 45 +--- tests/db_batch_upsert.rs | 212 ++++++++++++++++++ tests/db_scrape_jobs.rs | 435 +++++++++++++++++++++++++++++++++++++ tests/helpers/mod.rs | 88 ++++++++ 14 files changed, 1539 insertions(+), 80 deletions(-) create mode 100644 src/data/scrape_jobs.rs create mode 100644 tests/db_batch_upsert.rs create mode 100644 tests/db_scrape_jobs.rs create mode 100644 tests/helpers/mod.rs diff --git a/Cargo.toml b/Cargo.toml index c994ed3..ebed4ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ sqlx = { version = "0.8.6", features = [ "chrono", "json", "macros", + "migrate", ] } thiserror = "2.0.16" time = "0.3.43" diff --git a/src/banner/models/terms.rs b/src/banner/models/terms.rs index 15b2866..b83759e 100644 --- a/src/banner/models/terms.rs +++ b/src/banner/models/terms.rs @@ -240,3 +240,209 @@ impl FromStr for Term { Ok(Term { year, season }) } } + +#[cfg(test)] +mod tests { + use super::*; + + // --- Season::from_str --- + + #[test] + fn test_season_from_str_fall() { + assert_eq!(Season::from_str("10").unwrap(), Season::Fall); + } + + #[test] + fn test_season_from_str_spring() { + assert_eq!(Season::from_str("20").unwrap(), Season::Spring); + } + + #[test] + fn test_season_from_str_summer() { + assert_eq!(Season::from_str("30").unwrap(), Season::Summer); + } + + #[test] + fn test_season_from_str_invalid() { + for input in ["00", "40", "1", ""] { + assert!( + Season::from_str(input).is_err(), + "expected Err for {input:?}" + ); + } + } + + // --- Season Display --- + + #[test] + fn test_season_display() { + assert_eq!(Season::Fall.to_string(), "Fall"); + assert_eq!(Season::Spring.to_string(), "Spring"); + assert_eq!(Season::Summer.to_string(), "Summer"); + } + + #[test] + fn test_season_to_str_roundtrip() { + for season in [Season::Fall, Season::Spring, Season::Summer] { + assert_eq!(Season::from_str(season.to_str()).unwrap(), season); + } + } + + // --- Term::from_str --- + + #[test] + fn test_term_from_str_valid_fall() { + let term = Term::from_str("202510").unwrap(); + assert_eq!(term.year, 2025); + assert_eq!(term.season, Season::Fall); + } + + #[test] + fn test_term_from_str_valid_spring() { + let term = Term::from_str("202520").unwrap(); + assert_eq!(term.year, 2025); + assert_eq!(term.season, Season::Spring); + } + + #[test] + fn test_term_from_str_valid_summer() { + let term = Term::from_str("202530").unwrap(); + assert_eq!(term.year, 2025); + assert_eq!(term.season, Season::Summer); + } + + #[test] + fn test_term_from_str_too_short() { + assert!(Term::from_str("20251").is_err()); + } + + #[test] + fn test_term_from_str_too_long() { + assert!(Term::from_str("2025100").is_err()); + } + + #[test] + fn test_term_from_str_empty() { + assert!(Term::from_str("").is_err()); + } + + #[test] + fn test_term_from_str_invalid_year_chars() { + assert!(Term::from_str("abcd10").is_err()); + } + + #[test] + fn test_term_from_str_invalid_season() { + assert!(Term::from_str("202540").is_err()); + } + + 
#[test] + fn test_term_from_str_year_below_range() { + assert!(Term::from_str("200010").is_err()); + } + + #[test] + fn test_term_display_roundtrip() { + for code in ["202510", "202520", "202530"] { + let term = Term::from_str(code).unwrap(); + assert_eq!(term.to_string(), code); + } + } + + // --- Term::get_status_for_date --- + + #[test] + fn test_status_mid_spring() { + let date = NaiveDate::from_ymd_opt(2025, 2, 15).unwrap(); + let status = Term::get_status_for_date(date); + assert!( + matches!(status, TermPoint::InTerm { current } if current.season == Season::Spring) + ); + } + + #[test] + fn test_status_mid_summer() { + let date = NaiveDate::from_ymd_opt(2025, 7, 1).unwrap(); + let status = Term::get_status_for_date(date); + assert!( + matches!(status, TermPoint::InTerm { current } if current.season == Season::Summer) + ); + } + + #[test] + fn test_status_mid_fall() { + let date = NaiveDate::from_ymd_opt(2025, 10, 15).unwrap(); + let status = Term::get_status_for_date(date); + assert!(matches!(status, TermPoint::InTerm { current } if current.season == Season::Fall)); + } + + #[test] + fn test_status_between_fall_and_spring() { + let date = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(); + let status = Term::get_status_for_date(date); + assert!( + matches!(status, TermPoint::BetweenTerms { next } if next.season == Season::Spring) + ); + } + + #[test] + fn test_status_between_spring_and_summer() { + let date = NaiveDate::from_ymd_opt(2025, 5, 15).unwrap(); + let status = Term::get_status_for_date(date); + assert!( + matches!(status, TermPoint::BetweenTerms { next } if next.season == Season::Summer) + ); + } + + #[test] + fn test_status_between_summer_and_fall() { + let date = NaiveDate::from_ymd_opt(2025, 8, 16).unwrap(); + let status = Term::get_status_for_date(date); + assert!(matches!(status, TermPoint::BetweenTerms { next } if next.season == Season::Fall)); + } + + #[test] + fn test_status_after_fall_end() { + let date = NaiveDate::from_ymd_opt(2025, 12, 15).unwrap(); + let status = Term::get_status_for_date(date); + assert!( + matches!(status, TermPoint::BetweenTerms { next } if next.season == Season::Spring) + ); + // Year should roll over: fall 2025 ends → next spring is 2026 + let next_term = status.inner(); + assert_eq!(next_term.year, 2026); + } + + // --- TermPoint::inner --- + + #[test] + fn test_term_point_inner() { + let in_term = TermPoint::InTerm { + current: Term { + year: 2025, + season: Season::Fall, + }, + }; + assert_eq!( + in_term.inner(), + &Term { + year: 2025, + season: Season::Fall + } + ); + + let between = TermPoint::BetweenTerms { + next: Term { + year: 2026, + season: Season::Spring, + }, + }; + assert_eq!( + between.inner(), + &Term { + year: 2026, + season: Season::Spring + } + ); + } +} diff --git a/src/banner/rate_limiter.rs b/src/banner/rate_limiter.rs index 3135cec..3adf7b6 100644 --- a/src/banner/rate_limiter.rs +++ b/src/banner/rate_limiter.rs @@ -85,3 +85,126 @@ pub type SharedRateLimiter = Arc; pub fn create_shared_rate_limiter(config: Option) -> SharedRateLimiter { Arc::new(BannerRateLimiter::new(config.unwrap_or_default())) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new_with_default_config() { + let _limiter = BannerRateLimiter::new(RateLimitingConfig::default()); + } + + #[test] + fn test_new_with_custom_config() { + let config = RateLimitingConfig { + session_rpm: 10, + search_rpm: 30, + metadata_rpm: 20, + reset_rpm: 15, + burst_allowance: 5, + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[test] + fn 
test_new_with_minimum_valid_values() { + let config = RateLimitingConfig { + session_rpm: 1, + search_rpm: 1, + metadata_rpm: 1, + reset_rpm: 1, + burst_allowance: 1, + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[test] + fn test_new_with_high_rpm_values() { + let config = RateLimitingConfig { + session_rpm: 10000, + search_rpm: 10000, + metadata_rpm: 10000, + reset_rpm: 10000, + burst_allowance: 1, + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[test] + fn test_default_impl() { + let _limiter = BannerRateLimiter::default(); + } + + #[test] + #[should_panic] + fn test_new_panics_on_zero_session_rpm() { + let config = RateLimitingConfig { + session_rpm: 0, + ..RateLimitingConfig::default() + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[test] + #[should_panic] + fn test_new_panics_on_zero_search_rpm() { + let config = RateLimitingConfig { + search_rpm: 0, + ..RateLimitingConfig::default() + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[test] + #[should_panic] + fn test_new_panics_on_zero_metadata_rpm() { + let config = RateLimitingConfig { + metadata_rpm: 0, + ..RateLimitingConfig::default() + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[test] + #[should_panic] + fn test_new_panics_on_zero_reset_rpm() { + let config = RateLimitingConfig { + reset_rpm: 0, + ..RateLimitingConfig::default() + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[test] + #[should_panic] + fn test_new_panics_on_zero_burst_allowance() { + let config = RateLimitingConfig { + burst_allowance: 0, + ..RateLimitingConfig::default() + }; + let _limiter = BannerRateLimiter::new(config); + } + + #[tokio::test] + async fn test_wait_for_permission_completes() { + let limiter = BannerRateLimiter::default(); + let timeout_duration = std::time::Duration::from_secs(1); + + for request_type in [ + RequestType::Session, + RequestType::Search, + RequestType::Metadata, + RequestType::Reset, + ] { + let result = + tokio::time::timeout(timeout_duration, limiter.wait_for_permission(request_type)) + .await; + assert!( + result.is_ok(), + "wait_for_permission timed out for {:?}", + request_type + ); + } + } +} diff --git a/src/banner/session.rs b/src/banner/session.rs index 782472f..b4ddbb9 100644 --- a/src/banner/session.rs +++ b/src/banner/session.rs @@ -101,6 +101,105 @@ impl BannerSession { pub fn been_used(&self) -> bool { self.last_activity.is_some() } + + #[cfg(test)] + pub(crate) fn new_with_created_at( + unique_session_id: &str, + jsessionid: &str, + ssb_cookie: &str, + created_at: Instant, + ) -> Self { + Self { + unique_session_id: unique_session_id.to_string(), + created_at, + last_activity: None, + jsessionid: jsessionid.to_string(), + ssb_cookie: ssb_cookie.to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new_session_returns_ok() { + let session = BannerSession::new("sess-1", "JSID123", "SSB456"); + assert!(session.is_ok()); + assert_eq!(session.unwrap().id(), "sess-1"); + } + + #[test] + fn test_fresh_session_not_expired() { + let session = BannerSession::new("sess-1", "JSID123", "SSB456").unwrap(); + assert!(!session.is_expired()); + } + + #[test] + fn test_fresh_session_not_been_used() { + let session = BannerSession::new("sess-1", "JSID123", "SSB456").unwrap(); + assert!(!session.been_used()); + } + + #[test] + fn test_touch_marks_used() { + let mut session = BannerSession::new("sess-1", "JSID123", "SSB456").unwrap(); + session.touch(); + assert!(session.been_used()); + } + + #[test] + fn 
test_touched_session_not_expired() { + let mut session = BannerSession::new("sess-1", "JSID123", "SSB456").unwrap(); + session.touch(); + assert!(!session.is_expired()); + } + + #[test] + fn test_cookie_format() { + let session = BannerSession::new("sess-1", "JSID123", "SSB456").unwrap(); + assert_eq!(session.cookie(), "JSESSIONID=JSID123; SSB_COOKIE=SSB456"); + } + + #[test] + fn test_id_returns_unique_session_id() { + let session = BannerSession::new("my-unique-id", "JSID123", "SSB456").unwrap(); + assert_eq!(session.id(), "my-unique-id"); + } + + #[test] + fn test_expired_session() { + let session = BannerSession::new_with_created_at( + "sess-old", + "JSID123", + "SSB456", + Instant::now() - Duration::from_secs(26 * 60), + ); + assert!(session.is_expired()); + } + + #[test] + fn test_not_quite_expired_session() { + let session = BannerSession::new_with_created_at( + "sess-recent", + "JSID123", + "SSB456", + Instant::now() - Duration::from_secs(24 * 60), + ); + assert!(!session.is_expired()); + } + + #[test] + fn test_session_at_expiry_boundary() { + let session = BannerSession::new_with_created_at( + "sess-boundary", + "JSID123", + "SSB456", + Instant::now() - Duration::from_secs(25 * 60 + 1), + ); + assert!(session.is_expired()); + } } /// A smart pointer that returns a BannerSession to the pool when dropped. diff --git a/src/bot/commands/search.rs b/src/bot/commands/search.rs index fbf39e6..d536e3c 100644 --- a/src/bot/commands/search.rs +++ b/src/bot/commands/search.rs @@ -140,3 +140,110 @@ fn parse_course_code(input: &str) -> Result<(i32, i32), Error> { Err(anyhow!("Invalid course code format")) } + +#[cfg(test)] +mod tests { + use super::*; + + // --- Single codes --- + + #[test] + fn test_parse_single_code() { + assert_eq!(parse_course_code("3743").unwrap(), (3743, 3743)); + } + + #[test] + fn test_parse_single_code_boundaries() { + assert_eq!(parse_course_code("1000").unwrap(), (1000, 1000)); + assert_eq!(parse_course_code("9999").unwrap(), (9999, 9999)); + } + + #[test] + fn test_parse_single_code_below_range() { + assert!(parse_course_code("0999").is_err()); + } + + #[test] + fn test_parse_single_code_wrong_length() { + assert!(parse_course_code("123").is_err()); + } + + #[test] + fn test_parse_single_code_non_numeric() { + assert!(parse_course_code("abcd").is_err()); + } + + #[test] + fn test_parse_single_code_trimmed() { + assert_eq!(parse_course_code(" 3743 ").unwrap(), (3743, 3743)); + } + + // --- Ranges --- + + #[test] + fn test_parse_range_full() { + assert_eq!(parse_course_code("3000-3999").unwrap(), (3000, 3999)); + } + + #[test] + fn test_parse_range_same() { + assert_eq!(parse_course_code("3000-3000").unwrap(), (3000, 3000)); + } + + #[test] + fn test_parse_range_open() { + assert_eq!(parse_course_code("3000-").unwrap(), (3000, 9999)); + } + + #[test] + fn test_parse_range_inverted() { + assert!(parse_course_code("5000-3000").is_err()); + } + + #[test] + fn test_parse_range_below_1000() { + assert!(parse_course_code("500-999").is_err()); + } + + #[test] + fn test_parse_range_above_9999() { + assert!(parse_course_code("9000-10000").is_err()); + } + + #[test] + fn test_parse_range_full_valid() { + assert_eq!(parse_course_code("1000-9999").unwrap(), (1000, 9999)); + } + + // --- Wildcards --- + + #[test] + fn test_parse_wildcard_one_x() { + assert_eq!(parse_course_code("300x").unwrap(), (3000, 3009)); + } + + #[test] + fn test_parse_wildcard_two_x() { + assert_eq!(parse_course_code("30xx").unwrap(), (3000, 3099)); + } + + #[test] + fn test_parse_wildcard_three_x() { + 
assert_eq!(parse_course_code("3xxx").unwrap(), (3000, 3999)); + } + + #[test] + fn test_parse_wildcard_9xxx() { + assert_eq!(parse_course_code("9xxx").unwrap(), (9000, 9999)); + } + + #[test] + fn test_parse_wildcard_wrong_length() { + assert!(parse_course_code("3xxxx").is_err()); + } + + #[test] + fn test_parse_wildcard_0xxx() { + assert!(parse_course_code("0xxx").is_err()); + } +} diff --git a/src/data/batch.rs b/src/data/batch.rs index 158b708..c79ef97 100644 --- a/src/data/batch.rs +++ b/src/data/batch.rs @@ -120,16 +120,3 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul Ok(()) } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_empty_batch_returns_ok() { - // This is a basic compile-time test - // Runtime tests would require sqlx::test macro and a test database - let courses: Vec = vec![]; - assert_eq!(courses.len(), 0); - } -} diff --git a/src/data/mod.rs b/src/data/mod.rs index 33a732d..091b5e7 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -2,3 +2,4 @@ pub mod batch; pub mod models; +pub mod scrape_jobs; diff --git a/src/data/scrape_jobs.rs b/src/data/scrape_jobs.rs new file mode 100644 index 0000000..c0e0975 --- /dev/null +++ b/src/data/scrape_jobs.rs @@ -0,0 +1,170 @@ +//! Database operations for scrape job queue management. + +use crate::data::models::{ScrapeJob, ScrapePriority, TargetType}; +use crate::error::Result; +use sqlx::PgPool; +use std::collections::HashSet; + +/// Atomically fetch and lock the next available scrape job. +/// +/// Uses `FOR UPDATE SKIP LOCKED` to allow multiple workers to poll the queue +/// concurrently without conflicts. Only jobs that are unlocked and ready to +/// execute (based on `execute_at`) are considered. +/// +/// # Arguments +/// * `db_pool` - PostgreSQL connection pool +/// +/// # Returns +/// * `Ok(Some(job))` if a job was successfully fetched and locked +/// * `Ok(None)` if no jobs are available +pub async fn fetch_and_lock_job(db_pool: &PgPool) -> Result> { + let mut tx = db_pool.begin().await?; + + let job = sqlx::query_as::<_, ScrapeJob>( + "SELECT * FROM scrape_jobs WHERE locked_at IS NULL AND execute_at <= NOW() ORDER BY priority DESC, execute_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED" + ) + .fetch_optional(&mut *tx) + .await?; + + if let Some(ref job) = job { + sqlx::query("UPDATE scrape_jobs SET locked_at = NOW() WHERE id = $1") + .bind(job.id) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + + Ok(job) +} + +/// Delete a scrape job by ID. +/// +/// Typically called after a job has been successfully processed or permanently failed. +/// +/// # Arguments +/// * `job_id` - The database ID of the job to delete +/// * `db_pool` - PostgreSQL connection pool +pub async fn delete_job(job_id: i32, db_pool: &PgPool) -> Result<()> { + sqlx::query("DELETE FROM scrape_jobs WHERE id = $1") + .bind(job_id) + .execute(db_pool) + .await?; + Ok(()) +} + +/// Unlock a scrape job by clearing its `locked_at` timestamp. +/// +/// Used to release a job back to the queue, e.g. during graceful shutdown. +/// +/// # Arguments +/// * `job_id` - The database ID of the job to unlock +/// * `db_pool` - PostgreSQL connection pool +pub async fn unlock_job(job_id: i32, db_pool: &PgPool) -> Result<()> { + sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE id = $1") + .bind(job_id) + .execute(db_pool) + .await?; + Ok(()) +} + +/// Atomically unlock a job and increment its retry count. +/// +/// Returns whether the job still has retries remaining. 
This is determined +/// atomically in the database to avoid race conditions between workers. +/// +/// # Arguments +/// * `job_id` - The database ID of the job +/// * `max_retries` - Maximum number of retries allowed for this job +/// * `db_pool` - PostgreSQL connection pool +/// +/// # Returns +/// * `Ok(true)` if the job was unlocked and retries remain +/// * `Ok(false)` if the job has exhausted its retries +pub async fn unlock_and_increment_retry( + job_id: i32, + max_retries: i32, + db_pool: &PgPool, +) -> Result { + let result = sqlx::query_scalar::<_, Option>( + "UPDATE scrape_jobs + SET locked_at = NULL, retry_count = retry_count + 1 + WHERE id = $1 + RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END", + ) + .bind(job_id) + .bind(max_retries) + .fetch_one(db_pool) + .await?; + + Ok(result.is_some()) +} + +/// Find existing unlocked job payloads matching the given target type and candidates. +/// +/// Returns a set of stringified JSON payloads that already exist in the queue, +/// used for deduplication when scheduling new jobs. +/// +/// # Arguments +/// * `target_type` - The target type to filter by +/// * `candidate_payloads` - Candidate payloads to check against existing jobs +/// * `db_pool` - PostgreSQL connection pool +/// +/// # Returns +/// A `HashSet` of stringified JSON payloads that already have pending jobs +pub async fn find_existing_job_payloads( + target_type: TargetType, + candidate_payloads: &[serde_json::Value], + db_pool: &PgPool, +) -> Result> { + let existing_jobs: Vec<(serde_json::Value,)> = sqlx::query_as( + "SELECT target_payload FROM scrape_jobs + WHERE target_type = $1 AND target_payload = ANY($2) AND locked_at IS NULL", + ) + .bind(target_type) + .bind(candidate_payloads) + .fetch_all(db_pool) + .await?; + + let existing_payloads = existing_jobs + .into_iter() + .map(|(payload,)| payload.to_string()) + .collect(); + + Ok(existing_payloads) +} + +/// Batch insert scrape jobs in a single transaction. +/// +/// All jobs are inserted with `execute_at` set to the current time. 
+/// +/// # Arguments +/// * `jobs` - Slice of `(payload, target_type, priority)` tuples to insert +/// * `db_pool` - PostgreSQL connection pool +pub async fn batch_insert_jobs( + jobs: &[(serde_json::Value, TargetType, ScrapePriority)], + db_pool: &PgPool, +) -> Result<()> { + if jobs.is_empty() { + return Ok(()); + } + + let now = chrono::Utc::now(); + let mut tx = db_pool.begin().await?; + + for (payload, target_type, priority) in jobs { + sqlx::query( + "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) VALUES ($1, $2, $3, $4)" + ) + .bind(target_type) + .bind(payload) + .bind(priority) + .bind(now) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + + Ok(()) +} diff --git a/src/scraper/jobs/mod.rs b/src/scraper/jobs/mod.rs index 0396290..54c3104 100644 --- a/src/scraper/jobs/mod.rs +++ b/src/scraper/jobs/mod.rs @@ -102,3 +102,83 @@ impl JobType { } } } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + // --- Valid dispatch --- + + #[test] + fn test_from_target_subject_valid() { + let result = + JobType::from_target_type_and_payload(TargetType::Subject, json!({"subject": "CS"})); + assert!(matches!(result, Ok(JobType::Subject(_)))); + } + + #[test] + fn test_from_target_subject_empty_string() { + let result = + JobType::from_target_type_and_payload(TargetType::Subject, json!({"subject": ""})); + assert!(matches!(result, Ok(JobType::Subject(_)))); + } + + // --- Invalid JSON --- + + #[test] + fn test_from_target_subject_missing_field() { + let result = JobType::from_target_type_and_payload(TargetType::Subject, json!({})); + assert!(matches!(result, Err(JobParseError::InvalidJson(_)))); + } + + #[test] + fn test_from_target_subject_wrong_type() { + let result = + JobType::from_target_type_and_payload(TargetType::Subject, json!({"subject": 123})); + assert!(matches!(result, Err(JobParseError::InvalidJson(_)))); + } + + #[test] + fn test_from_target_subject_null_payload() { + let result = JobType::from_target_type_and_payload(TargetType::Subject, json!(null)); + assert!(matches!(result, Err(JobParseError::InvalidJson(_)))); + } + + // --- Unsupported target types --- + + #[test] + fn test_from_target_unsupported_variants() { + let unsupported = [ + TargetType::CourseRange, + TargetType::CrnList, + TargetType::SingleCrn, + ]; + for target_type in unsupported { + let result = + JobType::from_target_type_and_payload(target_type, json!({"subject": "CS"})); + assert!( + matches!(result, Err(JobParseError::UnsupportedTargetType(_))), + "expected UnsupportedTargetType for {target_type:?}" + ); + } + } + + // --- Error Display --- + + #[test] + fn test_job_parse_error_display() { + let invalid_json_err = + JobType::from_target_type_and_payload(TargetType::Subject, json!(null)).unwrap_err(); + let display = invalid_json_err.to_string(); + assert!(display.contains("Invalid JSON"), "got: {display}"); + + let unsupported_err = + JobType::from_target_type_and_payload(TargetType::CrnList, json!({})).unwrap_err(); + let display = unsupported_err.to_string(); + assert!( + display.contains("Unsupported target type"), + "got: {display}" + ); + } +} diff --git a/src/scraper/scheduler.rs b/src/scraper/scheduler.rs index 2194cc0..3ea30dc 100644 --- a/src/scraper/scheduler.rs +++ b/src/scraper/scheduler.rs @@ -1,5 +1,6 @@ use crate::banner::{BannerApi, Term}; use crate::data::models::{ScrapePriority, TargetType}; +use crate::data::scrape_jobs; use crate::error::Result; use crate::scraper::jobs::subject::SubjectJob; use serde_json::json; @@ -123,21 
+124,13 @@ impl Scheduler { .collect(); // Query existing jobs for all subjects in a single query - let existing_jobs: Vec<(serde_json::Value,)> = sqlx::query_as( - "SELECT target_payload FROM scrape_jobs - WHERE target_type = $1 AND target_payload = ANY($2) AND locked_at IS NULL", + let existing_payloads = scrape_jobs::find_existing_job_payloads( + TargetType::Subject, + &subject_payloads, + db_pool, ) - .bind(TargetType::Subject) - .bind(&subject_payloads) - .fetch_all(db_pool) .await?; - // Convert to a HashSet for efficient lookup - let existing_payloads: std::collections::HashSet = existing_jobs - .into_iter() - .map(|(payload,)| payload.to_string()) - .collect(); - // Filter out subjects that already have jobs and prepare new jobs let mut skipped_count = 0; let new_jobs: Vec<_> = subjects @@ -162,24 +155,16 @@ impl Scheduler { // Insert all new jobs in a single batch if !new_jobs.is_empty() { - let now = chrono::Utc::now(); - let mut tx = db_pool.begin().await?; - - for (payload, subject_code) in new_jobs { - sqlx::query( - "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) VALUES ($1, $2, $3, $4)" - ) - .bind(TargetType::Subject) - .bind(&payload) - .bind(ScrapePriority::Low) - .bind(now) - .execute(&mut *tx) - .await?; - + for (_, subject_code) in &new_jobs { debug!(subject = subject_code, "New job enqueued for subject"); } - tx.commit().await?; + let jobs: Vec<_> = new_jobs + .into_iter() + .map(|(payload, _)| (payload, TargetType::Subject, ScrapePriority::Low)) + .collect(); + + scrape_jobs::batch_insert_jobs(&jobs, db_pool).await?; } debug!("Job scheduling complete"); diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs index 29ae0c5..fefa787 100644 --- a/src/scraper/worker.rs +++ b/src/scraper/worker.rs @@ -1,5 +1,6 @@ use crate::banner::{BannerApi, BannerApiError}; use crate::data::models::ScrapeJob; +use crate::data::scrape_jobs; use crate::error::Result; use crate::scraper::jobs::{JobError, JobType}; use sqlx::PgPool; @@ -83,24 +84,7 @@ impl Worker { /// This uses a `FOR UPDATE SKIP LOCKED` query to ensure that multiple /// workers can poll the queue concurrently without conflicts. 
async fn fetch_and_lock_job(&self) -> Result> { - let mut tx = self.db_pool.begin().await?; - - let job = sqlx::query_as::<_, ScrapeJob>( - "SELECT * FROM scrape_jobs WHERE locked_at IS NULL AND execute_at <= NOW() ORDER BY priority DESC, execute_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED" - ) - .fetch_optional(&mut *tx) - .await?; - - if let Some(ref job) = job { - sqlx::query("UPDATE scrape_jobs SET locked_at = NOW() WHERE id = $1") - .bind(job.id) - .execute(&mut *tx) - .await?; - } - - tx.commit().await?; - - Ok(job) + scrape_jobs::fetch_and_lock_job(&self.db_pool).await } async fn process_job(&self, job: ScrapeJob) -> Result<(), JobError> { @@ -139,34 +123,15 @@ impl Worker { } async fn delete_job(&self, job_id: i32) -> Result<()> { - sqlx::query("DELETE FROM scrape_jobs WHERE id = $1") - .bind(job_id) - .execute(&self.db_pool) - .await?; - Ok(()) + scrape_jobs::delete_job(job_id, &self.db_pool).await } async fn unlock_job(&self, job_id: i32) -> Result<()> { - sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE id = $1") - .bind(job_id) - .execute(&self.db_pool) - .await?; - Ok(()) + scrape_jobs::unlock_job(job_id, &self.db_pool).await } async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result { - let result = sqlx::query_scalar::<_, Option>( - "UPDATE scrape_jobs - SET locked_at = NULL, retry_count = retry_count + 1 - WHERE id = $1 - RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END", - ) - .bind(job_id) - .bind(max_retries) - .fetch_one(&self.db_pool) - .await?; - - Ok(result.is_some()) + scrape_jobs::unlock_and_increment_retry(job_id, max_retries, &self.db_pool).await } /// Handle shutdown signal received during job processing diff --git a/tests/db_batch_upsert.rs b/tests/db_batch_upsert.rs new file mode 100644 index 0000000..771ab40 --- /dev/null +++ b/tests/db_batch_upsert.rs @@ -0,0 +1,212 @@ +mod helpers; + +use banner::data::batch::batch_upsert_courses; +use sqlx::PgPool; + +#[sqlx::test] +async fn test_batch_upsert_empty_slice(pool: PgPool) { + batch_upsert_courses(&[], &pool).await.unwrap(); + + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM courses") + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(count.0, 0); +} + +#[sqlx::test] +async fn test_batch_upsert_inserts_new_courses(pool: PgPool) { + let courses = vec![ + helpers::make_course("10001", "202510", "CS", "1083", "Intro to CS", 25, 30, 0, 5), + helpers::make_course( + "10002", + "202510", + "MAT", + "1214", + "Calculus I", + 40, + 45, + 3, + 10, + ), + ]; + + batch_upsert_courses(&courses, &pool).await.unwrap(); + + let rows: Vec<(String, String, String, String, i32, i32, i32, i32)> = sqlx::query_as( + "SELECT crn, subject, course_number, title, enrollment, max_enrollment, wait_count, wait_capacity + FROM courses ORDER BY crn", + ) + .fetch_all(&pool) + .await + .unwrap(); + + assert_eq!(rows.len(), 2); + + let (crn, subject, course_number, title, enrollment, max_enrollment, wait_count, wait_capacity) = + &rows[0]; + assert_eq!(crn, "10001"); + assert_eq!(subject, "CS"); + assert_eq!(course_number, "1083"); + assert_eq!(title, "Intro to CS"); + assert_eq!(*enrollment, 25); + assert_eq!(*max_enrollment, 30); + assert_eq!(*wait_count, 0); + assert_eq!(*wait_capacity, 5); + + let (crn, subject, ..) 
= &rows[1]; + assert_eq!(crn, "10002"); + assert_eq!(subject, "MAT"); +} + +#[sqlx::test] +async fn test_batch_upsert_updates_existing(pool: PgPool) { + let initial = vec![helpers::make_course( + "20001", + "202510", + "CS", + "3443", + "App Programming", + 10, + 35, + 0, + 5, + )]; + batch_upsert_courses(&initial, &pool).await.unwrap(); + + // Upsert the same CRN+term with updated enrollment + let updated = vec![helpers::make_course( + "20001", + "202510", + "CS", + "3443", + "App Programming", + 30, + 35, + 2, + 5, + )]; + batch_upsert_courses(&updated, &pool).await.unwrap(); + + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM courses") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count.0, 1, "upsert should not create a duplicate row"); + + let (enrollment, wait_count): (i32, i32) = + sqlx::query_as("SELECT enrollment, wait_count FROM courses WHERE crn = '20001'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(enrollment, 30); + assert_eq!(wait_count, 2); +} + +#[sqlx::test] +async fn test_batch_upsert_mixed_insert_and_update(pool: PgPool) { + let initial = vec![ + helpers::make_course("30001", "202510", "CS", "1083", "Intro to CS", 10, 30, 0, 5), + helpers::make_course( + "30002", + "202510", + "CS", + "2073", + "Computer Architecture", + 20, + 30, + 0, + 5, + ), + ]; + batch_upsert_courses(&initial, &pool).await.unwrap(); + + // Update both existing courses and add a new one + let mixed = vec![ + helpers::make_course("30001", "202510", "CS", "1083", "Intro to CS", 15, 30, 1, 5), + helpers::make_course( + "30002", + "202510", + "CS", + "2073", + "Computer Architecture", + 25, + 30, + 0, + 5, + ), + helpers::make_course( + "30003", + "202510", + "MAT", + "1214", + "Calculus I", + 40, + 45, + 3, + 10, + ), + ]; + batch_upsert_courses(&mixed, &pool).await.unwrap(); + + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM courses") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count.0, 3, "should have 2 updated + 1 new = 3 total rows"); + + // Verify updated values + let (enrollment,): (i32,) = + sqlx::query_as("SELECT enrollment FROM courses WHERE crn = '30001'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(enrollment, 15); + + let (enrollment,): (i32,) = + sqlx::query_as("SELECT enrollment FROM courses WHERE crn = '30002'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(enrollment, 25); + + // Verify new row + let (subject,): (String,) = sqlx::query_as("SELECT subject FROM courses WHERE crn = '30003'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(subject, "MAT"); +} + +#[sqlx::test] +async fn test_batch_upsert_unique_constraint_crn_term(pool: PgPool) { + // Same CRN, different term codes → should produce two separate rows + let courses = vec![ + helpers::make_course("40001", "202510", "CS", "1083", "Intro to CS", 25, 30, 0, 5), + helpers::make_course("40001", "202520", "CS", "1083", "Intro to CS", 10, 30, 0, 5), + ]; + + batch_upsert_courses(&courses, &pool).await.unwrap(); + + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM courses WHERE crn = '40001'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + count.0, 2, + "same CRN with different term codes should be separate rows" + ); + + let rows: Vec<(String, i32)> = sqlx::query_as( + "SELECT term_code, enrollment FROM courses WHERE crn = '40001' ORDER BY term_code", + ) + .fetch_all(&pool) + .await + .unwrap(); + + assert_eq!(rows[0].0, "202510"); + assert_eq!(rows[0].1, 25); + assert_eq!(rows[1].0, "202520"); + assert_eq!(rows[1].1, 10); +} diff --git 
a/tests/db_scrape_jobs.rs b/tests/db_scrape_jobs.rs new file mode 100644 index 0000000..cb52258 --- /dev/null +++ b/tests/db_scrape_jobs.rs @@ -0,0 +1,435 @@ +mod helpers; + +use banner::data::models::{ScrapePriority, TargetType}; +use banner::data::scrape_jobs; +use serde_json::json; +use sqlx::PgPool; + +// ── fetch_and_lock_job ────────────────────────────────────────────── + +#[sqlx::test] +async fn fetch_and_lock_empty_queue(pool: PgPool) { + let result = scrape_jobs::fetch_and_lock_job(&pool).await.unwrap(); + assert!(result.is_none()); +} + +#[sqlx::test] +async fn fetch_and_lock_returns_job_and_sets_locked_at(pool: PgPool) { + let id = helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "CS"}), + ScrapePriority::Medium, + false, + 0, + 3, + ) + .await; + + let job = scrape_jobs::fetch_and_lock_job(&pool) + .await + .unwrap() + .expect("should return a job"); + + assert_eq!(job.id, id); + assert!(matches!(job.target_type, TargetType::Subject)); + assert_eq!(job.target_payload, json!({"subject": "CS"})); + + // Verify locked_at was set in the database + let (locked_at,): (Option>,) = + sqlx::query_as("SELECT locked_at FROM scrape_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert!(locked_at.is_some(), "locked_at should be set after fetch"); +} + +#[sqlx::test] +async fn fetch_and_lock_skips_locked_jobs(pool: PgPool) { + helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "CS"}), + ScrapePriority::Medium, + true, // locked + 0, + 3, + ) + .await; + + let result = scrape_jobs::fetch_and_lock_job(&pool).await.unwrap(); + assert!(result.is_none(), "locked jobs should be skipped"); +} + +#[sqlx::test] +async fn fetch_and_lock_skips_future_execute_at(pool: PgPool) { + // Insert a job with execute_at in the future via raw SQL + sqlx::query( + "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) + VALUES ('Subject', '{\"subject\": \"CS\"}', 'Medium', NOW() + INTERVAL '1 hour')", + ) + .execute(&pool) + .await + .unwrap(); + + let result = scrape_jobs::fetch_and_lock_job(&pool).await.unwrap(); + assert!(result.is_none(), "future execute_at jobs should be skipped"); +} + +#[sqlx::test] +async fn fetch_and_lock_priority_desc_ordering(pool: PgPool) { + // Insert low priority first, then critical + helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "LOW"}), + ScrapePriority::Low, + false, + 0, + 3, + ) + .await; + + helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "CRIT"}), + ScrapePriority::Critical, + false, + 0, + 3, + ) + .await; + + let job = scrape_jobs::fetch_and_lock_job(&pool) + .await + .unwrap() + .expect("should return a job"); + + assert_eq!( + job.target_payload, + json!({"subject": "CRIT"}), + "Critical priority should be fetched before Low" + ); +} + +#[sqlx::test] +async fn fetch_and_lock_execute_at_asc_ordering(pool: PgPool) { + // Insert an older job and a newer job, both same priority + sqlx::query( + "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) + VALUES ('Subject', '{\"subject\": \"OLDER\"}', 'Medium', NOW() - INTERVAL '2 hours')", + ) + .execute(&pool) + .await + .unwrap(); + + sqlx::query( + "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) + VALUES ('Subject', '{\"subject\": \"NEWER\"}', 'Medium', NOW() - INTERVAL '1 hour')", + ) + .execute(&pool) + .await + .unwrap(); + + let job = scrape_jobs::fetch_and_lock_job(&pool) + .await + .unwrap() + 
.expect("should return a job"); + + assert_eq!( + job.target_payload, + json!({"subject": "OLDER"}), + "Older execute_at should be fetched first" + ); +} + +// ── delete_job ────────────────────────────────────────────────────── + +#[sqlx::test] +async fn delete_job_removes_row(pool: PgPool) { + let id = helpers::insert_scrape_job( + &pool, + TargetType::SingleCrn, + json!({"crn": "12345"}), + ScrapePriority::High, + false, + 0, + 3, + ) + .await; + + scrape_jobs::delete_job(id, &pool).await.unwrap(); + + let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrape_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 0, "row should be deleted"); +} + +#[sqlx::test] +async fn delete_job_nonexistent_id_no_error(pool: PgPool) { + // Deleting a non-existent ID should not error + scrape_jobs::delete_job(999_999, &pool).await.unwrap(); +} + +// ── unlock_job ────────────────────────────────────────────────────── + +#[sqlx::test] +async fn unlock_job_clears_locked_at(pool: PgPool) { + let id = helpers::insert_scrape_job( + &pool, + TargetType::CrnList, + json!({"crns": [1, 2, 3]}), + ScrapePriority::Medium, + true, // locked + 0, + 3, + ) + .await; + + scrape_jobs::unlock_job(id, &pool).await.unwrap(); + + let (locked_at,): (Option>,) = + sqlx::query_as("SELECT locked_at FROM scrape_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert!(locked_at.is_none(), "locked_at should be cleared"); +} + +// ── unlock_and_increment_retry ────────────────────────────────────── + +#[sqlx::test] +async fn unlock_and_increment_retry_has_retries_remaining(pool: PgPool) { + let id = helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "CS"}), + ScrapePriority::Medium, + true, + 0, // retry_count + 3, // max_retries + ) + .await; + + let has_retries = scrape_jobs::unlock_and_increment_retry(id, 3, &pool) + .await + .unwrap(); + assert!(has_retries, "should have retries remaining (0→1, max=3)"); + + // Verify state in DB + let (retry_count, locked_at): (i32, Option>) = + sqlx::query_as("SELECT retry_count, locked_at FROM scrape_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(retry_count, 1); + assert!(locked_at.is_none(), "should be unlocked"); +} + +#[sqlx::test] +async fn unlock_and_increment_retry_exhausted(pool: PgPool) { + let id = helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "CS"}), + ScrapePriority::Medium, + true, + 2, // retry_count + 3, // max_retries + ) + .await; + + let has_retries = scrape_jobs::unlock_and_increment_retry(id, 3, &pool) + .await + .unwrap(); + assert!( + !has_retries, + "should NOT have retries remaining (2→3, max=3)" + ); + + let (retry_count,): (i32,) = + sqlx::query_as("SELECT retry_count FROM scrape_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(retry_count, 3); +} + +#[sqlx::test] +async fn unlock_and_increment_retry_already_exceeded(pool: PgPool) { + let id = helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "CS"}), + ScrapePriority::Medium, + true, + 5, // retry_count already past max + 3, // max_retries + ) + .await; + + let has_retries = scrape_jobs::unlock_and_increment_retry(id, 3, &pool) + .await + .unwrap(); + assert!( + !has_retries, + "should NOT have retries remaining (5→6, max=3)" + ); + + let (retry_count,): (i32,) = + sqlx::query_as("SELECT retry_count FROM scrape_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + 
.await + .unwrap(); + assert_eq!(retry_count, 6); +} + +// ── find_existing_job_payloads ────────────────────────────────────── + +#[sqlx::test] +async fn find_existing_payloads_returns_matching(pool: PgPool) { + let payload_a = json!({"subject": "CS"}); + let payload_b = json!({"subject": "MAT"}); + let payload_c = json!({"subject": "ENG"}); + + // Insert A and B as Subject jobs + helpers::insert_scrape_job( + &pool, + TargetType::Subject, + payload_a.clone(), + ScrapePriority::Medium, + false, + 0, + 3, + ) + .await; + helpers::insert_scrape_job( + &pool, + TargetType::Subject, + payload_b.clone(), + ScrapePriority::Medium, + false, + 0, + 3, + ) + .await; + // Insert C as a different target type + helpers::insert_scrape_job( + &pool, + TargetType::SingleCrn, + payload_c.clone(), + ScrapePriority::Medium, + false, + 0, + 3, + ) + .await; + + let candidates = vec![payload_a.clone(), payload_b.clone(), payload_c.clone()]; + let existing = scrape_jobs::find_existing_job_payloads(TargetType::Subject, &candidates, &pool) + .await + .unwrap(); + + assert!(existing.contains(&payload_a.to_string())); + assert!(existing.contains(&payload_b.to_string())); + // payload_c is SingleCrn, not Subject — should not match + assert!(!existing.contains(&payload_c.to_string())); +} + +#[sqlx::test] +async fn find_existing_payloads_ignores_locked(pool: PgPool) { + let payload = json!({"subject": "CS"}); + + helpers::insert_scrape_job( + &pool, + TargetType::Subject, + payload.clone(), + ScrapePriority::Medium, + true, // locked + 0, + 3, + ) + .await; + + let candidates = vec![payload.clone()]; + let existing = scrape_jobs::find_existing_job_payloads(TargetType::Subject, &candidates, &pool) + .await + .unwrap(); + + assert!(existing.is_empty(), "locked jobs should be ignored"); +} + +#[sqlx::test] +async fn find_existing_payloads_empty_candidates(pool: PgPool) { + // Insert a job so the table isn't empty + helpers::insert_scrape_job( + &pool, + TargetType::Subject, + json!({"subject": "CS"}), + ScrapePriority::Medium, + false, + 0, + 3, + ) + .await; + + let existing = scrape_jobs::find_existing_job_payloads(TargetType::Subject, &[], &pool) + .await + .unwrap(); + + assert!( + existing.is_empty(), + "empty candidates should return empty result" + ); +} + +// ── batch_insert_jobs ─────────────────────────────────────────────── + +#[sqlx::test] +async fn batch_insert_jobs_inserts_multiple(pool: PgPool) { + let jobs = vec![ + ( + json!({"subject": "CS"}), + TargetType::Subject, + ScrapePriority::High, + ), + ( + json!({"subject": "MAT"}), + TargetType::Subject, + ScrapePriority::Medium, + ), + ( + json!({"crn": "12345"}), + TargetType::SingleCrn, + ScrapePriority::Low, + ), + ]; + + scrape_jobs::batch_insert_jobs(&jobs, &pool).await.unwrap(); + + let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrape_jobs") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 3); +} + +#[sqlx::test] +async fn batch_insert_jobs_empty_slice(pool: PgPool) { + scrape_jobs::batch_insert_jobs(&[], &pool).await.unwrap(); + + let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrape_jobs") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 0); +} diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs new file mode 100644 index 0000000..3ac8b0c --- /dev/null +++ b/tests/helpers/mod.rs @@ -0,0 +1,88 @@ +use banner::banner::Course; +use banner::data::models::{ScrapePriority, TargetType}; +use chrono::Utc; +use sqlx::PgPool; + +/// Build a test `Course` (Banner API model) with sensible defaults. 
+/// +/// Only the fields used by `batch_upsert_courses` need meaningful values; +/// the rest are filled with harmless placeholders. +pub fn make_course( + crn: &str, + term: &str, + subject: &str, + course_number: &str, + title: &str, + enrollment: i32, + max_enrollment: i32, + wait_count: i32, + wait_capacity: i32, +) -> Course { + Course { + id: 0, + term: term.to_owned(), + term_desc: String::new(), + course_reference_number: crn.to_owned(), + part_of_term: "1".to_owned(), + course_number: course_number.to_owned(), + subject: subject.to_owned(), + subject_description: subject.to_owned(), + sequence_number: "001".to_owned(), + campus_description: "Main Campus".to_owned(), + schedule_type_description: "Lecture".to_owned(), + course_title: title.to_owned(), + credit_hours: Some(3), + maximum_enrollment: max_enrollment, + enrollment, + seats_available: max_enrollment - enrollment, + wait_capacity, + wait_count, + cross_list: None, + cross_list_capacity: None, + cross_list_count: None, + cross_list_available: None, + credit_hour_high: None, + credit_hour_low: None, + credit_hour_indicator: None, + open_section: enrollment < max_enrollment, + link_identifier: None, + is_section_linked: false, + subject_course: format!("{subject}{course_number}"), + reserved_seat_summary: None, + instructional_method: "FF".to_owned(), + instructional_method_description: "Face to Face".to_owned(), + section_attributes: vec![], + faculty: vec![], + meetings_faculty: vec![], + } +} + +/// Insert a scrape job row directly via SQL, returning the generated ID. +pub async fn insert_scrape_job( + pool: &PgPool, + target_type: TargetType, + payload: serde_json::Value, + priority: ScrapePriority, + locked: bool, + retry_count: i32, + max_retries: i32, +) -> i32 { + let locked_at = if locked { Some(Utc::now()) } else { None }; + + let (id,): (i32,) = sqlx::query_as( + "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at, locked_at, retry_count, max_retries) + VALUES ($1, $2, $3, NOW(), $4, $5, $6) + RETURNING id", + ) + .bind(target_type) + .bind(payload) + .bind(priority) + .bind(locked_at) + .bind(retry_count) + .bind(max_retries) + .fetch_one(pool) + .await + .expect("insert_scrape_job failed"); + + id +}
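
Note on the test setup, plus a usage sketch (not part of the patch itself):

The `migrate` feature added to sqlx in Cargo.toml is what lets the new
`#[sqlx::test]` integration tests provision an isolated database per test and
apply the crate's migrations automatically; running them requires a
`DATABASE_URL` pointing at a reachable Postgres server.

The extracted `scrape_jobs` functions are meant to compose into the worker's
poll loop roughly as sketched below. This is a minimal illustration, assuming
the crate publicly exports `banner::error::Result` and that `ScrapeJob` carries
a `max_retries` field; `process_job` here is a stand-in for the real
`Worker::process_job`, not the actual implementation.

    use banner::data::models::ScrapeJob;
    use banner::data::scrape_jobs;
    use sqlx::PgPool;

    /// Stand-in for Worker::process_job, only so the sketch is self-contained.
    async fn process_job(_job: &ScrapeJob) -> Result<(), ()> {
        Ok(())
    }

    /// One poll iteration: claim a job, run it, then delete or retry it.
    async fn poll_once(pool: &PgPool) -> banner::error::Result<()> {
        // Atomically claim the next unlocked, ready job (FOR UPDATE SKIP LOCKED).
        let Some(job) = scrape_jobs::fetch_and_lock_job(pool).await? else {
            // Queue is empty, or every ready job is locked by another worker.
            return Ok(());
        };

        match process_job(&job).await {
            // Success: the job is done, remove it from the queue.
            Ok(()) => scrape_jobs::delete_job(job.id, pool).await?,
            Err(_) => {
                // Failure: unlock and bump retry_count atomically in the DB;
                // give up (and delete) once retries are exhausted.
                let retries_remain =
                    scrape_jobs::unlock_and_increment_retry(job.id, job.max_retries, pool)
                        .await?;
                if !retries_remain {
                    scrape_jobs::delete_job(job.id, pool).await?;
                }
            }
        }

        Ok(())
    }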