feat: implement worker timeout protection and crash recovery for job queue

Add JOB_TIMEOUT constant to fail stuck jobs after 5 minutes, and
LOCK_EXPIRY to reclaim abandoned locks after 10 minutes. Introduce
force_unlock_all to recover orphaned jobs at startup. Fix retry limit
off-by-one error and update deduplication to include locked jobs.
This commit is contained in:
2026-01-29 15:50:09 -06:00
parent db0ec1e69d
commit e880126281
4 changed files with 74 additions and 18 deletions
+38 -9
View File
@@ -5,11 +5,33 @@ use crate::error::Result;
use sqlx::PgPool;
use std::collections::HashSet;
/// Force-unlock all jobs that have a non-NULL `locked_at`.
///
/// Intended to be called once at startup to recover jobs left locked by
/// a previous unclean shutdown (crash, OOM kill, etc.).
///
/// # Returns
/// The number of jobs that were unlocked.
pub async fn force_unlock_all(db_pool: &PgPool) -> Result<u64> {
let result = sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE locked_at IS NOT NULL")
.execute(db_pool)
.await?;
Ok(result.rows_affected())
}
/// How long a lock can be held before it is considered expired and reclaimable.
///
/// This acts as a safety net for cases where a worker dies without unlocking
/// (OOM kill, crash, network partition). Under normal operation, the worker's
/// own job timeout fires well before this threshold.
const LOCK_EXPIRY: std::time::Duration = std::time::Duration::from_secs(10 * 60);
/// Atomically fetch and lock the next available scrape job.
///
/// Uses `FOR UPDATE SKIP LOCKED` to allow multiple workers to poll the queue
/// concurrently without conflicts. Only jobs that are unlocked and ready to
/// execute (based on `execute_at`) are considered.
/// concurrently without conflicts. Considers jobs that are:
/// - Unlocked and ready to execute, OR
/// - Locked but past [`LOCK_EXPIRY`] (abandoned by a dead worker)
///
/// # Arguments
/// * `db_pool` - PostgreSQL connection pool
@@ -20,9 +42,16 @@ use std::collections::HashSet;
pub async fn fetch_and_lock_job(db_pool: &PgPool) -> Result<Option<ScrapeJob>> {
let mut tx = db_pool.begin().await?;
let lock_expiry_secs = LOCK_EXPIRY.as_secs() as i32;
let job = sqlx::query_as::<_, ScrapeJob>(
"SELECT * FROM scrape_jobs WHERE locked_at IS NULL AND execute_at <= NOW() ORDER BY priority DESC, execute_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED"
"SELECT * FROM scrape_jobs \
WHERE (locked_at IS NULL OR locked_at < NOW() - make_interval(secs => $1::double precision)) \
AND execute_at <= NOW() \
ORDER BY priority DESC, execute_at ASC \
LIMIT 1 \
FOR UPDATE SKIP LOCKED"
)
.bind(lock_expiry_secs)
.fetch_optional(&mut *tx)
.await?;
@@ -90,7 +119,7 @@ pub async fn unlock_and_increment_retry(
"UPDATE scrape_jobs
SET locked_at = NULL, retry_count = retry_count + 1
WHERE id = $1
RETURNING CASE WHEN retry_count < $2 THEN retry_count ELSE NULL END",
RETURNING CASE WHEN retry_count <= $2 THEN retry_count ELSE NULL END",
)
.bind(job_id)
.bind(max_retries)
@@ -100,10 +129,10 @@ pub async fn unlock_and_increment_retry(
Ok(result.is_some())
}
/// Find existing unlocked job payloads matching the given target type and candidates.
/// Find existing job payloads matching the given target type and candidates.
///
/// Returns a set of stringified JSON payloads that already exist in the queue,
/// used for deduplication when scheduling new jobs.
/// Returns a set of stringified JSON payloads that already exist in the queue
/// (both locked and unlocked), used for deduplication when scheduling new jobs.
///
/// # Arguments
/// * `target_type` - The target type to filter by
@@ -111,7 +140,7 @@ pub async fn unlock_and_increment_retry(
/// * `db_pool` - PostgreSQL connection pool
///
/// # Returns
/// A `HashSet` of stringified JSON payloads that already have pending jobs
/// A `HashSet` of stringified JSON payloads that already have pending or in-progress jobs
pub async fn find_existing_job_payloads(
target_type: TargetType,
candidate_payloads: &[serde_json::Value],
@@ -119,7 +148,7 @@ pub async fn find_existing_job_payloads(
) -> Result<HashSet<String>> {
let existing_jobs: Vec<(serde_json::Value,)> = sqlx::query_as(
"SELECT target_payload FROM scrape_jobs
WHERE target_type = $1 AND target_payload = ANY($2) AND locked_at IS NULL",
WHERE target_type = $1 AND target_payload = ANY($2)",
)
.bind(target_type)
.bind(candidate_payloads)
+13 -2
View File
@@ -3,6 +3,7 @@ pub mod scheduler;
pub mod worker;
use crate::banner::BannerApi;
use crate::data::scrape_jobs;
use crate::services::Service;
use crate::state::ReferenceCache;
use crate::status::{ServiceStatus, ServiceStatusRegistry};
@@ -49,7 +50,17 @@ impl ScraperService {
}
/// Starts the scheduler and a pool of workers.
pub fn start(&mut self) {
///
/// Force-unlocks any jobs left locked by a previous unclean shutdown before
/// spawning workers, so those jobs re-enter the queue immediately.
pub async fn start(&mut self) {
// Recover jobs left locked by a previous crash/unclean shutdown
match scrape_jobs::force_unlock_all(&self.db_pool).await {
Ok(0) => {}
Ok(count) => warn!(count, "Force-unlocked stale jobs from previous run"),
Err(e) => warn!(error = ?e, "Failed to force-unlock stale jobs"),
}
info!("ScraperService starting");
// Create shutdown channel
@@ -92,7 +103,7 @@ impl Service for ScraperService {
}
async fn run(&mut self) -> Result<(), anyhow::Error> {
self.start();
self.start().await;
std::future::pending::<()>().await;
Ok(())
}
+15 -2
View File
@@ -10,6 +10,9 @@ use tokio::sync::broadcast;
use tokio::time;
use tracing::{Instrument, debug, error, info, trace, warn};
/// Maximum time a single job is allowed to run before being considered stuck.
const JOB_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// A single worker instance.
///
/// Each worker runs in its own asynchronous task and continuously polls the
@@ -62,13 +65,23 @@ impl Worker {
let max_retries = job.max_retries;
let start = std::time::Instant::now();
// Process the job, racing against shutdown signal
// Process the job, racing against shutdown signal and timeout
let process_result = tokio::select! {
_ = shutdown_rx.recv() => {
self.handle_shutdown_during_processing(job_id).await;
break;
}
result = self.process_job(job) => result
result = async {
match time::timeout(JOB_TIMEOUT, self.process_job(job)).await {
Ok(result) => result,
Err(_elapsed) => {
Err(JobError::Recoverable(anyhow::anyhow!(
"job timed out after {}s",
JOB_TIMEOUT.as_secs()
)))
}
}
} => result
};
let duration = start.elapsed();
+8 -5
View File
@@ -241,7 +241,7 @@ async fn unlock_and_increment_retry_exhausted(pool: PgPool) {
json!({"subject": "CS"}),
ScrapePriority::Medium,
true,
2, // retry_count
3, // retry_count (already used all 3 retries)
3, // max_retries
)
.await;
@@ -251,7 +251,7 @@ async fn unlock_and_increment_retry_exhausted(pool: PgPool) {
.unwrap();
assert!(
!has_retries,
"should NOT have retries remaining (2→3, max=3)"
"should NOT have retries remaining (3→4, max=3)"
);
let (retry_count,): (i32,) =
@@ -260,7 +260,7 @@ async fn unlock_and_increment_retry_exhausted(pool: PgPool) {
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(retry_count, 3);
assert_eq!(retry_count, 4);
}
#[sqlx::test]
@@ -346,7 +346,7 @@ async fn find_existing_payloads_returns_matching(pool: PgPool) {
}
#[sqlx::test]
async fn find_existing_payloads_ignores_locked(pool: PgPool) {
async fn find_existing_payloads_includes_locked(pool: PgPool) {
let payload = json!({"subject": "CS"});
helpers::insert_scrape_job(
@@ -365,7 +365,10 @@ async fn find_existing_payloads_ignores_locked(pool: PgPool) {
.await
.unwrap();
assert!(existing.is_empty(), "locked jobs should be ignored");
assert!(
existing.contains(&payload.to_string()),
"locked jobs should be included in deduplication"
);
}
#[sqlx::test]