feat: setup recoverable/unrecoverable job error distinction, delete unrecoverable jobs

This commit is contained in:
2025-09-13 00:48:11 -05:00
parent 77ab71d4d5
commit bbc78131ec
2 changed files with 129 additions and 48 deletions

View File

@@ -1,10 +1,67 @@
pub mod subject; pub mod subject;
use crate::banner::BannerApi;
use crate::data::models::TargetType; use crate::data::models::TargetType;
use crate::error::Result; use crate::error::Result;
use crate::{banner::BannerApi, scraper::jobs::subject::SubjectJob};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::PgPool; use sqlx::PgPool;
use std::fmt;
/// Errors that can occur during job parsing
#[derive(Debug)]
pub enum JobParseError {
InvalidJson(serde_json::Error),
UnsupportedTargetType(TargetType),
MissingRequiredField(String),
}
impl fmt::Display for JobParseError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
JobParseError::InvalidJson(e) => write!(f, "Invalid JSON in job payload: {}", e),
JobParseError::UnsupportedTargetType(t) => {
write!(f, "Unsupported target type: {:?}", t)
}
JobParseError::MissingRequiredField(field) => {
write!(f, "Missing required field: {}", field)
}
}
}
}
impl std::error::Error for JobParseError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
JobParseError::InvalidJson(e) => Some(e),
_ => None,
}
}
}
/// Errors that can occur during job processing
#[derive(Debug)]
pub enum JobError {
Recoverable(anyhow::Error), // API failures, network issues
Unrecoverable(anyhow::Error), // Parse errors, corrupted data
}
impl fmt::Display for JobError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
JobError::Recoverable(e) => write!(f, "Recoverable error: {}", e),
JobError::Unrecoverable(e) => write!(f, "Unrecoverable error: {}", e),
}
}
}
impl std::error::Error for JobError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
JobError::Recoverable(e) => e.source(),
JobError::Unrecoverable(e) => e.source(),
}
}
}
/// Common trait interface for all job types /// Common trait interface for all job types
#[async_trait::async_trait] #[async_trait::async_trait]
@@ -22,7 +79,7 @@ pub trait Job: Send + Sync {
/// Main job enum that dispatches to specific job implementations /// Main job enum that dispatches to specific job implementations
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobType { pub enum JobType {
Subject(SubjectJob), Subject(subject::SubjectJob),
} }
impl JobType { impl JobType {
@@ -30,23 +87,26 @@ impl JobType {
pub fn from_target_type_and_payload( pub fn from_target_type_and_payload(
target_type: TargetType, target_type: TargetType,
payload: serde_json::Value, payload: serde_json::Value,
) -> Result<Self> { ) -> Result<Self, JobParseError> {
match target_type { match target_type {
TargetType::Subject => { TargetType::Subject => {
let subject_payload: SubjectJob = serde_json::from_value(payload)?; let subject_job: subject::SubjectJob =
Ok(JobType::Subject(subject_payload)) serde_json::from_value(payload).map_err(JobParseError::InvalidJson)?;
Ok(JobType::Subject(subject_job))
} }
_ => Err(anyhow::anyhow!( _ => Err(JobParseError::UnsupportedTargetType(target_type)),
"Unsupported target type: {:?}",
target_type
)),
} }
} }
/// Convert to a Job trait object /// Convert to a Job trait object
pub fn as_job(self) -> Box<dyn Job> { pub fn as_job(self) -> Box<dyn Job> {
match self { match self {
JobType::Subject(payload) => Box::new(payload), JobType::Subject(job) => Box::new(job),
} }
} }
} }
/// Helper function to create a subject job
pub fn create_subject_job(subject: String) -> JobType {
JobType::Subject(subject::SubjectJob::new(subject))
}

View File

@@ -1,7 +1,7 @@
use crate::banner::{BannerApi, BannerApiError}; use crate::banner::{BannerApi, BannerApiError};
use crate::data::models::ScrapeJob; use crate::data::models::ScrapeJob;
use crate::error::Result; use crate::error::Result;
use crate::scraper::jobs::JobType; use crate::scraper::jobs::{JobError, JobType};
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -35,38 +35,58 @@ impl Worker {
Ok(Some(job)) => { Ok(Some(job)) => {
let job_id = job.id; let job_id = job.id;
debug!(worker_id = self.id, job_id = job.id, "Processing job"); debug!(worker_id = self.id, job_id = job.id, "Processing job");
if let Err(e) = self.process_job(job).await { match self.process_job(job).await {
// Check if the error is due to an invalid session Ok(()) => {
if let Some(BannerApiError::InvalidSession(_)) = debug!(worker_id = self.id, job_id, "Job completed");
e.downcast_ref::<BannerApiError>() // If successful, delete the job.
{ if let Err(delete_err) = self.delete_job(job_id).await {
warn!( error!(
worker_id = self.id, worker_id = self.id,
job_id, "Invalid session detected. Forcing session refresh." job_id,
); ?delete_err,
} else { "Failed to delete job"
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job"); );
}
} }
Err(JobError::Recoverable(e)) => {
// Check if the error is due to an invalid session
if let Some(BannerApiError::InvalidSession(_)) =
e.downcast_ref::<BannerApiError>()
{
warn!(
worker_id = self.id,
job_id, "Invalid session detected. Forcing session refresh."
);
} else {
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
}
// Unlock the job so it can be retried // Unlock the job so it can be retried
if let Err(unlock_err) = self.unlock_job(job_id).await { if let Err(unlock_err) = self.unlock_job(job_id).await {
error!( error!(
worker_id = self.id, worker_id = self.id,
job_id, job_id,
?unlock_err, ?unlock_err,
"Failed to unlock job" "Failed to unlock job"
); );
}
} }
} else { Err(JobError::Unrecoverable(e)) => {
debug!(worker_id = self.id, job_id, "Job completed");
// If successful, delete the job.
if let Err(delete_err) = self.delete_job(job_id).await {
error!( error!(
worker_id = self.id, worker_id = self.id,
job_id, job_id,
?delete_err, error = ?e,
"Failed to delete job" "Job corrupted, deleting"
); );
// Parse errors are unrecoverable - delete the job
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?delete_err,
"Failed to delete corrupted job"
);
}
} }
} }
} }
@@ -109,29 +129,30 @@ impl Worker {
Ok(job) Ok(job)
} }
async fn process_job(&self, job: ScrapeJob) -> Result<()> { async fn process_job(&self, job: ScrapeJob) -> Result<(), JobError> {
// Convert the database job to our job type // Convert the database job to our job type
let job_type = JobType::from_target_type_and_payload( let job_type = JobType::from_target_type_and_payload(job.target_type, job.target_payload)
job.target_type, .map_err(|e| JobError::Unrecoverable(anyhow::anyhow!(e)))?; // Parse errors are unrecoverable
job.target_payload,
)?;
// Get the job implementation // Get the job implementation
let job_impl = job_type.as_job(); let job_impl = job_type.as_job();
debug!( debug!(
worker_id = self.id, worker_id = self.id,
job_id = job.id, job_id = job.id,
description = job_impl.description(), description = job_impl.description(),
"Processing job" "Processing job"
); );
// Process the job // Process the job - API errors are recoverable
job_impl.process(&self.banner_api, &self.db_pool).await job_impl
.process(&self.banner_api, &self.db_pool)
.await
.map_err(JobError::Recoverable)?;
Ok(())
} }
async fn delete_job(&self, job_id: i32) -> Result<()> { async fn delete_job(&self, job_id: i32) -> Result<()> {
sqlx::query("DELETE FROM scrape_jobs WHERE id = $1") sqlx::query("DELETE FROM scrape_jobs WHERE id = $1")
.bind(job_id) .bind(job_id)