use crate::banner::{BannerApi, BannerApiError};
use crate::data::models::{ScrapeJob, ScrapeJobStatus, UpsertCounts};
use crate::data::scrape_jobs;
use crate::error::Result;
use crate::scraper::jobs::{JobError, JobType};
use crate::web::ws::ScrapeJobEvent;
use chrono::{DateTime, Utc};
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time;
use tracing::{Instrument, debug, error, info, trace, warn};

/// Maximum time a single job is allowed to run before being considered stuck.
const JOB_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// A single worker instance.
///
/// Each worker runs in its own asynchronous task and continuously polls the
/// database for scrape jobs to execute.
pub struct Worker {
    id: usize, // For logging purposes
    db_pool: PgPool,
    banner_api: Arc<BannerApi>,
    job_events_tx: broadcast::Sender<ScrapeJobEvent>,
}

impl Worker {
    pub fn new(
        id: usize,
        db_pool: PgPool,
        banner_api: Arc<BannerApi>,
        job_events_tx: broadcast::Sender<ScrapeJobEvent>,
    ) -> Self {
        Self {
            id,
            db_pool,
            banner_api,
            job_events_tx,
        }
    }

    /// Runs the worker's main loop.
    pub async fn run(&self, mut shutdown_rx: broadcast::Receiver<()>) {
        info!(worker_id = self.id, "Worker started");

        loop {
            // Fetch and lock a job, racing against shutdown signal
            let job = tokio::select! {
                _ = shutdown_rx.recv() => {
                    info!(worker_id = self.id, "Worker received shutdown signal, exiting gracefully");
                    break;
                }
                result = self.fetch_and_lock_job() => {
                    match result {
                        Ok(Some(job)) => job,
                        Ok(None) => {
                            trace!(worker_id = self.id, "No jobs available, waiting");
                            time::sleep(Duration::from_secs(5)).await;
                            continue;
                        }
                        Err(e) => {
                            warn!(worker_id = self.id, error = ?e, "Failed to fetch job, waiting");
                            time::sleep(Duration::from_secs(10)).await;
                            continue;
                        }
                    }
                }
            };

            let job_id = job.id;
            let retry_count = job.retry_count;
            let max_retries = job.max_retries;
            let target_type = job.target_type;
            let payload = job.target_payload.clone();
            let priority = job.priority;
            let queued_at = job.queued_at;
            let started_at = Utc::now();
            let start = std::time::Instant::now();

            // Emit JobLocked event
            let locked_at = started_at.to_rfc3339();
            debug!(job_id, "Emitting JobLocked event");
            let _ = self.job_events_tx.send(ScrapeJobEvent::JobLocked {
                id: job_id,
                locked_at,
                status: ScrapeJobStatus::Processing,
            });

            // Process the job, racing against shutdown signal and timeout
            let process_result = tokio::select! {
                _ = shutdown_rx.recv() => {
                    self.handle_shutdown_during_processing(job_id).await;
                    break;
                }
                result = async {
                    match time::timeout(JOB_TIMEOUT, self.process_job(job)).await {
                        Ok(result) => result,
                        Err(_elapsed) => {
                            Err(JobError::Recoverable(anyhow::anyhow!(
                                "job timed out after {}s",
                                JOB_TIMEOUT.as_secs()
                            )))
                        }
                    }
                } => result
            };

            let duration = start.elapsed();

            // Handle the job processing result
            self.handle_job_result(
                job_id,
                retry_count,
                max_retries,
                process_result,
                duration,
                target_type,
                payload,
                priority,
                queued_at,
                started_at,
            )
            .await;
        }
    }

    /// Atomically fetches a job from the queue, locking it for processing.
    ///
    /// This uses a `FOR UPDATE SKIP LOCKED` query to ensure that multiple
    /// workers can poll the queue concurrently without conflicts.
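    ///
    /// The exact SQL lives in [`scrape_jobs::fetch_and_lock_job`]; the shape sketched
    /// below is only illustrative (the table and column names are assumptions):
    ///
    /// ```text
    /// SELECT * FROM scrape_jobs
    /// WHERE /* job is pending and not yet locked */
    /// ORDER BY priority, queued_at
    /// LIMIT 1
    /// FOR UPDATE SKIP LOCKED
    /// ```
    ///
    /// `SKIP LOCKED` makes concurrent workers skip rows already locked by another
    /// transaction instead of blocking on them, so each job is handed to at most
    /// one worker at a time.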
    async fn fetch_and_lock_job(&self) -> Result<Option<ScrapeJob>> {
        scrape_jobs::fetch_and_lock_job(&self.db_pool).await
    }

    async fn process_job(&self, job: ScrapeJob) -> Result<UpsertCounts, JobError> {
        // Convert the database job to our job type
        let job_type = JobType::from_target_type_and_payload(job.target_type, job.target_payload)
            .map_err(|e| JobError::Unrecoverable(anyhow::anyhow!(e)))?; // Parse errors are unrecoverable

        // Get the job implementation
        let job_impl = job_type.boxed();

        // Create span with job context
        let span = tracing::info_span!(
            "process_job",
            job_id = job.id,
            job_type = job_impl.description()
        );

        async move {
            debug!(
                worker_id = self.id,
                job_id = job.id,
                description = job_impl.description(),
                "Processing job"
            );

            // Process the job - API errors are recoverable
            job_impl
                .process(&self.banner_api, &self.db_pool)
                .await
                .map_err(JobError::Recoverable)
        }
        .instrument(span)
        .await
    }

    async fn delete_job(&self, job_id: i32) -> Result<()> {
        scrape_jobs::delete_job(job_id, &self.db_pool).await
    }

    async fn unlock_job(&self, job_id: i32) -> Result<()> {
        scrape_jobs::unlock_job(job_id, &self.db_pool).await
    }

    async fn unlock_and_increment_retry(
        &self,
        job_id: i32,
        max_retries: i32,
    ) -> Result<Option<DateTime<Utc>>> {
        scrape_jobs::unlock_and_increment_retry(job_id, max_retries, &self.db_pool).await
    }

    /// Handle shutdown signal received during job processing
    async fn handle_shutdown_during_processing(&self, job_id: i32) {
        info!(
            worker_id = self.id,
            job_id,
            "Shutdown received during job processing"
        );
        if let Err(e) = self.unlock_job(job_id).await {
            warn!(
                worker_id = self.id,
                job_id,
                error = ?e,
                "Failed to unlock job during shutdown"
            );
        } else {
            debug!(worker_id = self.id, job_id, "Job unlocked during shutdown");
        }
        info!(worker_id = self.id, "Worker exiting gracefully");
    }

    /// Handle the result of job processing
    #[allow(clippy::too_many_arguments)]
    async fn handle_job_result(
        &self,
        job_id: i32,
        retry_count: i32,
        max_retries: i32,
        result: Result<UpsertCounts, JobError>,
        duration: std::time::Duration,
        target_type: crate::data::models::TargetType,
        payload: serde_json::Value,
        priority: crate::data::models::ScrapePriority,
        queued_at: DateTime<Utc>,
        started_at: DateTime<Utc>,
    ) {
        let duration_ms = duration.as_millis() as i32;

        match result {
            Ok(counts) => {
                debug!(
                    worker_id = self.id,
                    job_id,
                    duration_ms = duration.as_millis(),
                    courses_fetched = counts.courses_fetched,
                    courses_changed = counts.courses_changed,
                    courses_unchanged = counts.courses_unchanged,
                    "Job completed successfully"
                );

                // Log the result
                if let Err(e) = scrape_jobs::insert_job_result(
                    target_type,
                    payload,
                    priority,
                    queued_at,
                    started_at,
                    duration_ms,
                    true,
                    None,
                    retry_count,
                    Some(&counts),
                    &self.db_pool,
                )
                .await
                {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to insert job result");
                }

                if let Err(e) = self.delete_job(job_id).await {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
                }

                debug!(job_id, "Emitting JobCompleted event");
                let _ = self
                    .job_events_tx
                    .send(ScrapeJobEvent::JobCompleted { id: job_id });
            }
            Err(JobError::Recoverable(e)) => {
                self.handle_recoverable_error(
                    job_id,
                    retry_count,
                    max_retries,
                    e,
                    duration,
                    target_type,
                    payload,
                    priority,
                    queued_at,
                    started_at,
                )
                .await;
            }
            Err(JobError::Unrecoverable(e)) => {
                // Log the failed result
                let err_msg = format!("{e:#}");
                if let Err(log_err) = scrape_jobs::insert_job_result(
                    target_type,
                    payload,
                    priority,
                    queued_at,
                    started_at,
                    duration_ms,
                    false,
                    Some(&err_msg),
                    retry_count,
                    None,
                    &self.db_pool,
                )
                .await
                {
                    error!(worker_id = self.id, job_id, error = ?log_err, "Failed to insert job result");
                }
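
                // Unrecoverable errors (e.g. a payload that fails to parse) are never
                // retried: the failure was recorded above and the job row is deleted
                // below so it does not stay in the queue.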
                error!(
                    worker_id = self.id,
                    job_id,
                    duration_ms = duration.as_millis(),
                    error = ?e,
                    "Job corrupted, deleting"
                );
                if let Err(e) = self.delete_job(job_id).await {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
                }

                debug!(job_id, "Emitting JobDeleted event");
                let _ = self
                    .job_events_tx
                    .send(ScrapeJobEvent::JobDeleted { id: job_id });
            }
        }
    }

    /// Handle recoverable errors by logging appropriately and unlocking the job
    #[allow(clippy::too_many_arguments)]
    async fn handle_recoverable_error(
        &self,
        job_id: i32,
        retry_count: i32,
        max_retries: i32,
        e: anyhow::Error,
        duration: std::time::Duration,
        target_type: crate::data::models::TargetType,
        payload: serde_json::Value,
        priority: crate::data::models::ScrapePriority,
        queued_at: DateTime<Utc>,
        started_at: DateTime<Utc>,
    ) {
        let next_attempt = retry_count.saturating_add(1);
        let remaining_retries = max_retries.saturating_sub(next_attempt);

        // Log the error appropriately based on type
        if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() {
            warn!(
                worker_id = self.id,
                job_id,
                duration_ms = duration.as_millis(),
                retry_attempt = next_attempt,
                max_retries = max_retries,
                remaining_retries = remaining_retries,
                "Invalid session detected, will retry"
            );
        } else {
            error!(
                worker_id = self.id,
                job_id,
                duration_ms = duration.as_millis(),
                retry_attempt = next_attempt,
                max_retries = max_retries,
                remaining_retries = remaining_retries,
                error = ?e,
                "Failed to process job, will retry"
            );
        }

        // Atomically unlock and increment retry count, checking if retry is allowed
        match self.unlock_and_increment_retry(job_id, max_retries).await {
            Ok(Some(new_queued_at)) => {
                debug!(
                    worker_id = self.id,
                    job_id,
                    retry_attempt = next_attempt,
                    remaining_retries = remaining_retries,
                    "Job unlocked for retry"
                );

                debug!(job_id, "Emitting JobRetried event");
                let _ = self.job_events_tx.send(ScrapeJobEvent::JobRetried {
                    id: job_id,
                    retry_count: next_attempt,
                    queued_at: new_queued_at.to_rfc3339(),
                    status: ScrapeJobStatus::Pending,
                });

                // Don't log a result yet; the job will be retried
            }
            Ok(None) => {
                // Max retries exceeded; log the final failure result
                let duration_ms = duration.as_millis() as i32;
                let err_msg = format!("{e:#}");
                if let Err(log_err) = scrape_jobs::insert_job_result(
                    target_type,
                    payload,
                    priority,
                    queued_at,
                    started_at,
                    duration_ms,
                    false,
                    Some(&err_msg),
                    next_attempt,
                    None,
                    &self.db_pool,
                )
                .await
                {
                    error!(worker_id = self.id, job_id, error = ?log_err, "Failed to insert job result");
                }

                error!(
                    worker_id = self.id,
                    job_id,
                    duration_ms = duration.as_millis(),
                    retry_count = next_attempt,
                    max_retries = max_retries,
                    error = ?e,
                    "Job failed permanently (max retries exceeded), deleting"
                );
                if let Err(e) = self.delete_job(job_id).await {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
                }

                debug!(job_id, "Emitting JobExhausted and JobDeleted events");
                let _ = self
                    .job_events_tx
                    .send(ScrapeJobEvent::JobExhausted { id: job_id });
                let _ = self
                    .job_events_tx
                    .send(ScrapeJobEvent::JobDeleted { id: job_id });
            }
            Err(e) => {
                error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
            }
        }
    }
}
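
// A minimal sketch of how an owner (e.g. a worker pool) might spawn these workers.
// `db_pool`, `banner_api`, `job_events_tx`, and `num_workers` are assumed to exist
// elsewhere; the real spawning code lives outside this module:
//
//     let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1);
//     for id in 0..num_workers {
//         let worker = Worker::new(id, db_pool.clone(), banner_api.clone(), job_events_tx.clone());
//         let shutdown_rx = shutdown_tx.subscribe();
//         tokio::spawn(async move { worker.run(shutdown_rx).await });
//     }
//     // Sending on `shutdown_tx` later asks every worker to exit gracefully.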