Compare commits

...

4 Commits

Author SHA1 Message Date
Ryan Walters
3292d35521 build(docker): copy migrations directory to build context
Ensures database migration files are available during the Docker build process.
2025-11-03 12:07:27 -06:00
Ryan Walters
71ac0782d0 feat(json): enhance error context with debug mode detailed reporting
Improve JSON parsing error messages with build-specific behavior:
- Debug builds: Show full parent object context and type mismatch details
- Release builds: Keep minimal snippets to avoid log spam

Add comprehensive test coverage for error handling and path parsing.
2025-11-03 12:04:20 -06:00
Ryan Walters
1c6d2d4b6e perf: implement batch operations and optimize database indexes
Add batch upsert functionality to reduce database round-trips from N to 1 when inserting courses. Create comprehensive database indexes for common query patterns including term/subject lookups, time-series metrics, and job scheduling. Remove redundant indexes and add monitoring guidance for BRIN index effectiveness.
2025-11-03 11:18:42 -06:00
Ryan Walters
51f8256e61 feat: implement comprehensive retry mechanism and improve observability
Add retry tracking to scrape jobs with configurable max retries (default 5), implement
automatic database migrations on startup, and significantly reduce logging noise from
infrastructure layers. Enhanced tracing with structured spans for better debugging while
keeping output readable by suppressing verbose trace logs from rate limiters and session
management. Improved error handling with detailed retry context and proper session cookie
validation.
2025-11-03 10:18:07 -06:00
18 changed files with 856 additions and 175 deletions

View File

