mirror of
https://github.com/Xevion/banner.git
synced 2025-12-06 03:14:24 -06:00
feat: implement comprehensive retry mechanism and improve observability
Add retry tracking to scrape jobs with configurable max retries (default 5), implement automatic database migrations on startup, and significantly reduce logging noise from infrastructure layers. Enhanced tracing with structured spans for better debugging while keeping output readable by suppressing verbose trace logs from rate limiters and session management. Improved error handling with detailed retry context and proper session cookie validation.
This commit is contained in:
3
migrations/20251103093649_add_retry_tracking.sql
Normal file
3
migrations/20251103093649_add_retry_tracking.sql
Normal file
@@ -0,0 +1,3 @@
|
||||
-- Add retry tracking columns to scrape_jobs table
|
||||
ALTER TABLE scrape_jobs ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0);
|
||||
ALTER TABLE scrape_jobs ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 5 CHECK (max_retries >= 0);
|
||||
@@ -62,6 +62,14 @@ impl App {
|
||||
"database pool established"
|
||||
);
|
||||
|
||||
// Run database migrations
|
||||
info!("Running database migrations...");
|
||||
sqlx::migrate!("./migrations")
|
||||
.run(&db_pool)
|
||||
.await
|
||||
.expect("Failed to run database migrations");
|
||||
info!("Database migrations completed successfully");
|
||||
|
||||
// Create BannerApi and AppState
|
||||
let banner_api = BannerApi::new_with_config(
|
||||
config.banner_base_url.clone(),
|
||||
|
||||
@@ -152,6 +152,13 @@ impl BannerApi {
|
||||
}
|
||||
|
||||
/// Performs a course search and handles common response processing.
|
||||
#[tracing::instrument(
|
||||
skip(self, query),
|
||||
fields(
|
||||
term = %term,
|
||||
subject = %query.get_subject().unwrap_or(&"all".to_string())
|
||||
)
|
||||
)]
|
||||
async fn perform_search(
|
||||
&self,
|
||||
term: &str,
|
||||
@@ -318,12 +325,6 @@ impl BannerApi {
|
||||
sort: &str,
|
||||
sort_descending: bool,
|
||||
) -> Result<SearchResult, BannerApiError> {
|
||||
debug!(
|
||||
term = term,
|
||||
subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"),
|
||||
max_results = query.get_max_results(),
|
||||
"Starting course search"
|
||||
);
|
||||
self.perform_search(term, query, sort, sort_descending)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -3,10 +3,13 @@
|
||||
use http::Extensions;
|
||||
use reqwest::{Request, Response};
|
||||
use reqwest_middleware::{Middleware, Next};
|
||||
use tracing::{trace, warn};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
pub struct TransparentMiddleware;
|
||||
|
||||
/// Threshold for logging slow requests at DEBUG level (in milliseconds)
|
||||
const SLOW_REQUEST_THRESHOLD_MS: u128 = 1000;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Middleware for TransparentMiddleware {
|
||||
async fn handle(
|
||||
@@ -15,33 +18,56 @@ impl Middleware for TransparentMiddleware {
|
||||
extensions: &mut Extensions,
|
||||
next: Next<'_>,
|
||||
) -> std::result::Result<Response, reqwest_middleware::Error> {
|
||||
trace!(
|
||||
domain = req.url().domain(),
|
||||
headers = ?req.headers(),
|
||||
"{method} {path}",
|
||||
method = req.method().to_string(),
|
||||
path = req.url().path(),
|
||||
);
|
||||
let method = req.method().to_string();
|
||||
let path = req.url().path().to_string();
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
let response_result = next.run(req, extensions).await;
|
||||
let duration = start.elapsed();
|
||||
|
||||
match response_result {
|
||||
Ok(response) => {
|
||||
if response.status().is_success() {
|
||||
trace!(
|
||||
"{code} {reason} {path}",
|
||||
code = response.status().as_u16(),
|
||||
reason = response.status().canonical_reason().unwrap_or("??"),
|
||||
path = response.url().path(),
|
||||
);
|
||||
let duration_ms = duration.as_millis();
|
||||
if duration_ms >= SLOW_REQUEST_THRESHOLD_MS {
|
||||
debug!(
|
||||
method = method,
|
||||
path = path,
|
||||
status = response.status().as_u16(),
|
||||
duration_ms = duration_ms,
|
||||
"Request completed (slow)"
|
||||
);
|
||||
} else {
|
||||
trace!(
|
||||
method = method,
|
||||
path = path,
|
||||
status = response.status().as_u16(),
|
||||
duration_ms = duration_ms,
|
||||
"Request completed"
|
||||
);
|
||||
}
|
||||
Ok(response)
|
||||
} else {
|
||||
let e = response.error_for_status_ref().unwrap_err();
|
||||
warn!(error = ?e, "Request failed (server)");
|
||||
warn!(
|
||||
method = method,
|
||||
path = path,
|
||||
error = ?e,
|
||||
status = response.status().as_u16(),
|
||||
duration_ms = duration.as_millis(),
|
||||
"Request failed"
|
||||
);
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(error = ?error, "Request failed (middleware)");
|
||||
warn!(
|
||||
method = method,
|
||||
path = path,
|
||||
error = ?error,
|
||||
duration_ms = duration.as_millis(),
|
||||
"Request failed"
|
||||
);
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::banner::rate_limiter::{RequestType, SharedRateLimiter};
|
||||
use http::Extensions;
|
||||
use reqwest::{Request, Response};
|
||||
use reqwest_middleware::{Middleware, Next};
|
||||
use tracing::{debug, trace, warn};
|
||||
use tracing::debug;
|
||||
use url::Url;
|
||||
|
||||
/// Middleware that enforces rate limiting based on request URL patterns
|
||||
@@ -18,6 +18,16 @@ impl RateLimitMiddleware {
|
||||
Self { rate_limiter }
|
||||
}
|
||||
|
||||
/// Returns a human-readable description of the rate limit for a request type
|
||||
fn get_rate_limit_description(request_type: RequestType) -> &'static str {
|
||||
match request_type {
|
||||
RequestType::Session => "6 rpm (~10s interval)",
|
||||
RequestType::Search => "30 rpm (~2s interval)",
|
||||
RequestType::Metadata => "20 rpm (~3s interval)",
|
||||
RequestType::Reset => "10 rpm (~6s interval)",
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines the request type based on the URL path
|
||||
fn get_request_type(url: &Url) -> RequestType {
|
||||
let path = url.path();
|
||||
@@ -53,49 +63,22 @@ impl Middleware for RateLimitMiddleware {
|
||||
) -> std::result::Result<Response, reqwest_middleware::Error> {
|
||||
let request_type = Self::get_request_type(req.url());
|
||||
|
||||
trace!(
|
||||
url = %req.url(),
|
||||
request_type = ?request_type,
|
||||
"Rate limiting request"
|
||||
);
|
||||
|
||||
// Wait for permission to make the request
|
||||
let start = std::time::Instant::now();
|
||||
self.rate_limiter.wait_for_permission(request_type).await;
|
||||
let wait_duration = start.elapsed();
|
||||
|
||||
trace!(
|
||||
url = %req.url(),
|
||||
request_type = ?request_type,
|
||||
"Rate limit permission granted, making request"
|
||||
);
|
||||
// Only log if rate limiting caused significant delay (>= 500ms)
|
||||
if wait_duration.as_millis() >= 500 {
|
||||
let limit_desc = Self::get_rate_limit_description(request_type);
|
||||
debug!(
|
||||
request_type = ?request_type,
|
||||
wait_ms = wait_duration.as_millis(),
|
||||
rate_limit = limit_desc,
|
||||
"Rate limit caused delay"
|
||||
);
|
||||
}
|
||||
|
||||
// Make the actual request
|
||||
let response_result = next.run(req, extensions).await;
|
||||
|
||||
match response_result {
|
||||
Ok(response) => {
|
||||
if response.status().is_success() {
|
||||
trace!(
|
||||
url = %response.url(),
|
||||
status = response.status().as_u16(),
|
||||
"Request completed successfully"
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
url = %response.url(),
|
||||
status = response.status().as_u16(),
|
||||
"Request completed with error status"
|
||||
);
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(
|
||||
url = ?error.url(),
|
||||
error = ?error,
|
||||
"Request failed"
|
||||
);
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
next.run(req, extensions).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use governor::{
|
||||
use std::num::NonZeroU32;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
/// Different types of Banner API requests with different rate limits
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
@@ -99,12 +98,8 @@ impl BannerRateLimiter {
|
||||
RequestType::Reset => &self.reset_limiter,
|
||||
};
|
||||
|
||||
trace!(request_type = ?request_type, "Waiting for rate limit permission");
|
||||
|
||||
// Wait until we can make the request
|
||||
// Wait until we can make the request (logging handled by middleware)
|
||||
limiter.until_ready().await;
|
||||
|
||||
trace!(request_type = ?request_type, "Rate limit permission granted");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -82,7 +82,6 @@ impl BannerSession {
|
||||
|
||||
/// Updates the last activity timestamp
|
||||
pub fn touch(&mut self) {
|
||||
trace!(id = self.unique_session_id, "Session was used");
|
||||
self.last_activity = Some(Instant::now());
|
||||
}
|
||||
|
||||
@@ -162,7 +161,7 @@ impl TermPool {
|
||||
async fn release(&self, session: BannerSession) {
|
||||
let id = session.unique_session_id.clone();
|
||||
if session.is_expired() {
|
||||
trace!(id = id, "Session is now expired, dropping.");
|
||||
debug!(id = id, "Session expired, dropping");
|
||||
// Wake up a waiter, as it might need to create a new session
|
||||
// if this was the last one.
|
||||
self.notifier.notify_one();
|
||||
@@ -171,10 +170,8 @@ impl TermPool {
|
||||
|
||||
let mut queue = self.sessions.lock().await;
|
||||
queue.push_back(session);
|
||||
let queue_size = queue.len();
|
||||
drop(queue); // Release lock before notifying
|
||||
|
||||
trace!(id = id, queue_size, "Session returned to pool");
|
||||
self.notifier.notify_one();
|
||||
}
|
||||
}
|
||||
@@ -204,22 +201,21 @@ impl SessionPool {
|
||||
.or_insert_with(|| Arc::new(TermPool::new()))
|
||||
.clone();
|
||||
|
||||
let start = Instant::now();
|
||||
let mut waited_for_creation = false;
|
||||
|
||||
loop {
|
||||
// Fast path: Try to get an existing, non-expired session.
|
||||
{
|
||||
let mut queue = term_pool.sessions.lock().await;
|
||||
if let Some(session) = queue.pop_front() {
|
||||
if !session.is_expired() {
|
||||
trace!(id = session.unique_session_id, "Reusing session from pool");
|
||||
return Ok(PooledSession {
|
||||
session: Some(session),
|
||||
pool: Arc::clone(&term_pool),
|
||||
});
|
||||
} else {
|
||||
trace!(
|
||||
id = session.unique_session_id,
|
||||
"Popped an expired session, discarding."
|
||||
);
|
||||
debug!(id = session.unique_session_id, "Discarded expired session");
|
||||
}
|
||||
}
|
||||
} // MutexGuard is dropped, lock is released.
|
||||
@@ -229,7 +225,10 @@ impl SessionPool {
|
||||
if *is_creating_guard {
|
||||
// Another task is already creating a session. Release the lock and wait.
|
||||
drop(is_creating_guard);
|
||||
trace!("Another task is creating a session, waiting for notification...");
|
||||
if !waited_for_creation {
|
||||
trace!("Waiting for another task to create session");
|
||||
waited_for_creation = true;
|
||||
}
|
||||
term_pool.notifier.notified().await;
|
||||
// Loop back to the top to try the fast path again.
|
||||
continue;
|
||||
@@ -240,12 +239,11 @@ impl SessionPool {
|
||||
drop(is_creating_guard);
|
||||
|
||||
// Race: wait for a session to be returned OR for the rate limiter to allow a new one.
|
||||
trace!("Pool empty, racing notifier vs rate limiter...");
|
||||
trace!("Pool empty, creating new session");
|
||||
tokio::select! {
|
||||
_ = term_pool.notifier.notified() => {
|
||||
// A session was returned while we were waiting!
|
||||
// We are no longer the creator. Reset the flag and loop to race for the new session.
|
||||
trace!("Notified that a session was returned. Looping to retry.");
|
||||
let mut guard = term_pool.is_creating.lock().await;
|
||||
*guard = false;
|
||||
drop(guard);
|
||||
@@ -253,7 +251,6 @@ impl SessionPool {
|
||||
}
|
||||
_ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
|
||||
// The rate limit has elapsed. It's our job to create the session.
|
||||
trace!("Rate limiter ready. Proceeding to create a new session.");
|
||||
let new_session_result = self.create_session(&term).await;
|
||||
|
||||
// After creation, we are no longer the creator. Reset the flag
|
||||
@@ -265,7 +262,12 @@ impl SessionPool {
|
||||
|
||||
match new_session_result {
|
||||
Ok(new_session) => {
|
||||
debug!(id = new_session.unique_session_id, "Successfully created new session");
|
||||
let elapsed = start.elapsed();
|
||||
debug!(
|
||||
id = new_session.unique_session_id,
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"Created new session"
|
||||
);
|
||||
return Ok(PooledSession {
|
||||
session: Some(new_session),
|
||||
pool: term_pool,
|
||||
@@ -298,8 +300,12 @@ impl SessionPool {
|
||||
.get_all("Set-Cookie")
|
||||
.iter()
|
||||
.filter_map(|header_value| {
|
||||
if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) {
|
||||
Some((cookie.name().to_string(), cookie.value().to_string()))
|
||||
if let Ok(cookie_str) = header_value.to_str() {
|
||||
if let Ok(cookie) = Cookie::parse(cookie_str) {
|
||||
Some((cookie.name().to_string(), cookie.value().to_string()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -310,16 +316,12 @@ impl SessionPool {
|
||||
return Err(anyhow::anyhow!("Failed to get cookies"));
|
||||
}
|
||||
|
||||
let jsessionid = cookies.get("JSESSIONID").unwrap();
|
||||
let ssb_cookie = cookies.get("SSB_COOKIE").unwrap();
|
||||
let jsessionid = cookies.get("JSESSIONID")
|
||||
.ok_or_else(|| anyhow::anyhow!("JSESSIONID cookie missing after validation"))?;
|
||||
let ssb_cookie = cookies.get("SSB_COOKIE")
|
||||
.ok_or_else(|| anyhow::anyhow!("SSB_COOKIE cookie missing after validation"))?;
|
||||
let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);
|
||||
|
||||
trace!(
|
||||
jsessionid = jsessionid,
|
||||
ssb_cookie = ssb_cookie,
|
||||
"New session cookies acquired"
|
||||
);
|
||||
|
||||
self.http
|
||||
.get(format!("{}/selfServiceMenu/data", self.base_url))
|
||||
.header("Cookie", &cookie_header)
|
||||
@@ -435,8 +437,15 @@ impl SessionPool {
|
||||
|
||||
let redirect: RedirectResponse = response.json().await?;
|
||||
|
||||
let base_url_path = self.base_url.parse::<Url>().unwrap().path().to_string();
|
||||
let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap();
|
||||
let base_url_path = self.base_url.parse::<Url>()
|
||||
.context("Failed to parse base URL")?
|
||||
.path()
|
||||
.to_string();
|
||||
let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path)
|
||||
.ok_or_else(|| anyhow::anyhow!(
|
||||
"Redirect URL '{}' does not start with expected prefix '{}'",
|
||||
redirect.fwd_url, base_url_path
|
||||
))?;
|
||||
|
||||
// Follow the redirect
|
||||
let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect);
|
||||
@@ -454,7 +463,6 @@ impl SessionPool {
|
||||
));
|
||||
}
|
||||
|
||||
trace!(term = term, "successfully selected term");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,4 +72,8 @@ pub struct ScrapeJob {
|
||||
pub execute_at: DateTime<Utc>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub locked_at: Option<DateTime<Utc>>,
|
||||
/// Number of retry attempts for this job (non-negative, enforced by CHECK constraint)
|
||||
pub retry_count: i32,
|
||||
/// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint)
|
||||
pub max_retries: i32,
|
||||
}
|
||||
|
||||
@@ -7,10 +7,13 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber};
|
||||
/// Configure and initialize logging for the application
|
||||
pub fn setup_logging(config: &Config, tracing_format: TracingFormat) {
|
||||
// Configure logging based on config
|
||||
// Note: Even when base_level is trace or debug, we suppress trace logs from noisy
|
||||
// infrastructure modules to keep output readable. These modules use debug for important
|
||||
// events and trace only for very detailed debugging.
|
||||
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
|
||||
let base_level = &config.log_level;
|
||||
EnvFilter::new(format!(
|
||||
"warn,banner={},banner::rate_limiter=warn,banner::session=warn,banner::rate_limit_middleware=warn",
|
||||
"warn,banner={},banner::rate_limiter=warn,banner::session=debug,banner::rate_limit_middleware=warn,banner::middleware=debug",
|
||||
base_level
|
||||
))
|
||||
});
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::data::models::TargetType;
|
||||
use crate::error::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::PgPool;
|
||||
use tracing::{debug, info, trace};
|
||||
use tracing::{debug, info};
|
||||
|
||||
/// Job implementation for scraping subject data
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -24,9 +24,9 @@ impl Job for SubjectJob {
|
||||
TargetType::Subject
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))]
|
||||
async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
|
||||
let subject_code = &self.subject;
|
||||
debug!(subject = subject_code, "Processing subject job");
|
||||
|
||||
// Get the current term
|
||||
let term = Term::get_current().inner().to_string();
|
||||
@@ -85,9 +85,7 @@ impl SubjectJob {
|
||||
.bind(chrono::Utc::now())
|
||||
.execute(db_pool)
|
||||
.await
|
||||
.map(|result| {
|
||||
trace!(subject = course.subject, crn = course.course_reference_number, result = ?result, "Course upserted");
|
||||
})
|
||||
.map(|_| ())
|
||||
.map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Periodically analyzes data and enqueues prioritized scrape jobs.
|
||||
pub struct Scheduler {
|
||||
@@ -99,6 +99,7 @@ impl Scheduler {
|
||||
/// 3. Create jobs only for subjects that don't have pending jobs
|
||||
///
|
||||
/// This is a static method (not &self) to allow it to be called from spawned tasks.
|
||||
#[tracing::instrument(skip_all, fields(term))]
|
||||
async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
|
||||
// For now, we will implement a simple baseline scheduling strategy:
|
||||
// 1. Get a list of all subjects from the Banner API.
|
||||
@@ -106,6 +107,7 @@ impl Scheduler {
|
||||
// 3. Create new jobs only for subjects that don't have existing jobs.
|
||||
let term = Term::get_current().inner().to_string();
|
||||
|
||||
tracing::Span::current().record("term", term.as_str());
|
||||
debug!(term = term, "Enqueuing subject jobs");
|
||||
|
||||
let subjects = banner_api.get_subjects("", &term, 1, 500).await?;
|
||||
@@ -137,6 +139,7 @@ impl Scheduler {
|
||||
.collect();
|
||||
|
||||
// Filter out subjects that already have jobs and prepare new jobs
|
||||
let mut skipped_count = 0;
|
||||
let new_jobs: Vec<_> = subjects
|
||||
.into_iter()
|
||||
.filter_map(|subject| {
|
||||
@@ -145,7 +148,7 @@ impl Scheduler {
|
||||
let payload_str = payload.to_string();
|
||||
|
||||
if existing_payloads.contains(&payload_str) {
|
||||
trace!(subject = subject.code, "Job already exists, skipping");
|
||||
skipped_count += 1;
|
||||
None
|
||||
} else {
|
||||
Some((payload, subject.code))
|
||||
@@ -153,6 +156,10 @@ impl Scheduler {
|
||||
})
|
||||
.collect();
|
||||
|
||||
if skipped_count > 0 {
|
||||
debug!(count = skipped_count, "Skipped subjects with existing jobs");
|
||||
}
|
||||
|
||||
// Insert all new jobs in a single batch
|
||||
if !new_jobs.is_empty() {
|
||||
let now = chrono::Utc::now();
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
|
||||
/// A single worker instance.
|
||||
///
|
||||
@@ -57,7 +57,9 @@ impl Worker {
|
||||
};
|
||||
|
||||
let job_id = job.id;
|
||||
debug!(worker_id = self.id, job_id, "Processing job");
|
||||
let retry_count = job.retry_count;
|
||||
let max_retries = job.max_retries;
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
// Process the job, racing against shutdown signal
|
||||
let process_result = tokio::select! {
|
||||
@@ -68,8 +70,10 @@ impl Worker {
|
||||
result = self.process_job(job) => result
|
||||
};
|
||||
|
||||
let duration = start.elapsed();
|
||||
|
||||
// Handle the job processing result
|
||||
self.handle_job_result(job_id, process_result).await;
|
||||
self.handle_job_result(job_id, retry_count, max_retries, process_result, duration).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,20 +110,31 @@ impl Worker {
|
||||
// Get the job implementation
|
||||
let job_impl = job_type.boxed();
|
||||
|
||||
debug!(
|
||||
worker_id = self.id,
|
||||
// Create span with job context
|
||||
let span = tracing::debug_span!(
|
||||
"process_job",
|
||||
job_id = job.id,
|
||||
description = job_impl.description(),
|
||||
"Processing job"
|
||||
job_type = job_impl.description()
|
||||
);
|
||||
|
||||
// Process the job - API errors are recoverable
|
||||
job_impl
|
||||
.process(&self.banner_api, &self.db_pool)
|
||||
.await
|
||||
.map_err(JobError::Recoverable)?;
|
||||
async move {
|
||||
debug!(
|
||||
worker_id = self.id,
|
||||
job_id = job.id,
|
||||
description = job_impl.description(),
|
||||
"Processing job"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
// Process the job - API errors are recoverable
|
||||
job_impl
|
||||
.process(&self.banner_api, &self.db_pool)
|
||||
.await
|
||||
.map_err(JobError::Recoverable)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.instrument(span)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete_job(&self, job_id: i32) -> Result<()> {
|
||||
@@ -135,10 +150,24 @@ impl Worker {
|
||||
.bind(job_id)
|
||||
.execute(&self.db_pool)
|
||||
.await?;
|
||||
info!(worker_id = self.id, job_id, "Job unlocked for retry");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result<bool> {
|
||||
let result = sqlx::query_scalar::<_, Option<i32>>(
|
||||
"UPDATE scrape_jobs
|
||||
SET locked_at = NULL, retry_count = retry_count + 1
|
||||
WHERE id = $1
|
||||
RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END"
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(max_retries)
|
||||
.fetch_one(&self.db_pool)
|
||||
.await?;
|
||||
|
||||
Ok(result.is_some())
|
||||
}
|
||||
|
||||
/// Handle shutdown signal received during job processing
|
||||
async fn handle_shutdown_during_processing(&self, job_id: i32) {
|
||||
info!(worker_id = self.id, job_id, "Shutdown received during job processing");
|
||||
@@ -158,19 +187,30 @@ impl Worker {
|
||||
}
|
||||
|
||||
/// Handle the result of job processing
|
||||
async fn handle_job_result(&self, job_id: i32, result: Result<(), JobError>) {
|
||||
async fn handle_job_result(&self, job_id: i32, retry_count: i32, max_retries: i32, result: Result<(), JobError>, duration: std::time::Duration) {
|
||||
match result {
|
||||
Ok(()) => {
|
||||
debug!(worker_id = self.id, job_id, "Job completed successfully");
|
||||
debug!(
|
||||
worker_id = self.id,
|
||||
job_id,
|
||||
duration_ms = duration.as_millis(),
|
||||
"Job completed successfully"
|
||||
);
|
||||
if let Err(e) = self.delete_job(job_id).await {
|
||||
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
|
||||
}
|
||||
}
|
||||
Err(JobError::Recoverable(e)) => {
|
||||
self.handle_recoverable_error(job_id, e).await;
|
||||
self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration).await;
|
||||
}
|
||||
Err(JobError::Unrecoverable(e)) => {
|
||||
error!(worker_id = self.id, job_id, error = ?e, "Job corrupted, deleting");
|
||||
error!(
|
||||
worker_id = self.id,
|
||||
job_id,
|
||||
duration_ms = duration.as_millis(),
|
||||
error = ?e,
|
||||
"Job corrupted, deleting"
|
||||
);
|
||||
if let Err(e) = self.delete_job(job_id).await {
|
||||
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
|
||||
}
|
||||
@@ -179,18 +219,63 @@ impl Worker {
|
||||
}
|
||||
|
||||
/// Handle recoverable errors by logging appropriately and unlocking the job
|
||||
async fn handle_recoverable_error(&self, job_id: i32, e: anyhow::Error) {
|
||||
async fn handle_recoverable_error(&self, job_id: i32, retry_count: i32, max_retries: i32, e: anyhow::Error, duration: std::time::Duration) {
|
||||
let next_attempt = retry_count.saturating_add(1);
|
||||
let remaining_retries = max_retries.saturating_sub(next_attempt);
|
||||
|
||||
// Log the error appropriately based on type
|
||||
if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() {
|
||||
warn!(
|
||||
worker_id = self.id,
|
||||
job_id, "Invalid session detected, forcing session refresh"
|
||||
job_id,
|
||||
duration_ms = duration.as_millis(),
|
||||
retry_attempt = next_attempt,
|
||||
max_retries = max_retries,
|
||||
remaining_retries = remaining_retries,
|
||||
"Invalid session detected, will retry"
|
||||
);
|
||||
} else {
|
||||
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
|
||||
error!(
|
||||
worker_id = self.id,
|
||||
job_id,
|
||||
duration_ms = duration.as_millis(),
|
||||
retry_attempt = next_attempt,
|
||||
max_retries = max_retries,
|
||||
remaining_retries = remaining_retries,
|
||||
error = ?e,
|
||||
"Failed to process job, will retry"
|
||||
);
|
||||
}
|
||||
|
||||
if let Err(e) = self.unlock_job(job_id).await {
|
||||
error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock job");
|
||||
// Atomically unlock and increment retry count, checking if retry is allowed
|
||||
match self.unlock_and_increment_retry(job_id, max_retries).await {
|
||||
Ok(can_retry) if can_retry => {
|
||||
info!(
|
||||
worker_id = self.id,
|
||||
job_id,
|
||||
retry_attempt = next_attempt,
|
||||
remaining_retries = remaining_retries,
|
||||
"Job unlocked for retry"
|
||||
);
|
||||
}
|
||||
Ok(_) => {
|
||||
// Max retries exceeded (detected atomically)
|
||||
error!(
|
||||
worker_id = self.id,
|
||||
job_id,
|
||||
duration_ms = duration.as_millis(),
|
||||
retry_count = next_attempt,
|
||||
max_retries = max_retries,
|
||||
error = ?e,
|
||||
"Job failed permanently (max retries exceeded), deleting"
|
||||
);
|
||||
if let Err(e) = self.delete_job(job_id).await {
|
||||
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user