diff --git a/src/scraper/jobs/mod.rs b/src/scraper/jobs/mod.rs index 4537f11..ca64c38 100644 --- a/src/scraper/jobs/mod.rs +++ b/src/scraper/jobs/mod.rs @@ -1,10 +1,67 @@ pub mod subject; +use crate::banner::BannerApi; use crate::data::models::TargetType; use crate::error::Result; -use crate::{banner::BannerApi, scraper::jobs::subject::SubjectJob}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use std::fmt; + +/// Errors that can occur during job parsing +#[derive(Debug)] +pub enum JobParseError { + InvalidJson(serde_json::Error), + UnsupportedTargetType(TargetType), + MissingRequiredField(String), +} + +impl fmt::Display for JobParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + JobParseError::InvalidJson(e) => write!(f, "Invalid JSON in job payload: {}", e), + JobParseError::UnsupportedTargetType(t) => { + write!(f, "Unsupported target type: {:?}", t) + } + JobParseError::MissingRequiredField(field) => { + write!(f, "Missing required field: {}", field) + } + } + } +} + +impl std::error::Error for JobParseError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + JobParseError::InvalidJson(e) => Some(e), + _ => None, + } + } +} + +/// Errors that can occur during job processing +#[derive(Debug)] +pub enum JobError { + Recoverable(anyhow::Error), // API failures, network issues + Unrecoverable(anyhow::Error), // Parse errors, corrupted data +} + +impl fmt::Display for JobError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + JobError::Recoverable(e) => write!(f, "Recoverable error: {}", e), + JobError::Unrecoverable(e) => write!(f, "Unrecoverable error: {}", e), + } + } +} + +impl std::error::Error for JobError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + JobError::Recoverable(e) => e.source(), + JobError::Unrecoverable(e) => e.source(), + } + } +} /// Common trait interface for all job types #[async_trait::async_trait] @@ -22,7 +79,7 @@ pub trait Job: Send + Sync { /// Main job enum that dispatches to specific job implementations #[derive(Debug, Clone, Serialize, Deserialize)] pub enum JobType { - Subject(SubjectJob), + Subject(subject::SubjectJob), } impl JobType { @@ -30,23 +87,26 @@ impl JobType { pub fn from_target_type_and_payload( target_type: TargetType, payload: serde_json::Value, - ) -> Result { + ) -> Result { match target_type { TargetType::Subject => { - let subject_payload: SubjectJob = serde_json::from_value(payload)?; - Ok(JobType::Subject(subject_payload)) + let subject_job: subject::SubjectJob = + serde_json::from_value(payload).map_err(JobParseError::InvalidJson)?; + Ok(JobType::Subject(subject_job)) } - _ => Err(anyhow::anyhow!( - "Unsupported target type: {:?}", - target_type - )), + _ => Err(JobParseError::UnsupportedTargetType(target_type)), } } /// Convert to a Job trait object pub fn as_job(self) -> Box { match self { - JobType::Subject(payload) => Box::new(payload), + JobType::Subject(job) => Box::new(job), } } } + +/// Helper function to create a subject job +pub fn create_subject_job(subject: String) -> JobType { + JobType::Subject(subject::SubjectJob::new(subject)) +} diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs index 60e8f4f..8e8654d 100644 --- a/src/scraper/worker.rs +++ b/src/scraper/worker.rs @@ -1,7 +1,7 @@ use crate::banner::{BannerApi, BannerApiError}; use crate::data::models::ScrapeJob; use crate::error::Result; -use crate::scraper::jobs::JobType; +use crate::scraper::jobs::{JobError, JobType}; use sqlx::PgPool; use std::sync::Arc; use std::time::Duration; @@ -35,38 +35,58 @@ impl Worker { Ok(Some(job)) => { let job_id = job.id; debug!(worker_id = self.id, job_id = job.id, "Processing job"); - if let Err(e) = self.process_job(job).await { - // Check if the error is due to an invalid session - if let Some(BannerApiError::InvalidSession(_)) = - e.downcast_ref::() - { - warn!( - worker_id = self.id, - job_id, "Invalid session detected. Forcing session refresh." - ); - } else { - error!(worker_id = self.id, job_id, error = ?e, "Failed to process job"); + match self.process_job(job).await { + Ok(()) => { + debug!(worker_id = self.id, job_id, "Job completed"); + // If successful, delete the job. + if let Err(delete_err) = self.delete_job(job_id).await { + error!( + worker_id = self.id, + job_id, + ?delete_err, + "Failed to delete job" + ); + } } + Err(JobError::Recoverable(e)) => { + // Check if the error is due to an invalid session + if let Some(BannerApiError::InvalidSession(_)) = + e.downcast_ref::() + { + warn!( + worker_id = self.id, + job_id, "Invalid session detected. Forcing session refresh." + ); + } else { + error!(worker_id = self.id, job_id, error = ?e, "Failed to process job"); + } - // Unlock the job so it can be retried - if let Err(unlock_err) = self.unlock_job(job_id).await { - error!( - worker_id = self.id, - job_id, - ?unlock_err, - "Failed to unlock job" - ); + // Unlock the job so it can be retried + if let Err(unlock_err) = self.unlock_job(job_id).await { + error!( + worker_id = self.id, + job_id, + ?unlock_err, + "Failed to unlock job" + ); + } } - } else { - debug!(worker_id = self.id, job_id, "Job completed"); - // If successful, delete the job. - if let Err(delete_err) = self.delete_job(job_id).await { + Err(JobError::Unrecoverable(e)) => { error!( worker_id = self.id, job_id, - ?delete_err, - "Failed to delete job" + error = ?e, + "Job corrupted, deleting" ); + // Parse errors are unrecoverable - delete the job + if let Err(delete_err) = self.delete_job(job_id).await { + error!( + worker_id = self.id, + job_id, + ?delete_err, + "Failed to delete corrupted job" + ); + } } } } @@ -109,29 +129,30 @@ impl Worker { Ok(job) } - async fn process_job(&self, job: ScrapeJob) -> Result<()> { + async fn process_job(&self, job: ScrapeJob) -> Result<(), JobError> { // Convert the database job to our job type - let job_type = JobType::from_target_type_and_payload( - job.target_type, - job.target_payload, - )?; - + let job_type = JobType::from_target_type_and_payload(job.target_type, job.target_payload) + .map_err(|e| JobError::Unrecoverable(anyhow::anyhow!(e)))?; // Parse errors are unrecoverable + // Get the job implementation let job_impl = job_type.as_job(); - + debug!( worker_id = self.id, job_id = job.id, description = job_impl.description(), "Processing job" ); - - // Process the job - job_impl.process(&self.banner_api, &self.db_pool).await + + // Process the job - API errors are recoverable + job_impl + .process(&self.banner_api, &self.db_pool) + .await + .map_err(JobError::Recoverable)?; + + Ok(()) } - - async fn delete_job(&self, job_id: i32) -> Result<()> { sqlx::query("DELETE FROM scrape_jobs WHERE id = $1") .bind(job_id)