@@ -59,6 +59,7 @@ RUN cargo build --release
# Copy source code # Copy source code
RUN rm src/*.rs RUN rm src/*.rs
COPY ./src ./src/ COPY ./src ./src/
COPY ./migrations ./migrations/
# Copy built frontend assets # Copy built frontend assets
COPY --from=frontend-builder /app/dist ./web/dist COPY --from=frontend-builder /app/dist ./web/dist

View File

@@ -0,0 +1,3 @@
-- Add retry tracking columns to scrape_jobs table
ALTER TABLE scrape_jobs ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0);
ALTER TABLE scrape_jobs ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 5 CHECK (max_retries >= 0);

View File

@@ -0,0 +1,45 @@
-- Performance optimization indexes
-- Index for term-based queries (most common access pattern)
CREATE INDEX IF NOT EXISTS idx_courses_term_code ON courses(term_code);
-- Index for subject-based filtering
CREATE INDEX IF NOT EXISTS idx_courses_subject ON courses(subject);
-- Composite index for subject + term queries
CREATE INDEX IF NOT EXISTS idx_courses_subject_term ON courses(subject, term_code);
-- Index for course number lookups
CREATE INDEX IF NOT EXISTS idx_courses_course_number ON courses(course_number);
-- Index for last scraped timestamp (useful for finding stale data)
CREATE INDEX IF NOT EXISTS idx_courses_last_scraped ON courses(last_scraped_at);
-- Index for course metrics time-series queries
-- BRIN index is optimal for time-series data
CREATE INDEX IF NOT EXISTS idx_course_metrics_timestamp ON course_metrics USING BRIN(timestamp);
-- B-tree index for specific course metric lookups
CREATE INDEX IF NOT EXISTS idx_course_metrics_course_timestamp
ON course_metrics(course_id, timestamp DESC);
-- Partial index for pending scrape jobs (only unlocked jobs)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_pending
ON scrape_jobs(execute_at ASC)
WHERE locked_at IS NULL;
-- Index for high-priority job processing
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_priority_pending
ON scrape_jobs(priority DESC, execute_at ASC)
WHERE locked_at IS NULL;
-- Index for retry tracking
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_retry_count
ON scrape_jobs(retry_count)
WHERE retry_count > 0 AND locked_at IS NULL;
-- Analyze tables to update statistics
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;

View File

@@ -0,0 +1,53 @@
-- Index Optimization Follow-up Migration
-- Reason: Redundant with composite index idx_courses_subject_term
DROP INDEX IF EXISTS idx_courses_subject;
-- Remove: idx_scrape_jobs_retry_count
DROP INDEX IF EXISTS idx_scrape_jobs_retry_count;
-- Purpose: Optimize the scheduler's frequent query (runs every 60 seconds)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_scheduler_lookup
ON scrape_jobs(target_type, target_payload)
WHERE locked_at IS NULL;
-- Note: We use (target_type, target_payload) instead of including locked_at
-- in the index columns because:
-- 1. The WHERE clause filters locked_at IS NULL (partial index optimization)
-- 2. target_payload is JSONB and already large; keeping it as an indexed column
-- allows PostgreSQL to use index-only scans for the SELECT target_payload query
-- 3. This design minimizes index size while maximizing query performance
-- Purpose: Enable efficient audit trail queries by course
CREATE INDEX IF NOT EXISTS idx_course_audits_course_timestamp
ON course_audits(course_id, timestamp DESC);
-- Purpose: Enable queries like "Show all changes in the last 24 hours"
CREATE INDEX IF NOT EXISTS idx_course_audits_timestamp
ON course_audits(timestamp DESC);
-- The BRIN index on course_metrics(timestamp) assumes data is inserted in
-- chronological order. BRIN indexes are only effective when data is physically
-- ordered on disk. If you perform:
-- - Backfills of historical data
-- - Out-of-order inserts
-- - Frequent UPDATEs that move rows
--
-- Then the BRIN index effectiveness will degrade. Monitor with:
-- SELECT * FROM brin_page_items(get_raw_page('idx_course_metrics_timestamp', 1));
--
-- If you see poor selectivity, consider:
-- 1. REINDEX to rebuild after bulk loads
-- 2. Switch to B-tree if inserts are not time-ordered
-- 3. Use CLUSTER to physically reorder the table (requires downtime)
COMMENT ON INDEX idx_course_metrics_timestamp IS
'BRIN index - requires chronologically ordered inserts for efficiency. Monitor selectivity.';
-- Update statistics for query planner
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;

View File

@@ -62,6 +62,14 @@ impl App {
"database pool established" "database pool established"
); );
// Run database migrations
info!("Running database migrations...");
sqlx::migrate!("./migrations")
.run(&db_pool)
.await
.expect("Failed to run database migrations");
info!("Database migrations completed successfully");
// Create BannerApi and AppState // Create BannerApi and AppState
let banner_api = BannerApi::new_with_config( let banner_api = BannerApi::new_with_config(
config.banner_base_url.clone(), config.banner_base_url.clone(),

View File

@@ -152,6 +152,13 @@ impl BannerApi {
} }
/// Performs a course search and handles common response processing. /// Performs a course search and handles common response processing.
#[tracing::instrument(
skip(self, query),
fields(
term = %term,
subject = %query.get_subject().unwrap_or(&"all".to_string())
)
)]
async fn perform_search( async fn perform_search(
&self, &self,
term: &str, term: &str,
@@ -318,12 +325,6 @@ impl BannerApi {
sort: &str, sort: &str,
sort_descending: bool, sort_descending: bool,
) -> Result<SearchResult, BannerApiError> { ) -> Result<SearchResult, BannerApiError> {
debug!(
term = term,
subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"),
max_results = query.get_max_results(),
"Starting course search"
);
self.perform_search(term, query, sort, sort_descending) self.perform_search(term, query, sort, sort_descending)
.await .await
} }

View File

@@ -1,10 +1,14 @@
//! JSON parsing utilities for the Banner API client. //! JSON parsing utilities for the Banner API client.
use anyhow::Result; use anyhow::Result;
use serde_json; use serde_json::{self, Value};
/// Attempt to parse JSON and, on failure, include a contextual snippet of the /// Attempt to parse JSON and, on failure, include a contextual snippet of the
/// line where the error occurred. This prevents dumping huge JSON bodies to logs. /// line where the error occurred.
///
/// In debug builds, this provides detailed context including the full JSON object
/// containing the error and type mismatch information. In release builds, it shows
/// a minimal snippet to prevent dumping huge JSON bodies to production logs.
pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> { pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
let jd = &mut serde_json::Deserializer::from_str(body); let jd = &mut serde_json::Deserializer::from_str(body);
match serde_path_to_error::deserialize(jd) { match serde_path_to_error::deserialize(jd) {
@@ -12,27 +16,244 @@ pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Re
Err(err) => { Err(err) => {
let inner_err = err.inner(); let inner_err = err.inner();
let (line, column) = (inner_err.line(), inner_err.column()); let (line, column) = (inner_err.line(), inner_err.column());
let snippet = build_error_snippet(body, line, column, 20);
let path = err.path().to_string(); let path = err.path().to_string();
let msg = inner_err.to_string(); let msg = inner_err.to_string();
let loc = format!(" at line {line} column {column}"); let loc = format!(" at line {line} column {column}");
let msg_without_loc = msg.strip_suffix(&loc).unwrap_or(&msg).to_string(); let msg_without_loc = msg.strip_suffix(&loc).unwrap_or(&msg).to_string();
let mut final_err = String::new(); // Build error message differently for debug vs release builds
if !path.is_empty() && path != "." { let final_err = if cfg!(debug_assertions) {
final_err.push_str(&format!("for path '{}' ", path)); // Debug mode: provide detailed context
} let type_info = parse_type_mismatch(&msg_without_loc);
final_err.push_str(&format!( let context = extract_json_object_at_path(body, err.path(), line, column);
"({msg_without_loc}) at line {line} column {column}"
)); let mut err_msg = String::new();
final_err.push_str(&format!("\n{snippet}")); if !path.is_empty() && path != "." {
err_msg.push_str(&format!("for path '{}'\n", path));
}
err_msg.push_str(&format!("({}) at line {} column {}\n\n", type_info, line, column));
err_msg.push_str(&context);
err_msg
} else {
// Release mode: minimal snippet to keep logs concise
let snippet = build_error_snippet(body, line, column, 20);
let mut err_msg = String::new();
if !path.is_empty() && path != "." {
err_msg.push_str(&format!("for path '{}' ", path));
}
err_msg.push_str(&format!(
"({}) at line {} column {}",
msg_without_loc, line, column
));
err_msg.push_str(&format!("\n{}", snippet));
err_msg
};
Err(anyhow::anyhow!(final_err)) Err(anyhow::anyhow!(final_err))
} }
} }
} }
/// Extract type mismatch information from a serde error message.
///
/// Parses error messages like "invalid type: null, expected a string" to extract
/// the expected and actual types for clearer error reporting.
///
/// Returns a formatted string like "(expected a string, got null)" or the original
/// message if parsing fails.
fn parse_type_mismatch(error_msg: &str) -> String {
// Try to parse "invalid type: X, expected Y" format
if let Some(invalid_start) = error_msg.find("invalid type: ") {
let after_prefix = &error_msg[invalid_start + "invalid type: ".len()..];
if let Some(comma_pos) = after_prefix.find(", expected ") {
let actual_type = &after_prefix[..comma_pos];
let expected_part = &after_prefix[comma_pos + ", expected ".len()..];
// Clean up expected part (remove " at line X column Y" if present)
let expected_type = expected_part
.split(" at line ")
.next()
.unwrap_or(expected_part)
.trim();
return format!("expected {}, got {}", expected_type, actual_type);
}
}
// Try to parse "expected X at line Y" format
if error_msg.starts_with("expected ") {
if let Some(expected_part) = error_msg.split(" at line ").next() {
return expected_part.to_string();
}
}
// Fallback: return original message without location info
error_msg.to_string()
}
/// Extract and pretty-print the JSON object/array containing the parse error.
///
/// This function navigates to the error location using the serde path and extracts
/// the parent object or array to provide better context for debugging.
///
/// # Arguments
/// * `body` - The raw JSON string
/// * `path` - The serde path to the error (e.g., "data[0].faculty[0].displayName")
/// * `line` - Line number of the error (for fallback)
/// * `column` - Column number of the error (for fallback)
///
/// # Returns
/// A formatted string containing the JSON object with the error, or a fallback snippet
fn extract_json_object_at_path(
body: &str,
path: &serde_path_to_error::Path,
line: usize,
column: usize,
) -> String {
// Try to parse the entire JSON structure
let root_value: Value = match serde_json::from_str(body) {
Ok(v) => v,
Err(_) => {
// If we can't parse the JSON at all, fall back to line snippet
return build_error_snippet(body, line, column, 20);
}
};
// Navigate to the error location using the path
let path_str = path.to_string();
let segments = parse_path_segments(&path_str);
let (context_value, context_name) = navigate_to_context(&root_value, &segments);
// Pretty-print the context value with limited depth to avoid huge output
match serde_json::to_string_pretty(&context_value) {
Ok(pretty) => {
// Limit output to ~50 lines to prevent log spam
let lines: Vec<&str> = pretty.lines().collect();
let truncated = if lines.len() > 50 {
let mut result = lines[..47].join("\n");
result.push_str("\n ... (truncated, ");
result.push_str(&(lines.len() - 47).to_string());
result.push_str(" more lines)");
result
} else {
pretty
};
format!("{} at '{}':\n{}", context_name, path_str, truncated)
}
Err(_) => {
// Fallback to simple snippet if pretty-print fails
build_error_snippet(body, line, column, 20)
}
}
}
/// Parse a JSON path string into segments for navigation.
///
/// Converts paths like "data[0].faculty[1].displayName" into a sequence of
/// object keys and array indices.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
let mut segments = Vec::new();
let mut current = String::new();
let mut in_bracket = false;
for ch in path.chars() {
match ch {
'.' if !in_bracket => {
if !current.is_empty() {
segments.push(PathSegment::Key(current.clone()));
current.clear();
}
}
'[' => {
if !current.is_empty() {
segments.push(PathSegment::Key(current.clone()));
current.clear();
}
in_bracket = true;
}
']' => {
if in_bracket && !current.is_empty() {
if let Ok(index) = current.parse::<usize>() {
segments.push(PathSegment::Index(index));
}
current.clear();
}
in_bracket = false;
}
_ => current.push(ch),
}
}
if !current.is_empty() {
segments.push(PathSegment::Key(current));
}
segments
}
/// Represents a segment in a JSON path (either an object key or array index).
#[derive(Debug)]
enum PathSegment {
Key(String),
Index(usize),
}
/// Navigate through a JSON value using path segments and return the appropriate context.
///
/// This function walks the JSON structure and returns the parent object/array that
/// contains the error, providing meaningful context for debugging.
///
/// # Returns
/// A tuple of (context_value, description) where context_value is the JSON to display
/// and description is a human-readable name for what we're showing.
fn navigate_to_context<'a>(
mut current: &'a Value,
segments: &[PathSegment],
) -> (&'a Value, &'static str) {
// If path is empty or just root, return the whole value
if segments.is_empty() {
return (current, "Root object");
}
// Try to navigate to the parent of the error location
// We want to show the containing object/array, not just the failing field
let parent_depth = segments.len().saturating_sub(1);
for (i, segment) in segments.iter().enumerate() {
// Stop one level before the end to show the parent context
if i >= parent_depth {
break;
}
match segment {
PathSegment::Key(key) => {
if let Some(next) = current.get(key) {
current = next;
} else {
// Can't navigate further, return what we have
return (current, "Partial context (navigation stopped)");
}
}
PathSegment::Index(idx) => {
if let Some(next) = current.get(idx) {
current = next;
} else {
return (current, "Partial context (index out of bounds)");
}
}
}
}
(current, "Object containing error")
}
fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String { fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String {
let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or(""); let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
if target_line.is_empty() { if target_line.is_empty() {
@@ -53,3 +274,139 @@ fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usiz
format!("...{slice}...\n {indicator}") format!("...{slice}...\n {indicator}")
} }
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
#[test]
fn test_parse_type_mismatch_invalid_type() {
let msg = "invalid type: null, expected a string at line 45 column 29";
let result = parse_type_mismatch(msg);
assert_eq!(result, "expected a string, got null");
}
#[test]
fn test_parse_type_mismatch_expected() {
let msg = "expected value at line 1 column 1";
let result = parse_type_mismatch(msg);
assert_eq!(result, "expected value");
}
#[test]
fn test_parse_path_segments_simple() {
let segments = parse_path_segments("data.name");
assert_eq!(segments.len(), 2);
match &segments[0] {
PathSegment::Key(k) => assert_eq!(k, "data"),
_ => panic!("Expected Key segment"),
}
}
#[test]
fn test_parse_path_segments_with_array() {
let segments = parse_path_segments("data[0].faculty[1].displayName");
assert_eq!(segments.len(), 5);
match &segments[0] {
PathSegment::Key(k) => assert_eq!(k, "data"),
_ => panic!("Expected Key segment"),
}
match &segments[1] {
PathSegment::Index(i) => assert_eq!(*i, 0),
_ => panic!("Expected Index segment"),
}
}
#[test]
fn test_parse_json_with_context_null_value() {
#[derive(Debug, Deserialize)]
struct TestStruct {
name: String,
}
let json = r#"{"name": null}"#;
let result: Result<TestStruct> = parse_json_with_context(json);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
// Should contain path info
assert!(err_msg.contains("name"));
// In debug mode, should contain detailed context
if cfg!(debug_assertions) {
assert!(err_msg.contains("expected"));
}
}
#[test]
fn test_navigate_to_context() {
let json = r#"{"data": [{"faculty": [{"name": "John"}]}]}"#;
let value: Value = serde_json::from_str(json).unwrap();
let segments = parse_path_segments("data[0].faculty[0].name");
let (context, _) = navigate_to_context(&value, &segments);
// Should return the faculty[0] object (parent of 'name')
assert!(context.is_object());
assert!(context.get("name").is_some());
}
#[test]
fn test_realistic_banner_error() {
#[derive(Debug, Deserialize)]
struct Course {
#[allow(dead_code)]
#[serde(rename = "courseTitle")]
course_title: String,
faculty: Vec<Faculty>,
}
#[derive(Debug, Deserialize)]
struct Faculty {
#[serde(rename = "displayName")]
display_name: String,
#[allow(dead_code)]
email: String,
}
#[derive(Debug, Deserialize)]
struct SearchResult {
data: Vec<Course>,
}
// Simulate Banner API response with null faculty displayName
// This mimics the actual error from SPN subject scrape
let json = r#"{
"data": [
{
"courseTitle": "Spanish Conversation",
"faculty": [
{
"displayName": null,
"email": "instructor@utsa.edu"
}
]
}
]
}"#;
let result: Result<SearchResult> = parse_json_with_context(json);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
println!("\n=== Error output in debug mode ===\n{}\n", err_msg);
// Verify error contains key information
assert!(err_msg.contains("data[0].faculty[0].displayName"));
// In debug mode, should show detailed context
if cfg!(debug_assertions) {
// Should show type mismatch info
assert!(err_msg.contains("expected") && err_msg.contains("got"));
// Should show surrounding JSON context with the faculty object
assert!(err_msg.contains("email"));
}
}
}

View File

@@ -3,10 +3,13 @@
use http::Extensions; use http::Extensions;
use reqwest::{Request, Response}; use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next}; use reqwest_middleware::{Middleware, Next};
use tracing::{trace, warn}; use tracing::{debug, trace, warn};
pub struct TransparentMiddleware; pub struct TransparentMiddleware;
/// Threshold for logging slow requests at DEBUG level (in milliseconds)
const SLOW_REQUEST_THRESHOLD_MS: u128 = 1000;
#[async_trait::async_trait] #[async_trait::async_trait]
impl Middleware for TransparentMiddleware { impl Middleware for TransparentMiddleware {
async fn handle( async fn handle(
@@ -15,33 +18,56 @@ impl Middleware for TransparentMiddleware {
extensions: &mut Extensions, extensions: &mut Extensions,
next: Next<'_>, next: Next<'_>,
) -> std::result::Result<Response, reqwest_middleware::Error> { ) -> std::result::Result<Response, reqwest_middleware::Error> {
trace!( let method = req.method().to_string();
domain = req.url().domain(), let path = req.url().path().to_string();
headers = ?req.headers(),
"{method} {path}", let start = std::time::Instant::now();
method = req.method().to_string(),
path = req.url().path(),
);
let response_result = next.run(req, extensions).await; let response_result = next.run(req, extensions).await;
let duration = start.elapsed();
match response_result { match response_result {
Ok(response) => { Ok(response) => {
if response.status().is_success() { if response.status().is_success() {
trace!( let duration_ms = duration.as_millis();
"{code} {reason} {path}", if duration_ms >= SLOW_REQUEST_THRESHOLD_MS {
code = response.status().as_u16(), debug!(
reason = response.status().canonical_reason().unwrap_or("??"), method = method,
path = response.url().path(), path = path,
); status = response.status().as_u16(),
duration_ms = duration_ms,
"Request completed (slow)"
);
} else {
trace!(
method = method,
path = path,
status = response.status().as_u16(),
duration_ms = duration_ms,
"Request completed"
);
}
Ok(response) Ok(response)
} else { } else {
let e = response.error_for_status_ref().unwrap_err(); let e = response.error_for_status_ref().unwrap_err();
warn!(error = ?e, "Request failed (server)"); warn!(
method = method,
path = path,
error = ?e,
status = response.status().as_u16(),
duration_ms = duration.as_millis(),
"Request failed"
);
Ok(response) Ok(response)
} }
} }
Err(error) => { Err(error) => {
warn!(error = ?error, "Request failed (middleware)"); warn!(
method = method,
path = path,
error = ?error,
duration_ms = duration.as_millis(),
"Request failed"
);
Err(error) Err(error)
} }
} }

View File

@@ -4,7 +4,7 @@ use crate::banner::rate_limiter::{RequestType, SharedRateLimiter};
use http::Extensions; use http::Extensions;
use reqwest::{Request, Response}; use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next}; use reqwest_middleware::{Middleware, Next};
use tracing::{debug, trace, warn}; use tracing::debug;
use url::Url; use url::Url;
/// Middleware that enforces rate limiting based on request URL patterns /// Middleware that enforces rate limiting based on request URL patterns
@@ -18,6 +18,16 @@ impl RateLimitMiddleware {
Self { rate_limiter } Self { rate_limiter }
} }
/// Returns a human-readable description of the rate limit for a request type
fn get_rate_limit_description(request_type: RequestType) -> &'static str {
match request_type {
RequestType::Session => "6 rpm (~10s interval)",
RequestType::Search => "30 rpm (~2s interval)",
RequestType::Metadata => "20 rpm (~3s interval)",
RequestType::Reset => "10 rpm (~6s interval)",
}
}
/// Determines the request type based on the URL path /// Determines the request type based on the URL path
fn get_request_type(url: &Url) -> RequestType { fn get_request_type(url: &Url) -> RequestType {
let path = url.path(); let path = url.path();
@@ -53,49 +63,22 @@ impl Middleware for RateLimitMiddleware {
) -> std::result::Result<Response, reqwest_middleware::Error> { ) -> std::result::Result<Response, reqwest_middleware::Error> {
let request_type = Self::get_request_type(req.url()); let request_type = Self::get_request_type(req.url());
trace!( let start = std::time::Instant::now();
url = %req.url(),
request_type = ?request_type,
"Rate limiting request"
);
// Wait for permission to make the request
self.rate_limiter.wait_for_permission(request_type).await; self.rate_limiter.wait_for_permission(request_type).await;
let wait_duration = start.elapsed();
trace!( // Only log if rate limiting caused significant delay (>= 500ms)
url = %req.url(), if wait_duration.as_millis() >= 500 {
request_type = ?request_type, let limit_desc = Self::get_rate_limit_description(request_type);
"Rate limit permission granted, making request" debug!(
); request_type = ?request_type,
wait_ms = wait_duration.as_millis(),
rate_limit = limit_desc,
"Rate limit caused delay"
);
}
// Make the actual request // Make the actual request
let response_result = next.run(req, extensions).await; next.run(req, extensions).await
match response_result {
Ok(response) => {
if response.status().is_success() {
trace!(
url = %response.url(),
status = response.status().as_u16(),
"Request completed successfully"
);
} else {
warn!(
url = %response.url(),
status = response.status().as_u16(),
"Request completed with error status"
);
}
Ok(response)
}
Err(error) => {
warn!(
url = ?error.url(),
error = ?error,
"Request failed"
);
Err(error)
}
}
} }
} }

View File

@@ -8,7 +8,6 @@ use governor::{
use std::num::NonZeroU32; use std::num::NonZeroU32;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tracing::{debug, trace, warn};
/// Different types of Banner API requests with different rate limits /// Different types of Banner API requests with different rate limits
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -99,12 +98,8 @@ impl BannerRateLimiter {
RequestType::Reset => &self.reset_limiter, RequestType::Reset => &self.reset_limiter,
}; };
trace!(request_type = ?request_type, "Waiting for rate limit permission"); // Wait until we can make the request (logging handled by middleware)
// Wait until we can make the request
limiter.until_ready().await; limiter.until_ready().await;
trace!(request_type = ?request_type, "Rate limit permission granted");
} }
} }

View File

@@ -82,7 +82,6 @@ impl BannerSession {
/// Updates the last activity timestamp /// Updates the last activity timestamp
pub fn touch(&mut self) { pub fn touch(&mut self) {
trace!(id = self.unique_session_id, "Session was used");
self.last_activity = Some(Instant::now()); self.last_activity = Some(Instant::now());
} }
@@ -162,7 +161,7 @@ impl TermPool {
async fn release(&self, session: BannerSession) { async fn release(&self, session: BannerSession) {
let id = session.unique_session_id.clone(); let id = session.unique_session_id.clone();
if session.is_expired() { if session.is_expired() {
trace!(id = id, "Session is now expired, dropping."); debug!(id = id, "Session expired, dropping");
// Wake up a waiter, as it might need to create a new session // Wake up a waiter, as it might need to create a new session
// if this was the last one. // if this was the last one.
self.notifier.notify_one(); self.notifier.notify_one();
@@ -171,10 +170,8 @@ impl TermPool {
let mut queue = self.sessions.lock().await; let mut queue = self.sessions.lock().await;
queue.push_back(session); queue.push_back(session);
let queue_size = queue.len();
drop(queue); // Release lock before notifying drop(queue); // Release lock before notifying
trace!(id = id, queue_size, "Session returned to pool");
self.notifier.notify_one(); self.notifier.notify_one();
} }
} }
@@ -204,22 +201,21 @@ impl SessionPool {
.or_insert_with(|| Arc::new(TermPool::new())) .or_insert_with(|| Arc::new(TermPool::new()))
.clone(); .clone();
let start = Instant::now();
let mut waited_for_creation = false;
loop { loop {
// Fast path: Try to get an existing, non-expired session. // Fast path: Try to get an existing, non-expired session.
{ {
let mut queue = term_pool.sessions.lock().await; let mut queue = term_pool.sessions.lock().await;
if let Some(session) = queue.pop_front() { if let Some(session) = queue.pop_front() {
if !session.is_expired() { if !session.is_expired() {
trace!(id = session.unique_session_id, "Reusing session from pool");
return Ok(PooledSession { return Ok(PooledSession {
session: Some(session), session: Some(session),
pool: Arc::clone(&term_pool), pool: Arc::clone(&term_pool),
}); });
} else { } else {
trace!( debug!(id = session.unique_session_id, "Discarded expired session");
id = session.unique_session_id,
"Popped an expired session, discarding."
);
} }
} }
} // MutexGuard is dropped, lock is released. } // MutexGuard is dropped, lock is released.
@@ -229,7 +225,10 @@ impl SessionPool {
if *is_creating_guard { if *is_creating_guard {
// Another task is already creating a session. Release the lock and wait. // Another task is already creating a session. Release the lock and wait.
drop(is_creating_guard); drop(is_creating_guard);
trace!("Another task is creating a session, waiting for notification..."); if !waited_for_creation {
trace!("Waiting for another task to create session");
waited_for_creation = true;
}
term_pool.notifier.notified().await; term_pool.notifier.notified().await;
// Loop back to the top to try the fast path again. // Loop back to the top to try the fast path again.
continue; continue;
@@ -240,12 +239,11 @@ impl SessionPool {
drop(is_creating_guard); drop(is_creating_guard);
// Race: wait for a session to be returned OR for the rate limiter to allow a new one. // Race: wait for a session to be returned OR for the rate limiter to allow a new one.
trace!("Pool empty, racing notifier vs rate limiter..."); trace!("Pool empty, creating new session");
tokio::select! { tokio::select! {
_ = term_pool.notifier.notified() => { _ = term_pool.notifier.notified() => {
// A session was returned while we were waiting! // A session was returned while we were waiting!
// We are no longer the creator. Reset the flag and loop to race for the new session. // We are no longer the creator. Reset the flag and loop to race for the new session.
trace!("Notified that a session was returned. Looping to retry.");
let mut guard = term_pool.is_creating.lock().await; let mut guard = term_pool.is_creating.lock().await;
*guard = false; *guard = false;
drop(guard); drop(guard);
@@ -253,7 +251,6 @@ impl SessionPool {
} }
_ = SESSION_CREATION_RATE_LIMITER.until_ready() => { _ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
// The rate limit has elapsed. It's our job to create the session. // The rate limit has elapsed. It's our job to create the session.
trace!("Rate limiter ready. Proceeding to create a new session.");
let new_session_result = self.create_session(&term).await; let new_session_result = self.create_session(&term).await;
// After creation, we are no longer the creator. Reset the flag // After creation, we are no longer the creator. Reset the flag
@@ -265,7 +262,12 @@ impl SessionPool {
match new_session_result { match new_session_result {
Ok(new_session) => { Ok(new_session) => {
debug!(id = new_session.unique_session_id, "Successfully created new session"); let elapsed = start.elapsed();
debug!(
id = new_session.unique_session_id,
elapsed_ms = elapsed.as_millis(),
"Created new session"
);
return Ok(PooledSession { return Ok(PooledSession {
session: Some(new_session), session: Some(new_session),
pool: term_pool, pool: term_pool,
@@ -298,8 +300,12 @@ impl SessionPool {
.get_all("Set-Cookie") .get_all("Set-Cookie")
.iter() .iter()
.filter_map(|header_value| { .filter_map(|header_value| {
if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) { if let Ok(cookie_str) = header_value.to_str() {
Some((cookie.name().to_string(), cookie.value().to_string())) if let Ok(cookie) = Cookie::parse(cookie_str) {
Some((cookie.name().to_string(), cookie.value().to_string()))
} else {
None
}
} else { } else {
None None
} }
@@ -310,16 +316,12 @@ impl SessionPool {
return Err(anyhow::anyhow!("Failed to get cookies")); return Err(anyhow::anyhow!("Failed to get cookies"));
} }
let jsessionid = cookies.get("JSESSIONID").unwrap(); let jsessionid = cookies.get("JSESSIONID")
let ssb_cookie = cookies.get("SSB_COOKIE").unwrap(); .ok_or_else(|| anyhow::anyhow!("JSESSIONID cookie missing after validation"))?;
let ssb_cookie = cookies.get("SSB_COOKIE")
.ok_or_else(|| anyhow::anyhow!("SSB_COOKIE cookie missing after validation"))?;
let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie); let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);
trace!(
jsessionid = jsessionid,
ssb_cookie = ssb_cookie,
"New session cookies acquired"
);
self.http self.http
.get(format!("{}/selfServiceMenu/data", self.base_url)) .get(format!("{}/selfServiceMenu/data", self.base_url))
.header("Cookie", &cookie_header) .header("Cookie", &cookie_header)
@@ -435,8 +437,15 @@ impl SessionPool {
let redirect: RedirectResponse = response.json().await?; let redirect: RedirectResponse = response.json().await?;
let base_url_path = self.base_url.parse::<Url>().unwrap().path().to_string(); let base_url_path = self.base_url.parse::<Url>()
let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap(); .context("Failed to parse base URL")?
.path()
.to_string();
let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path)
.ok_or_else(|| anyhow::anyhow!(
"Redirect URL '{}' does not start with expected prefix '{}'",
redirect.fwd_url, base_url_path
))?;
// Follow the redirect // Follow the redirect
let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect); let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect);
@@ -454,7 +463,6 @@ impl SessionPool {
)); ));
} }
trace!(term = term, "successfully selected term");
Ok(()) Ok(())
} }
} }

138
src/data/batch.rs Normal file
View File

@@ -0,0 +1,138 @@
//! Batch database operations for improved performance.
use crate::banner::Course;
use crate::error::Result;
use sqlx::PgPool;
use std::time::Instant;
use tracing::info;
/// Batch upsert courses in a single database query.
///
/// This function performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses
/// in a single round-trip to the database, significantly reducing overhead compared
/// to individual inserts.
///
/// # Performance
/// - Reduces N database round-trips to 1
/// - Typical usage: 50-200 courses per batch
/// - PostgreSQL parameter limit: 65,535 (we use ~10 per course)
///
/// # Arguments
/// * `courses` - Slice of Course structs from the Banner API
/// * `db_pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(())` on success
/// * `Err(_)` if the database operation fails
///
/// # Example
/// ```no_run
/// use banner::data::batch::batch_upsert_courses;
/// use banner::banner::Course;
/// use sqlx::PgPool;
///
/// async fn example(courses: &[Course], pool: &PgPool) -> anyhow::Result<()> {
/// batch_upsert_courses(courses, pool).await?;
/// Ok(())
/// }
/// ```
pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
// Early return for empty batches
if courses.is_empty() {
info!("No courses to upsert, skipping batch operation");
return Ok(());
}
let start = Instant::now();
let course_count = courses.len();
// Extract course fields into vectors for UNNEST
let crns: Vec<&str> = courses
.iter()
.map(|c| c.course_reference_number.as_str())
.collect();
let subjects: Vec<&str> = courses.iter().map(|c| c.subject.as_str()).collect();
let course_numbers: Vec<&str> = courses
.iter()
.map(|c| c.course_number.as_str())
.collect();
let titles: Vec<&str> = courses.iter().map(|c| c.course_title.as_str()).collect();
let term_codes: Vec<&str> = courses.iter().map(|c| c.term.as_str()).collect();
let enrollments: Vec<i32> = courses.iter().map(|c| c.enrollment).collect();
let max_enrollments: Vec<i32> = courses.iter().map(|c| c.maximum_enrollment).collect();
let wait_counts: Vec<i32> = courses.iter().map(|c| c.wait_count).collect();
let wait_capacities: Vec<i32> = courses.iter().map(|c| c.wait_capacity).collect();
// Perform batch upsert using UNNEST for efficient bulk insertion
let result = sqlx::query(
r#"
INSERT INTO courses (
crn, subject, course_number, title, term_code,
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
SELECT * FROM UNNEST(
$1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
$6::int4[], $7::int4[], $8::int4[], $9::int4[],
array_fill(NOW()::timestamptz, ARRAY[$10])
) AS t(
crn, subject, course_number, title, term_code,
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
ON CONFLICT (crn, term_code)
DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&crns)
.bind(&subjects)
.bind(&course_numbers)
.bind(&titles)
.bind(&term_codes)
.bind(&enrollments)
.bind(&max_enrollments)
.bind(&wait_counts)
.bind(&wait_capacities)
.bind(course_count as i32)
.execute(db_pool)
.await
.map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?;
let duration = start.elapsed();
info!(
courses_count = course_count,
rows_affected = result.rows_affected(),
duration_ms = duration.as_millis(),
"Batch upserted courses"
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_batch_returns_ok() {
// This is a basic compile-time test
// Runtime tests would require sqlx::test macro and a test database
let courses: Vec<Course> = vec![];
assert_eq!(courses.len(), 0);
}
}

View File

@@ -1,3 +1,4 @@
//! Database models and schema. //! Database models and schema.
pub mod batch;
pub mod models; pub mod models;

View File

@@ -72,4 +72,8 @@ pub struct ScrapeJob {
pub execute_at: DateTime<Utc>, pub execute_at: DateTime<Utc>,
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
pub locked_at: Option<DateTime<Utc>>, pub locked_at: Option<DateTime<Utc>>,
/// Number of retry attempts for this job (non-negative, enforced by CHECK constraint)
pub retry_count: i32,
/// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint)
pub max_retries: i32,
} }

View File

@@ -7,10 +7,13 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber};
/// Configure and initialize logging for the application /// Configure and initialize logging for the application
pub fn setup_logging(config: &Config, tracing_format: TracingFormat) { pub fn setup_logging(config: &Config, tracing_format: TracingFormat) {
// Configure logging based on config // Configure logging based on config
// Note: Even when base_level is trace or debug, we suppress trace logs from noisy
// infrastructure modules to keep output readable. These modules use debug for important
// events and trace only for very detailed debugging.
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
let base_level = &config.log_level; let base_level = &config.log_level;
EnvFilter::new(format!( EnvFilter::new(format!(
"warn,banner={},banner::rate_limiter=warn,banner::session=warn,banner::rate_limit_middleware=warn", "warn,banner={},banner::rate_limiter=warn,banner::session=debug,banner::rate_limit_middleware=warn,banner::middleware=debug",
base_level base_level
)) ))
}); });

View File

@@ -1,10 +1,11 @@
use super::Job; use super::Job;
use crate::banner::{BannerApi, Course, SearchQuery, Term}; use crate::banner::{BannerApi, SearchQuery, Term};
use crate::data::batch::batch_upsert_courses;
use crate::data::models::TargetType; use crate::data::models::TargetType;
use crate::error::Result; use crate::error::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::PgPool; use sqlx::PgPool;
use tracing::{debug, info, trace}; use tracing::{debug, info};
/// Job implementation for scraping subject data /// Job implementation for scraping subject data
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -24,9 +25,9 @@ impl Job for SubjectJob {
TargetType::Subject TargetType::Subject
} }
#[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))]
async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> { async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
let subject_code = &self.subject; let subject_code = &self.subject;
debug!(subject = subject_code, "Processing subject job");
// Get the current term // Get the current term
let term = Term::get_current().inner().to_string(); let term = Term::get_current().inner().to_string();
@@ -42,9 +43,7 @@ impl Job for SubjectJob {
count = courses_from_api.len(), count = courses_from_api.len(),
"Found courses" "Found courses"
); );
for course in courses_from_api { batch_upsert_courses(&courses_from_api, db_pool).await?;
self.upsert_course(&course, db_pool).await?;
}
} }
debug!(subject = subject_code, "Subject job completed"); debug!(subject = subject_code, "Subject job completed");
@@ -55,39 +54,3 @@ impl Job for SubjectJob {
format!("Scrape subject: {}", self.subject) format!("Scrape subject: {}", self.subject)
} }
} }
impl SubjectJob {
async fn upsert_course(&self, course: &Course, db_pool: &PgPool) -> Result<()> {
sqlx::query(
r#"
INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (crn, term_code) DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&course.course_reference_number)
.bind(&course.subject)
.bind(&course.course_number)
.bind(&course.course_title)
.bind(&course.term)
.bind(course.enrollment)
.bind(course.maximum_enrollment)
.bind(course.wait_count)
.bind(course.wait_capacity)
.bind(chrono::Utc::now())
.execute(db_pool)
.await
.map(|result| {
trace!(subject = course.subject, crn = course.course_reference_number, result = ?result, "Course upserted");
})
.map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
}
}

View File

@@ -9,7 +9,7 @@ use std::time::Duration;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::time; use tokio::time;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, warn};
/// Periodically analyzes data and enqueues prioritized scrape jobs. /// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler { pub struct Scheduler {
@@ -99,6 +99,7 @@ impl Scheduler {
/// 3. Create jobs only for subjects that don't have pending jobs /// 3. Create jobs only for subjects that don't have pending jobs
/// ///
/// This is a static method (not &self) to allow it to be called from spawned tasks. /// This is a static method (not &self) to allow it to be called from spawned tasks.
#[tracing::instrument(skip_all, fields(term))]
async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> { async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
// For now, we will implement a simple baseline scheduling strategy: // For now, we will implement a simple baseline scheduling strategy:
// 1. Get a list of all subjects from the Banner API. // 1. Get a list of all subjects from the Banner API.
@@ -106,6 +107,7 @@ impl Scheduler {
// 3. Create new jobs only for subjects that don't have existing jobs. // 3. Create new jobs only for subjects that don't have existing jobs.
let term = Term::get_current().inner().to_string(); let term = Term::get_current().inner().to_string();
tracing::Span::current().record("term", term.as_str());
debug!(term = term, "Enqueuing subject jobs"); debug!(term = term, "Enqueuing subject jobs");
let subjects = banner_api.get_subjects("", &term, 1, 500).await?; let subjects = banner_api.get_subjects("", &term, 1, 500).await?;
@@ -137,6 +139,7 @@ impl Scheduler {
.collect(); .collect();
// Filter out subjects that already have jobs and prepare new jobs // Filter out subjects that already have jobs and prepare new jobs
let mut skipped_count = 0;
let new_jobs: Vec<_> = subjects let new_jobs: Vec<_> = subjects
.into_iter() .into_iter()
.filter_map(|subject| { .filter_map(|subject| {
@@ -145,7 +148,7 @@ impl Scheduler {
let payload_str = payload.to_string(); let payload_str = payload.to_string();
if existing_payloads.contains(&payload_str) { if existing_payloads.contains(&payload_str) {
trace!(subject = subject.code, "Job already exists, skipping"); skipped_count += 1;
None None
} else { } else {
Some((payload, subject.code)) Some((payload, subject.code))
@@ -153,6 +156,10 @@ impl Scheduler {
}) })
.collect(); .collect();
if skipped_count > 0 {
debug!(count = skipped_count, "Skipped subjects with existing jobs");
}
// Insert all new jobs in a single batch // Insert all new jobs in a single batch
if !new_jobs.is_empty() { if !new_jobs.is_empty() {
let now = chrono::Utc::now(); let now = chrono::Utc::now();

View File

@@ -7,7 +7,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::time; use tokio::time;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn, Instrument};
/// A single worker instance. /// A single worker instance.
/// ///
@@ -57,7 +57,9 @@ impl Worker {
}; };
let job_id = job.id; let job_id = job.id;
debug!(worker_id = self.id, job_id, "Processing job"); let retry_count = job.retry_count;
let max_retries = job.max_retries;
let start = std::time::Instant::now();
// Process the job, racing against shutdown signal // Process the job, racing against shutdown signal
let process_result = tokio::select! { let process_result = tokio::select! {
@@ -68,8 +70,10 @@ impl Worker {
result = self.process_job(job) => result result = self.process_job(job) => result
}; };
let duration = start.elapsed();
// Handle the job processing result // Handle the job processing result
self.handle_job_result(job_id, process_result).await; self.handle_job_result(job_id, retry_count, max_retries, process_result, duration).await;
} }
} }
@@ -106,20 +110,31 @@ impl Worker {
// Get the job implementation // Get the job implementation
let job_impl = job_type.boxed(); let job_impl = job_type.boxed();
debug!( // Create span with job context
worker_id = self.id, let span = tracing::debug_span!(
"process_job",
job_id = job.id, job_id = job.id,
description = job_impl.description(), job_type = job_impl.description()
"Processing job"
); );
// Process the job - API errors are recoverable async move {
job_impl debug!(
.process(&self.banner_api, &self.db_pool) worker_id = self.id,
.await job_id = job.id,
.map_err(JobError::Recoverable)?; description = job_impl.description(),
"Processing job"
);
Ok(()) // Process the job - API errors are recoverable
job_impl
.process(&self.banner_api, &self.db_pool)
.await
.map_err(JobError::Recoverable)?;
Ok(())
}
.instrument(span)
.await
} }
async fn delete_job(&self, job_id: i32) -> Result<()> { async fn delete_job(&self, job_id: i32) -> Result<()> {
@@ -135,10 +150,24 @@ impl Worker {
.bind(job_id) .bind(job_id)
.execute(&self.db_pool) .execute(&self.db_pool)
.await?; .await?;
info!(worker_id = self.id, job_id, "Job unlocked for retry");
Ok(()) Ok(())
} }
async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result<bool> {
let result = sqlx::query_scalar::<_, Option<i32>>(
"UPDATE scrape_jobs
SET locked_at = NULL, retry_count = retry_count + 1
WHERE id = $1
RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END"
)
.bind(job_id)
.bind(max_retries)
.fetch_one(&self.db_pool)
.await?;
Ok(result.is_some())
}
/// Handle shutdown signal received during job processing /// Handle shutdown signal received during job processing
async fn handle_shutdown_during_processing(&self, job_id: i32) { async fn handle_shutdown_during_processing(&self, job_id: i32) {
info!(worker_id = self.id, job_id, "Shutdown received during job processing"); info!(worker_id = self.id, job_id, "Shutdown received during job processing");
@@ -158,19 +187,30 @@ impl Worker {
} }
/// Handle the result of job processing /// Handle the result of job processing
async fn handle_job_result(&self, job_id: i32, result: Result<(), JobError>) { async fn handle_job_result(&self, job_id: i32, retry_count: i32, max_retries: i32, result: Result<(), JobError>, duration: std::time::Duration) {
match result { match result {
Ok(()) => { Ok(()) => {
debug!(worker_id = self.id, job_id, "Job completed successfully"); debug!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
"Job completed successfully"
);
if let Err(e) = self.delete_job(job_id).await { if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job"); error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
} }
} }
Err(JobError::Recoverable(e)) => { Err(JobError::Recoverable(e)) => {
self.handle_recoverable_error(job_id, e).await; self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration).await;
} }
Err(JobError::Unrecoverable(e)) => { Err(JobError::Unrecoverable(e)) => {
error!(worker_id = self.id, job_id, error = ?e, "Job corrupted, deleting"); error!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
error = ?e,
"Job corrupted, deleting"
);
if let Err(e) = self.delete_job(job_id).await { if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job"); error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
} }
@@ -179,18 +219,63 @@ impl Worker {
} }
/// Handle recoverable errors by logging appropriately and unlocking the job /// Handle recoverable errors by logging appropriately and unlocking the job
async fn handle_recoverable_error(&self, job_id: i32, e: anyhow::Error) { async fn handle_recoverable_error(&self, job_id: i32, retry_count: i32, max_retries: i32, e: anyhow::Error, duration: std::time::Duration) {
let next_attempt = retry_count.saturating_add(1);
let remaining_retries = max_retries.saturating_sub(next_attempt);
// Log the error appropriately based on type
if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() { if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() {
warn!( warn!(
worker_id = self.id, worker_id = self.id,
job_id, "Invalid session detected, forcing session refresh" job_id,
duration_ms = duration.as_millis(),
retry_attempt = next_attempt,
max_retries = max_retries,
remaining_retries = remaining_retries,
"Invalid session detected, will retry"
); );
} else { } else {
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job"); error!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
retry_attempt = next_attempt,
max_retries = max_retries,
remaining_retries = remaining_retries,
error = ?e,
"Failed to process job, will retry"
);
} }
if let Err(e) = self.unlock_job(job_id).await { // Atomically unlock and increment retry count, checking if retry is allowed
error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock job"); match self.unlock_and_increment_retry(job_id, max_retries).await {
Ok(can_retry) if can_retry => {
info!(
worker_id = self.id,
job_id,
retry_attempt = next_attempt,
remaining_retries = remaining_retries,
"Job unlocked for retry"
);
}
Ok(_) => {
// Max retries exceeded (detected atomically)
error!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
retry_count = next_attempt,
max_retries = max_retries,
error = ?e,
"Job failed permanently (max retries exceeded), deleting"
);
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
}
}
Err(e) => {
error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
}
} }
} }
} }