diff --git a/src/data/scrape_jobs.rs b/src/data/scrape_jobs.rs
index a66ba2c..44c8444 100644
--- a/src/data/scrape_jobs.rs
+++ b/src/data/scrape_jobs.rs
@@ -5,11 +5,33 @@ use crate::error::Result;
 use sqlx::PgPool;
 use std::collections::HashSet;

+/// Force-unlock all jobs that have a non-NULL `locked_at`.
+///
+/// Intended to be called once at startup to recover jobs left locked by
+/// a previous unclean shutdown (crash, OOM kill, etc.).
+///
+/// # Returns
+/// The number of jobs that were unlocked.
+pub async fn force_unlock_all(db_pool: &PgPool) -> Result<u64> {
+    let result = sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE locked_at IS NOT NULL")
+        .execute(db_pool)
+        .await?;
+    Ok(result.rows_affected())
+}
+
+/// How long a lock can be held before it is considered expired and reclaimable.
+///
+/// This acts as a safety net for cases where a worker dies without unlocking
+/// (OOM kill, crash, network partition). Under normal operation, the worker's
+/// own job timeout fires well before this threshold.
+const LOCK_EXPIRY: std::time::Duration = std::time::Duration::from_secs(10 * 60);
+
 /// Atomically fetch and lock the next available scrape job.
 ///
 /// Uses `FOR UPDATE SKIP LOCKED` to allow multiple workers to poll the queue
-/// concurrently without conflicts. Only jobs that are unlocked and ready to
-/// execute (based on `execute_at`) are considered.
+/// concurrently without conflicts. Considers jobs whose `execute_at` has passed and that are:
+/// - Unlocked, OR
+/// - Locked but past [`LOCK_EXPIRY`] (abandoned by a dead worker)
 ///
 /// # Arguments
 /// * `db_pool` - PostgreSQL connection pool
@@ -20,9 +42,16 @@ use std::collections::HashSet;
 pub async fn fetch_and_lock_job(db_pool: &PgPool) -> Result<Option<ScrapeJob>> {
     let mut tx = db_pool.begin().await?;

+    let lock_expiry_secs = LOCK_EXPIRY.as_secs() as i32;
     let job = sqlx::query_as::<_, ScrapeJob>(
-        "SELECT * FROM scrape_jobs WHERE locked_at IS NULL AND execute_at <= NOW() ORDER BY priority DESC, execute_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED"
+        "SELECT * FROM scrape_jobs \
+         WHERE (locked_at IS NULL OR locked_at < NOW() - make_interval(secs => $1::double precision)) \
+         AND execute_at <= NOW() \
+         ORDER BY priority DESC, execute_at ASC \
+         LIMIT 1 \
+         FOR UPDATE SKIP LOCKED"
     )
+    .bind(lock_expiry_secs)
     .fetch_optional(&mut *tx)
     .await?;

@@ -90,7 +119,7 @@ pub async fn unlock_and_increment_retry(
         "UPDATE scrape_jobs
          SET locked_at = NULL, retry_count = retry_count + 1
          WHERE id = $1
-         RETURNING CASE WHEN retry_count < $2 THEN retry_count ELSE NULL END",
+         RETURNING CASE WHEN retry_count <= $2 THEN retry_count ELSE NULL END",
     )
     .bind(job_id)
     .bind(max_retries)
@@ -100,10 +129,10 @@
     Ok(result.is_some())
 }

-/// Find existing unlocked job payloads matching the given target type and candidates.
+/// Find existing job payloads matching the given target type and candidates.
 ///
-/// Returns a set of stringified JSON payloads that already exist in the queue,
-/// used for deduplication when scheduling new jobs.
+/// Returns a set of stringified JSON payloads that already exist in the queue
+/// (both locked and unlocked), used for deduplication when scheduling new jobs.
 ///
 /// # Arguments
 /// * `target_type` - The target type to filter by
@@ -111,7 +140,7 @@ pub async fn unlock_and_increment_retry(
 /// * `db_pool` - PostgreSQL connection pool
 ///
 /// # Returns
-/// A `HashSet` of stringified JSON payloads that already have pending jobs
+/// A `HashSet` of stringified JSON payloads that already have pending or in-progress jobs
 pub async fn find_existing_job_payloads(
     target_type: TargetType,
     candidate_payloads: &[serde_json::Value],
@@ -119,7 +148,7 @@
 ) -> Result<HashSet<String>> {
     let existing_jobs: Vec<(serde_json::Value,)> = sqlx::query_as(
         "SELECT target_payload FROM scrape_jobs
-         WHERE target_type = $1 AND target_payload = ANY($2) AND locked_at IS NULL",
+         WHERE target_type = $1 AND target_payload = ANY($2)",
     )
     .bind(target_type)
     .bind(candidate_payloads)
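The `retry_count <= $2` change above shifts what `max_retries` means: the `RETURNING` expression sees the already-incremented `retry_count`, so a job with `max_retries = 3` is retried after each of its first three failures and only gives up on the fourth. A minimal sketch of that accounting, matching the values in the updated `unlock_and_increment_retry_exhausted` test further down (the helper name here is illustrative, not part of the crate):

```rust
/// Mirrors `RETURNING CASE WHEN retry_count <= $2 THEN retry_count ELSE NULL END`
/// followed by the `is_some()` check: `retry_count` is the value *after* the
/// `retry_count + 1` update, and `max_retries` is the bound `$2`.
fn has_retries_remaining(retry_count_after_increment: i32, max_retries: i32) -> bool {
    retry_count_after_increment <= max_retries
}

fn main() {
    // With max_retries = 3: failures 1-3 still schedule a retry...
    assert!(has_retries_remaining(1, 3));
    assert!(has_retries_remaining(3, 3)); // previously false under `<`
    // ...and the 4th failure (retry_count 3 -> 4) exhausts the budget.
    assert!(!has_retries_remaining(4, 3));
}
```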
diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs
index 51c3fb0..1914bf0 100644
--- a/src/scraper/mod.rs
+++ b/src/scraper/mod.rs
@@ -3,6 +3,7 @@ pub mod scheduler;
 pub mod worker;

 use crate::banner::BannerApi;
+use crate::data::scrape_jobs;
 use crate::services::Service;
 use crate::state::ReferenceCache;
 use crate::status::{ServiceStatus, ServiceStatusRegistry};
@@ -49,7 +50,17 @@ impl ScraperService {
     }

     /// Starts the scheduler and a pool of workers.
-    pub fn start(&mut self) {
+    ///
+    /// Force-unlocks any jobs left locked by a previous unclean shutdown before
+    /// spawning workers, so those jobs re-enter the queue immediately.
+    pub async fn start(&mut self) {
+        // Recover jobs left locked by a previous crash/unclean shutdown
+        match scrape_jobs::force_unlock_all(&self.db_pool).await {
+            Ok(0) => {}
+            Ok(count) => warn!(count, "Force-unlocked stale jobs from previous run"),
+            Err(e) => warn!(error = ?e, "Failed to force-unlock stale jobs"),
+        }
+
         info!("ScraperService starting");

         // Create shutdown channel
@@ -92,7 +103,7 @@ impl Service for ScraperService {
     }

     async fn run(&mut self) -> Result<(), anyhow::Error> {
-        self.start();
+        self.start().await;
         std::future::pending::<()>().await;
         Ok(())
     }
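Because `force_unlock_all` only touches rows whose `locked_at` is non-NULL, the recovery step in `ScraperService::start` is idempotent and could also be exposed elsewhere, e.g. as a maintenance task. A hedged sketch assuming the module paths from the patch (`crate::data::scrape_jobs`, `crate::error::Result`); the wrapper name and log wording are illustrative, not part of the crate:

```rust
use crate::data::scrape_jobs;
use crate::error::Result;
use sqlx::PgPool;
use tracing::warn;

/// Illustrative admin-style wrapper around the startup recovery: safe to run
/// repeatedly, since the underlying UPDATE only clears non-NULL `locked_at`.
pub async fn requeue_orphaned_jobs(db_pool: &PgPool) -> Result<u64> {
    let unlocked = scrape_jobs::force_unlock_all(db_pool).await?;
    if unlocked > 0 {
        warn!(unlocked, "requeued jobs orphaned by an unclean shutdown");
    }
    Ok(unlocked)
}
```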
diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs
index 94b41e0..3fdc1f0 100644
--- a/src/scraper/worker.rs
+++ b/src/scraper/worker.rs
@@ -10,6 +10,9 @@ use tokio::sync::broadcast;
 use tokio::time;
 use tracing::{Instrument, debug, error, info, trace, warn};

+/// Maximum time a single job is allowed to run before being considered stuck.
+const JOB_TIMEOUT: Duration = Duration::from_secs(5 * 60);
+
 /// A single worker instance.
 ///
 /// Each worker runs in its own asynchronous task and continuously polls the
@@ -62,13 +65,23 @@
             let max_retries = job.max_retries;
             let start = std::time::Instant::now();

-            // Process the job, racing against shutdown signal
+            // Process the job, racing against shutdown signal and timeout
             let process_result = tokio::select! {
                 _ = shutdown_rx.recv() => {
                     self.handle_shutdown_during_processing(job_id).await;
                     break;
                 }
-                result = self.process_job(job) => result
+                result = async {
+                    match time::timeout(JOB_TIMEOUT, self.process_job(job)).await {
+                        Ok(result) => result,
+                        Err(_elapsed) => {
+                            Err(JobError::Recoverable(anyhow::anyhow!(
+                                "job timed out after {}s",
+                                JOB_TIMEOUT.as_secs()
+                            )))
+                        }
+                    }
+                } => result
             };

             let duration = start.elapsed();
diff --git a/tests/db_scrape_jobs.rs b/tests/db_scrape_jobs.rs
index cb52258..039b6d5 100644
--- a/tests/db_scrape_jobs.rs
+++ b/tests/db_scrape_jobs.rs
@@ -241,7 +241,7 @@ async fn unlock_and_increment_retry_exhausted(pool: PgPool) {
         json!({"subject": "CS"}),
         ScrapePriority::Medium,
         true,
-        2, // retry_count
+        3, // retry_count (already used all 3 retries)
         3, // max_retries
     )
     .await;
@@ -251,7 +251,7 @@ async fn unlock_and_increment_retry_exhausted(pool: PgPool) {
         .unwrap();
     assert!(
         !has_retries,
-        "should NOT have retries remaining (2→3, max=3)"
+        "should NOT have retries remaining (3→4, max=3)"
     );

     let (retry_count,): (i32,) =
@@ -260,7 +260,7 @@
             .fetch_one(&pool)
             .await
             .unwrap();
-    assert_eq!(retry_count, 3);
+    assert_eq!(retry_count, 4);
 }

 #[sqlx::test]
@@ -346,7 +346,7 @@ async fn find_existing_payloads_returns_matching(pool: PgPool) {
 }

 #[sqlx::test]
-async fn find_existing_payloads_ignores_locked(pool: PgPool) {
+async fn find_existing_payloads_includes_locked(pool: PgPool) {
     let payload = json!({"subject": "CS"});

     helpers::insert_scrape_job(
@@ -365,7 +365,10 @@
         .await
         .unwrap();

-    assert!(existing.is_empty(), "locked jobs should be ignored");
+    assert!(
+        existing.contains(&payload.to_string()),
+        "locked jobs should be included in deduplication"
+    );
 }

 #[sqlx::test]
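The worker change wraps `process_job` in `tokio::time::timeout` and converts the `Elapsed` error into the existing recoverable-error path, so a hung job is unlocked and retried like any other failure instead of holding its lock until `LOCK_EXPIRY` reclaims it. A generic sketch of that pattern under assumed names (`run_with_timeout` and the stand-in job are not part of the patch, and the worker's `JobError` type is replaced here by `anyhow::Error`):

```rust
use std::time::Duration;
use tokio::time;

/// Run a fallible future under a deadline, turning "took too long" into an
/// ordinary error so the caller's unlock/retry path handles both cases the same way.
async fn run_with_timeout<T, F>(limit: Duration, fut: F) -> anyhow::Result<T>
where
    F: std::future::Future<Output = anyhow::Result<T>>,
{
    match time::timeout(limit, fut).await {
        Ok(result) => result, // finished within the limit (Ok or Err)
        Err(_elapsed) => anyhow::bail!("job timed out after {}s", limit.as_secs()),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Stand-in for `process_job`: sleeps past the limit, so the helper reports
    // a timeout instead of letting the caller hang.
    let slow_job = async {
        time::sleep(Duration::from_millis(200)).await;
        Ok::<(), anyhow::Error>(())
    };
    let outcome = run_with_timeout(Duration::from_millis(50), slow_job).await;
    assert!(outcome.is_err());
    Ok(())
}
```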