diff --git a/migrations/20251103093649_add_retry_tracking.sql b/migrations/20251103093649_add_retry_tracking.sql new file mode 100644 index 0000000..f9f5bb1 --- /dev/null +++ b/migrations/20251103093649_add_retry_tracking.sql @@ -0,0 +1,3 @@ +-- Add retry tracking columns to scrape_jobs table +ALTER TABLE scrape_jobs ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0); +ALTER TABLE scrape_jobs ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 5 CHECK (max_retries >= 0); diff --git a/src/app.rs b/src/app.rs index 045415e..0ea6bdc 100644 --- a/src/app.rs +++ b/src/app.rs @@ -62,6 +62,14 @@ impl App { "database pool established" ); + // Run database migrations + info!("Running database migrations..."); + sqlx::migrate!("./migrations") + .run(&db_pool) + .await + .expect("Failed to run database migrations"); + info!("Database migrations completed successfully"); + // Create BannerApi and AppState let banner_api = BannerApi::new_with_config( config.banner_base_url.clone(), diff --git a/src/banner/api.rs b/src/banner/api.rs index 34cdfac..f8e3e18 100644 --- a/src/banner/api.rs +++ b/src/banner/api.rs @@ -152,6 +152,13 @@ impl BannerApi { } /// Performs a course search and handles common response processing. + #[tracing::instrument( + skip(self, query), + fields( + term = %term, + subject = %query.get_subject().unwrap_or(&"all".to_string()) + ) + )] async fn perform_search( &self, term: &str, @@ -318,12 +325,6 @@ impl BannerApi { sort: &str, sort_descending: bool, ) -> Result { - debug!( - term = term, - subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"), - max_results = query.get_max_results(), - "Starting course search" - ); self.perform_search(term, query, sort, sort_descending) .await } diff --git a/src/banner/middleware.rs b/src/banner/middleware.rs index 7ce4f8a..108743f 100644 --- a/src/banner/middleware.rs +++ b/src/banner/middleware.rs @@ -3,10 +3,13 @@ use http::Extensions; use reqwest::{Request, Response}; use reqwest_middleware::{Middleware, Next}; -use tracing::{trace, warn}; +use tracing::{debug, trace, warn}; pub struct TransparentMiddleware; +/// Threshold for logging slow requests at DEBUG level (in milliseconds) +const SLOW_REQUEST_THRESHOLD_MS: u128 = 1000; + #[async_trait::async_trait] impl Middleware for TransparentMiddleware { async fn handle( @@ -15,33 +18,56 @@ impl Middleware for TransparentMiddleware { extensions: &mut Extensions, next: Next<'_>, ) -> std::result::Result { - trace!( - domain = req.url().domain(), - headers = ?req.headers(), - "{method} {path}", - method = req.method().to_string(), - path = req.url().path(), - ); + let method = req.method().to_string(); + let path = req.url().path().to_string(); + + let start = std::time::Instant::now(); let response_result = next.run(req, extensions).await; + let duration = start.elapsed(); match response_result { Ok(response) => { if response.status().is_success() { - trace!( - "{code} {reason} {path}", - code = response.status().as_u16(), - reason = response.status().canonical_reason().unwrap_or("??"), - path = response.url().path(), - ); + let duration_ms = duration.as_millis(); + if duration_ms >= SLOW_REQUEST_THRESHOLD_MS { + debug!( + method = method, + path = path, + status = response.status().as_u16(), + duration_ms = duration_ms, + "Request completed (slow)" + ); + } else { + trace!( + method = method, + path = path, + status = response.status().as_u16(), + duration_ms = duration_ms, + "Request completed" + ); + } Ok(response) } else { let e = response.error_for_status_ref().unwrap_err(); - warn!(error = ?e, "Request failed (server)"); + warn!( + method = method, + path = path, + error = ?e, + status = response.status().as_u16(), + duration_ms = duration.as_millis(), + "Request failed" + ); Ok(response) } } Err(error) => { - warn!(error = ?error, "Request failed (middleware)"); + warn!( + method = method, + path = path, + error = ?error, + duration_ms = duration.as_millis(), + "Request failed" + ); Err(error) } } diff --git a/src/banner/rate_limit_middleware.rs b/src/banner/rate_limit_middleware.rs index fa425f7..39f0325 100644 --- a/src/banner/rate_limit_middleware.rs +++ b/src/banner/rate_limit_middleware.rs @@ -4,7 +4,7 @@ use crate::banner::rate_limiter::{RequestType, SharedRateLimiter}; use http::Extensions; use reqwest::{Request, Response}; use reqwest_middleware::{Middleware, Next}; -use tracing::{debug, trace, warn}; +use tracing::debug; use url::Url; /// Middleware that enforces rate limiting based on request URL patterns @@ -18,6 +18,16 @@ impl RateLimitMiddleware { Self { rate_limiter } } + /// Returns a human-readable description of the rate limit for a request type + fn get_rate_limit_description(request_type: RequestType) -> &'static str { + match request_type { + RequestType::Session => "6 rpm (~10s interval)", + RequestType::Search => "30 rpm (~2s interval)", + RequestType::Metadata => "20 rpm (~3s interval)", + RequestType::Reset => "10 rpm (~6s interval)", + } + } + /// Determines the request type based on the URL path fn get_request_type(url: &Url) -> RequestType { let path = url.path(); @@ -53,49 +63,22 @@ impl Middleware for RateLimitMiddleware { ) -> std::result::Result { let request_type = Self::get_request_type(req.url()); - trace!( - url = %req.url(), - request_type = ?request_type, - "Rate limiting request" - ); - - // Wait for permission to make the request + let start = std::time::Instant::now(); self.rate_limiter.wait_for_permission(request_type).await; + let wait_duration = start.elapsed(); - trace!( - url = %req.url(), - request_type = ?request_type, - "Rate limit permission granted, making request" - ); + // Only log if rate limiting caused significant delay (>= 500ms) + if wait_duration.as_millis() >= 500 { + let limit_desc = Self::get_rate_limit_description(request_type); + debug!( + request_type = ?request_type, + wait_ms = wait_duration.as_millis(), + rate_limit = limit_desc, + "Rate limit caused delay" + ); + } // Make the actual request - let response_result = next.run(req, extensions).await; - - match response_result { - Ok(response) => { - if response.status().is_success() { - trace!( - url = %response.url(), - status = response.status().as_u16(), - "Request completed successfully" - ); - } else { - warn!( - url = %response.url(), - status = response.status().as_u16(), - "Request completed with error status" - ); - } - Ok(response) - } - Err(error) => { - warn!( - url = ?error.url(), - error = ?error, - "Request failed" - ); - Err(error) - } - } + next.run(req, extensions).await } } diff --git a/src/banner/rate_limiter.rs b/src/banner/rate_limiter.rs index 7a9ae7b..8647932 100644 --- a/src/banner/rate_limiter.rs +++ b/src/banner/rate_limiter.rs @@ -8,7 +8,6 @@ use governor::{ use std::num::NonZeroU32; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, trace, warn}; /// Different types of Banner API requests with different rate limits #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -99,12 +98,8 @@ impl BannerRateLimiter { RequestType::Reset => &self.reset_limiter, }; - trace!(request_type = ?request_type, "Waiting for rate limit permission"); - - // Wait until we can make the request + // Wait until we can make the request (logging handled by middleware) limiter.until_ready().await; - - trace!(request_type = ?request_type, "Rate limit permission granted"); } } diff --git a/src/banner/session.rs b/src/banner/session.rs index 79ca9b2..e22d000 100644 --- a/src/banner/session.rs +++ b/src/banner/session.rs @@ -82,7 +82,6 @@ impl BannerSession { /// Updates the last activity timestamp pub fn touch(&mut self) { - trace!(id = self.unique_session_id, "Session was used"); self.last_activity = Some(Instant::now()); } @@ -162,7 +161,7 @@ impl TermPool { async fn release(&self, session: BannerSession) { let id = session.unique_session_id.clone(); if session.is_expired() { - trace!(id = id, "Session is now expired, dropping."); + debug!(id = id, "Session expired, dropping"); // Wake up a waiter, as it might need to create a new session // if this was the last one. self.notifier.notify_one(); @@ -171,10 +170,8 @@ impl TermPool { let mut queue = self.sessions.lock().await; queue.push_back(session); - let queue_size = queue.len(); drop(queue); // Release lock before notifying - trace!(id = id, queue_size, "Session returned to pool"); self.notifier.notify_one(); } } @@ -204,22 +201,21 @@ impl SessionPool { .or_insert_with(|| Arc::new(TermPool::new())) .clone(); + let start = Instant::now(); + let mut waited_for_creation = false; + loop { // Fast path: Try to get an existing, non-expired session. { let mut queue = term_pool.sessions.lock().await; if let Some(session) = queue.pop_front() { if !session.is_expired() { - trace!(id = session.unique_session_id, "Reusing session from pool"); return Ok(PooledSession { session: Some(session), pool: Arc::clone(&term_pool), }); } else { - trace!( - id = session.unique_session_id, - "Popped an expired session, discarding." - ); + debug!(id = session.unique_session_id, "Discarded expired session"); } } } // MutexGuard is dropped, lock is released. @@ -229,7 +225,10 @@ impl SessionPool { if *is_creating_guard { // Another task is already creating a session. Release the lock and wait. drop(is_creating_guard); - trace!("Another task is creating a session, waiting for notification..."); + if !waited_for_creation { + trace!("Waiting for another task to create session"); + waited_for_creation = true; + } term_pool.notifier.notified().await; // Loop back to the top to try the fast path again. continue; @@ -240,12 +239,11 @@ impl SessionPool { drop(is_creating_guard); // Race: wait for a session to be returned OR for the rate limiter to allow a new one. - trace!("Pool empty, racing notifier vs rate limiter..."); + trace!("Pool empty, creating new session"); tokio::select! { _ = term_pool.notifier.notified() => { // A session was returned while we were waiting! // We are no longer the creator. Reset the flag and loop to race for the new session. - trace!("Notified that a session was returned. Looping to retry."); let mut guard = term_pool.is_creating.lock().await; *guard = false; drop(guard); @@ -253,7 +251,6 @@ impl SessionPool { } _ = SESSION_CREATION_RATE_LIMITER.until_ready() => { // The rate limit has elapsed. It's our job to create the session. - trace!("Rate limiter ready. Proceeding to create a new session."); let new_session_result = self.create_session(&term).await; // After creation, we are no longer the creator. Reset the flag @@ -265,7 +262,12 @@ impl SessionPool { match new_session_result { Ok(new_session) => { - debug!(id = new_session.unique_session_id, "Successfully created new session"); + let elapsed = start.elapsed(); + debug!( + id = new_session.unique_session_id, + elapsed_ms = elapsed.as_millis(), + "Created new session" + ); return Ok(PooledSession { session: Some(new_session), pool: term_pool, @@ -298,8 +300,12 @@ impl SessionPool { .get_all("Set-Cookie") .iter() .filter_map(|header_value| { - if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) { - Some((cookie.name().to_string(), cookie.value().to_string())) + if let Ok(cookie_str) = header_value.to_str() { + if let Ok(cookie) = Cookie::parse(cookie_str) { + Some((cookie.name().to_string(), cookie.value().to_string())) + } else { + None + } } else { None } @@ -310,16 +316,12 @@ impl SessionPool { return Err(anyhow::anyhow!("Failed to get cookies")); } - let jsessionid = cookies.get("JSESSIONID").unwrap(); - let ssb_cookie = cookies.get("SSB_COOKIE").unwrap(); + let jsessionid = cookies.get("JSESSIONID") + .ok_or_else(|| anyhow::anyhow!("JSESSIONID cookie missing after validation"))?; + let ssb_cookie = cookies.get("SSB_COOKIE") + .ok_or_else(|| anyhow::anyhow!("SSB_COOKIE cookie missing after validation"))?; let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie); - trace!( - jsessionid = jsessionid, - ssb_cookie = ssb_cookie, - "New session cookies acquired" - ); - self.http .get(format!("{}/selfServiceMenu/data", self.base_url)) .header("Cookie", &cookie_header) @@ -435,8 +437,15 @@ impl SessionPool { let redirect: RedirectResponse = response.json().await?; - let base_url_path = self.base_url.parse::().unwrap().path().to_string(); - let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap(); + let base_url_path = self.base_url.parse::() + .context("Failed to parse base URL")? + .path() + .to_string(); + let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path) + .ok_or_else(|| anyhow::anyhow!( + "Redirect URL '{}' does not start with expected prefix '{}'", + redirect.fwd_url, base_url_path + ))?; // Follow the redirect let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect); @@ -454,7 +463,6 @@ impl SessionPool { )); } - trace!(term = term, "successfully selected term"); Ok(()) } } diff --git a/src/data/models.rs b/src/data/models.rs index aebc130..7843334 100644 --- a/src/data/models.rs +++ b/src/data/models.rs @@ -72,4 +72,8 @@ pub struct ScrapeJob { pub execute_at: DateTime, pub created_at: DateTime, pub locked_at: Option>, + /// Number of retry attempts for this job (non-negative, enforced by CHECK constraint) + pub retry_count: i32, + /// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint) + pub max_retries: i32, } diff --git a/src/logging.rs b/src/logging.rs index 6056e8b..d66effe 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -7,10 +7,13 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber}; /// Configure and initialize logging for the application pub fn setup_logging(config: &Config, tracing_format: TracingFormat) { // Configure logging based on config + // Note: Even when base_level is trace or debug, we suppress trace logs from noisy + // infrastructure modules to keep output readable. These modules use debug for important + // events and trace only for very detailed debugging. let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { let base_level = &config.log_level; EnvFilter::new(format!( - "warn,banner={},banner::rate_limiter=warn,banner::session=warn,banner::rate_limit_middleware=warn", + "warn,banner={},banner::rate_limiter=warn,banner::session=debug,banner::rate_limit_middleware=warn,banner::middleware=debug", base_level )) }); diff --git a/src/scraper/jobs/subject.rs b/src/scraper/jobs/subject.rs index ca7f9e7..36a534d 100644 --- a/src/scraper/jobs/subject.rs +++ b/src/scraper/jobs/subject.rs @@ -4,7 +4,7 @@ use crate::data::models::TargetType; use crate::error::Result; use serde::{Deserialize, Serialize}; use sqlx::PgPool; -use tracing::{debug, info, trace}; +use tracing::{debug, info}; /// Job implementation for scraping subject data #[derive(Debug, Clone, Serialize, Deserialize)] @@ -24,9 +24,9 @@ impl Job for SubjectJob { TargetType::Subject } + #[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))] async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> { let subject_code = &self.subject; - debug!(subject = subject_code, "Processing subject job"); // Get the current term let term = Term::get_current().inner().to_string(); @@ -85,9 +85,7 @@ impl SubjectJob { .bind(chrono::Utc::now()) .execute(db_pool) .await - .map(|result| { - trace!(subject = course.subject, crn = course.course_reference_number, result = ?result, "Course upserted"); - }) + .map(|_| ()) .map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}")) } } diff --git a/src/scraper/scheduler.rs b/src/scraper/scheduler.rs index 3f61ab0..2194cc0 100644 --- a/src/scraper/scheduler.rs +++ b/src/scraper/scheduler.rs @@ -9,7 +9,7 @@ use std::time::Duration; use tokio::sync::broadcast; use tokio::time; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; /// Periodically analyzes data and enqueues prioritized scrape jobs. pub struct Scheduler { @@ -99,6 +99,7 @@ impl Scheduler { /// 3. Create jobs only for subjects that don't have pending jobs /// /// This is a static method (not &self) to allow it to be called from spawned tasks. + #[tracing::instrument(skip_all, fields(term))] async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> { // For now, we will implement a simple baseline scheduling strategy: // 1. Get a list of all subjects from the Banner API. @@ -106,6 +107,7 @@ impl Scheduler { // 3. Create new jobs only for subjects that don't have existing jobs. let term = Term::get_current().inner().to_string(); + tracing::Span::current().record("term", term.as_str()); debug!(term = term, "Enqueuing subject jobs"); let subjects = banner_api.get_subjects("", &term, 1, 500).await?; @@ -137,6 +139,7 @@ impl Scheduler { .collect(); // Filter out subjects that already have jobs and prepare new jobs + let mut skipped_count = 0; let new_jobs: Vec<_> = subjects .into_iter() .filter_map(|subject| { @@ -145,7 +148,7 @@ impl Scheduler { let payload_str = payload.to_string(); if existing_payloads.contains(&payload_str) { - trace!(subject = subject.code, "Job already exists, skipping"); + skipped_count += 1; None } else { Some((payload, subject.code)) @@ -153,6 +156,10 @@ impl Scheduler { }) .collect(); + if skipped_count > 0 { + debug!(count = skipped_count, "Skipped subjects with existing jobs"); + } + // Insert all new jobs in a single batch if !new_jobs.is_empty() { let now = chrono::Utc::now(); diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs index f252bb4..e1ecce9 100644 --- a/src/scraper/worker.rs +++ b/src/scraper/worker.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; use tokio::time; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn, Instrument}; /// A single worker instance. /// @@ -57,7 +57,9 @@ impl Worker { }; let job_id = job.id; - debug!(worker_id = self.id, job_id, "Processing job"); + let retry_count = job.retry_count; + let max_retries = job.max_retries; + let start = std::time::Instant::now(); // Process the job, racing against shutdown signal let process_result = tokio::select! { @@ -68,8 +70,10 @@ impl Worker { result = self.process_job(job) => result }; + let duration = start.elapsed(); + // Handle the job processing result - self.handle_job_result(job_id, process_result).await; + self.handle_job_result(job_id, retry_count, max_retries, process_result, duration).await; } } @@ -106,20 +110,31 @@ impl Worker { // Get the job implementation let job_impl = job_type.boxed(); - debug!( - worker_id = self.id, + // Create span with job context + let span = tracing::debug_span!( + "process_job", job_id = job.id, - description = job_impl.description(), - "Processing job" + job_type = job_impl.description() ); - // Process the job - API errors are recoverable - job_impl - .process(&self.banner_api, &self.db_pool) - .await - .map_err(JobError::Recoverable)?; + async move { + debug!( + worker_id = self.id, + job_id = job.id, + description = job_impl.description(), + "Processing job" + ); - Ok(()) + // Process the job - API errors are recoverable + job_impl + .process(&self.banner_api, &self.db_pool) + .await + .map_err(JobError::Recoverable)?; + + Ok(()) + } + .instrument(span) + .await } async fn delete_job(&self, job_id: i32) -> Result<()> { @@ -135,10 +150,24 @@ impl Worker { .bind(job_id) .execute(&self.db_pool) .await?; - info!(worker_id = self.id, job_id, "Job unlocked for retry"); Ok(()) } + async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result { + let result = sqlx::query_scalar::<_, Option>( + "UPDATE scrape_jobs + SET locked_at = NULL, retry_count = retry_count + 1 + WHERE id = $1 + RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END" + ) + .bind(job_id) + .bind(max_retries) + .fetch_one(&self.db_pool) + .await?; + + Ok(result.is_some()) + } + /// Handle shutdown signal received during job processing async fn handle_shutdown_during_processing(&self, job_id: i32) { info!(worker_id = self.id, job_id, "Shutdown received during job processing"); @@ -158,19 +187,30 @@ impl Worker { } /// Handle the result of job processing - async fn handle_job_result(&self, job_id: i32, result: Result<(), JobError>) { + async fn handle_job_result(&self, job_id: i32, retry_count: i32, max_retries: i32, result: Result<(), JobError>, duration: std::time::Duration) { match result { Ok(()) => { - debug!(worker_id = self.id, job_id, "Job completed successfully"); + debug!( + worker_id = self.id, + job_id, + duration_ms = duration.as_millis(), + "Job completed successfully" + ); if let Err(e) = self.delete_job(job_id).await { error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job"); } } Err(JobError::Recoverable(e)) => { - self.handle_recoverable_error(job_id, e).await; + self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration).await; } Err(JobError::Unrecoverable(e)) => { - error!(worker_id = self.id, job_id, error = ?e, "Job corrupted, deleting"); + error!( + worker_id = self.id, + job_id, + duration_ms = duration.as_millis(), + error = ?e, + "Job corrupted, deleting" + ); if let Err(e) = self.delete_job(job_id).await { error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job"); } @@ -179,18 +219,63 @@ impl Worker { } /// Handle recoverable errors by logging appropriately and unlocking the job - async fn handle_recoverable_error(&self, job_id: i32, e: anyhow::Error) { + async fn handle_recoverable_error(&self, job_id: i32, retry_count: i32, max_retries: i32, e: anyhow::Error, duration: std::time::Duration) { + let next_attempt = retry_count.saturating_add(1); + let remaining_retries = max_retries.saturating_sub(next_attempt); + + // Log the error appropriately based on type if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::() { warn!( worker_id = self.id, - job_id, "Invalid session detected, forcing session refresh" + job_id, + duration_ms = duration.as_millis(), + retry_attempt = next_attempt, + max_retries = max_retries, + remaining_retries = remaining_retries, + "Invalid session detected, will retry" ); } else { - error!(worker_id = self.id, job_id, error = ?e, "Failed to process job"); + error!( + worker_id = self.id, + job_id, + duration_ms = duration.as_millis(), + retry_attempt = next_attempt, + max_retries = max_retries, + remaining_retries = remaining_retries, + error = ?e, + "Failed to process job, will retry" + ); } - if let Err(e) = self.unlock_job(job_id).await { - error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock job"); + // Atomically unlock and increment retry count, checking if retry is allowed + match self.unlock_and_increment_retry(job_id, max_retries).await { + Ok(can_retry) if can_retry => { + info!( + worker_id = self.id, + job_id, + retry_attempt = next_attempt, + remaining_retries = remaining_retries, + "Job unlocked for retry" + ); + } + Ok(_) => { + // Max retries exceeded (detected atomically) + error!( + worker_id = self.id, + job_id, + duration_ms = duration.as_millis(), + retry_count = next_attempt, + max_retries = max_retries, + error = ?e, + "Job failed permanently (max retries exceeded), deleting" + ); + if let Err(e) = self.delete_job(job_id).await { + error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job"); + } + } + Err(e) => { + error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count"); + } } } }