mirror of https://github.com/Xevion/banner.git
synced 2025-12-06 05:14:26 -06:00

Compare commits: b1ed2434f8 ... 3292d35521 (4 commits)

Commits: 3292d35521, 71ac0782d0, 1c6d2d4b6e, 51f8256e61
@@ -59,6 +59,7 @@ RUN cargo build --release
 # Copy source code
 RUN rm src/*.rs
 COPY ./src ./src/
+COPY ./migrations ./migrations/
 
 # Copy built frontend assets
 COPY --from=frontend-builder /app/dist ./web/dist
migrations/20251103093649_add_retry_tracking.sql (new file, 3 lines)
@@ -0,0 +1,3 @@
-- Add retry tracking columns to scrape_jobs table
ALTER TABLE scrape_jobs ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0);
ALTER TABLE scrape_jobs ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 5 CHECK (max_retries >= 0);
migrations/20251103104300_add_performance_indexes.sql (new file, 45 lines)
@@ -0,0 +1,45 @@
-- Performance optimization indexes

-- Index for term-based queries (most common access pattern)
CREATE INDEX IF NOT EXISTS idx_courses_term_code ON courses(term_code);

-- Index for subject-based filtering
CREATE INDEX IF NOT EXISTS idx_courses_subject ON courses(subject);

-- Composite index for subject + term queries
CREATE INDEX IF NOT EXISTS idx_courses_subject_term ON courses(subject, term_code);

-- Index for course number lookups
CREATE INDEX IF NOT EXISTS idx_courses_course_number ON courses(course_number);

-- Index for last scraped timestamp (useful for finding stale data)
CREATE INDEX IF NOT EXISTS idx_courses_last_scraped ON courses(last_scraped_at);

-- Index for course metrics time-series queries
-- BRIN index is optimal for time-series data
CREATE INDEX IF NOT EXISTS idx_course_metrics_timestamp ON course_metrics USING BRIN(timestamp);

-- B-tree index for specific course metric lookups
CREATE INDEX IF NOT EXISTS idx_course_metrics_course_timestamp
    ON course_metrics(course_id, timestamp DESC);

-- Partial index for pending scrape jobs (only unlocked jobs)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_pending
    ON scrape_jobs(execute_at ASC)
    WHERE locked_at IS NULL;

-- Index for high-priority job processing
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_priority_pending
    ON scrape_jobs(priority DESC, execute_at ASC)
    WHERE locked_at IS NULL;

-- Index for retry tracking
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_retry_count
    ON scrape_jobs(retry_count)
    WHERE retry_count > 0 AND locked_at IS NULL;

-- Analyze tables to update statistics
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;
migrations/20251103104400_optimize_indexes.sql (new file, 53 lines)
@@ -0,0 +1,53 @@
-- Index Optimization Follow-up Migration

-- Reason: Redundant with composite index idx_courses_subject_term
DROP INDEX IF EXISTS idx_courses_subject;

-- Remove: idx_scrape_jobs_retry_count
DROP INDEX IF EXISTS idx_scrape_jobs_retry_count;

-- Purpose: Optimize the scheduler's frequent query (runs every 60 seconds)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_scheduler_lookup
    ON scrape_jobs(target_type, target_payload)
    WHERE locked_at IS NULL;

-- Note: We use (target_type, target_payload) instead of including locked_at
-- in the index columns because:
-- 1. The WHERE clause filters locked_at IS NULL (partial index optimization)
-- 2. target_payload is JSONB and already large; keeping it as an indexed column
--    allows PostgreSQL to use index-only scans for the SELECT target_payload query
-- 3. This design minimizes index size while maximizing query performance

-- Purpose: Enable efficient audit trail queries by course
CREATE INDEX IF NOT EXISTS idx_course_audits_course_timestamp
    ON course_audits(course_id, timestamp DESC);

-- Purpose: Enable queries like "Show all changes in the last 24 hours"
CREATE INDEX IF NOT EXISTS idx_course_audits_timestamp
    ON course_audits(timestamp DESC);

-- The BRIN index on course_metrics(timestamp) assumes data is inserted in
-- chronological order. BRIN indexes are only effective when data is physically
-- ordered on disk. If you perform:
--   - Backfills of historical data
--   - Out-of-order inserts
--   - Frequent UPDATEs that move rows
--
-- Then the BRIN index effectiveness will degrade. Monitor with:
--   SELECT * FROM brin_page_items(get_raw_page('idx_course_metrics_timestamp', 1));
--
-- If you see poor selectivity, consider:
--   1. REINDEX to rebuild after bulk loads
--   2. Switch to B-tree if inserts are not time-ordered
--   3. Use CLUSTER to physically reorder the table (requires downtime)

COMMENT ON INDEX idx_course_metrics_timestamp IS
    'BRIN index - requires chronologically ordered inserts for efficiency. Monitor selectivity.';

-- Update statistics for query planner
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;
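Not part of the migrations above — a minimal sketch of how index usage could be checked from the application side before deciding which indexes to keep, assuming the same sqlx PgPool used elsewhere in this codebase. pg_stat_user_indexes is a standard PostgreSQL statistics view; the index names it reports are the ones created above.

use sqlx::PgPool;

/// Sketch: list indexes ordered by how often they have been scanned.
/// Rarely-scanned indexes are candidates for the kind of cleanup done in this migration.
async fn report_index_usage(pool: &PgPool) -> Result<(), sqlx::Error> {
    let rows: Vec<(String, i64)> = sqlx::query_as(
        "SELECT indexrelname::text, idx_scan FROM pg_stat_user_indexes ORDER BY idx_scan ASC",
    )
    .fetch_all(pool)
    .await?;
    for (index_name, scans) in rows {
        println!("{index_name}: {scans} scans");
    }
    Ok(())
}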
@@ -62,6 +62,14 @@ impl App {
             "database pool established"
         );
 
+        // Run database migrations
+        info!("Running database migrations...");
+        sqlx::migrate!("./migrations")
+            .run(&db_pool)
+            .await
+            .expect("Failed to run database migrations");
+        info!("Database migrations completed successfully");
+
         // Create BannerApi and AppState
         let banner_api = BannerApi::new_with_config(
             config.banner_base_url.clone(),
@@ -152,6 +152,13 @@ impl BannerApi {
     }
 
     /// Performs a course search and handles common response processing.
+    #[tracing::instrument(
+        skip(self, query),
+        fields(
+            term = %term,
+            subject = %query.get_subject().unwrap_or(&"all".to_string())
+        )
+    )]
     async fn perform_search(
         &self,
         term: &str,
@@ -318,12 +325,6 @@ impl BannerApi {
         sort: &str,
         sort_descending: bool,
     ) -> Result<SearchResult, BannerApiError> {
-        debug!(
-            term = term,
-            subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"),
-            max_results = query.get_max_results(),
-            "Starting course search"
-        );
         self.perform_search(term, query, sort, sort_descending)
             .await
     }
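For readers unfamiliar with #[tracing::instrument]: a standalone sketch (not from this codebase) of the same pattern the two hunks above apply — values declared in fields(...) become span attributes, so the manual debug! call carrying the same fields can be removed.

#[tracing::instrument(skip(body), fields(term = %term))]
async fn fetch_term(term: &str, body: &str) -> anyhow::Result<usize> {
    // Any event emitted here inherits the `term` field from the enclosing span.
    tracing::debug!(len = body.len(), "starting fetch");
    Ok(body.len())
}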
@@ -1,10 +1,14 @@
 //! JSON parsing utilities for the Banner API client.
 
 use anyhow::Result;
-use serde_json;
+use serde_json::{self, Value};
 
 /// Attempt to parse JSON and, on failure, include a contextual snippet of the
-/// line where the error occurred. This prevents dumping huge JSON bodies to logs.
+/// line where the error occurred.
+///
+/// In debug builds, this provides detailed context including the full JSON object
+/// containing the error and type mismatch information. In release builds, it shows
+/// a minimal snippet to prevent dumping huge JSON bodies to production logs.
 pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
     let jd = &mut serde_json::Deserializer::from_str(body);
     match serde_path_to_error::deserialize(jd) {
@@ -12,27 +16,244 @@ pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Re
         Err(err) => {
             let inner_err = err.inner();
             let (line, column) = (inner_err.line(), inner_err.column());
-            let snippet = build_error_snippet(body, line, column, 20);
             let path = err.path().to_string();
 
             let msg = inner_err.to_string();
             let loc = format!(" at line {line} column {column}");
             let msg_without_loc = msg.strip_suffix(&loc).unwrap_or(&msg).to_string();
 
-            let mut final_err = String::new();
-            if !path.is_empty() && path != "." {
-                final_err.push_str(&format!("for path '{}' ", path));
-            }
-            final_err.push_str(&format!(
-                "({msg_without_loc}) at line {line} column {column}"
-            ));
-            final_err.push_str(&format!("\n{snippet}"));
+            // Build error message differently for debug vs release builds
+            let final_err = if cfg!(debug_assertions) {
+                // Debug mode: provide detailed context
+                let type_info = parse_type_mismatch(&msg_without_loc);
+                let context = extract_json_object_at_path(body, err.path(), line, column);
+
+                let mut err_msg = String::new();
+                if !path.is_empty() && path != "." {
+                    err_msg.push_str(&format!("for path '{}'\n", path));
+                }
+                err_msg.push_str(&format!("({}) at line {} column {}\n\n", type_info, line, column));
+                err_msg.push_str(&context);
+
+                err_msg
+            } else {
+                // Release mode: minimal snippet to keep logs concise
+                let snippet = build_error_snippet(body, line, column, 20);
+
+                let mut err_msg = String::new();
+                if !path.is_empty() && path != "." {
+                    err_msg.push_str(&format!("for path '{}' ", path));
+                }
+                err_msg.push_str(&format!(
+                    "({}) at line {} column {}",
+                    msg_without_loc, line, column
+                ));
+                err_msg.push_str(&format!("\n{}", snippet));
+
+                err_msg
+            };
 
             Err(anyhow::anyhow!(final_err))
         }
     }
 }
+
+/// Extract type mismatch information from a serde error message.
+///
+/// Parses error messages like "invalid type: null, expected a string" to extract
+/// the expected and actual types for clearer error reporting.
+///
+/// Returns a formatted string like "(expected a string, got null)" or the original
+/// message if parsing fails.
+fn parse_type_mismatch(error_msg: &str) -> String {
+    // Try to parse "invalid type: X, expected Y" format
+    if let Some(invalid_start) = error_msg.find("invalid type: ") {
+        let after_prefix = &error_msg[invalid_start + "invalid type: ".len()..];
+
+        if let Some(comma_pos) = after_prefix.find(", expected ") {
+            let actual_type = &after_prefix[..comma_pos];
+            let expected_part = &after_prefix[comma_pos + ", expected ".len()..];
+
+            // Clean up expected part (remove " at line X column Y" if present)
+            let expected_type = expected_part
+                .split(" at line ")
+                .next()
+                .unwrap_or(expected_part)
+                .trim();
+
+            return format!("expected {}, got {}", expected_type, actual_type);
+        }
+    }
+
+    // Try to parse "expected X at line Y" format
+    if error_msg.starts_with("expected ") {
+        if let Some(expected_part) = error_msg.split(" at line ").next() {
+            return expected_part.to_string();
+        }
+    }
+
+    // Fallback: return original message without location info
+    error_msg.to_string()
+}
+
+/// Extract and pretty-print the JSON object/array containing the parse error.
+///
+/// This function navigates to the error location using the serde path and extracts
+/// the parent object or array to provide better context for debugging.
+///
+/// # Arguments
+/// * `body` - The raw JSON string
+/// * `path` - The serde path to the error (e.g., "data[0].faculty[0].displayName")
+/// * `line` - Line number of the error (for fallback)
+/// * `column` - Column number of the error (for fallback)
+///
+/// # Returns
+/// A formatted string containing the JSON object with the error, or a fallback snippet
+fn extract_json_object_at_path(
+    body: &str,
+    path: &serde_path_to_error::Path,
+    line: usize,
+    column: usize,
+) -> String {
+    // Try to parse the entire JSON structure
+    let root_value: Value = match serde_json::from_str(body) {
+        Ok(v) => v,
+        Err(_) => {
+            // If we can't parse the JSON at all, fall back to line snippet
+            return build_error_snippet(body, line, column, 20);
+        }
+    };
+
+    // Navigate to the error location using the path
+    let path_str = path.to_string();
+    let segments = parse_path_segments(&path_str);
+
+    let (context_value, context_name) = navigate_to_context(&root_value, &segments);
+
+    // Pretty-print the context value with limited depth to avoid huge output
+    match serde_json::to_string_pretty(&context_value) {
+        Ok(pretty) => {
+            // Limit output to ~50 lines to prevent log spam
+            let lines: Vec<&str> = pretty.lines().collect();
+            let truncated = if lines.len() > 50 {
+                let mut result = lines[..47].join("\n");
+                result.push_str("\n    ... (truncated, ");
+                result.push_str(&(lines.len() - 47).to_string());
+                result.push_str(" more lines)");
+                result
+            } else {
+                pretty
+            };
+
+            format!("{} at '{}':\n{}", context_name, path_str, truncated)
+        }
+        Err(_) => {
+            // Fallback to simple snippet if pretty-print fails
+            build_error_snippet(body, line, column, 20)
+        }
+    }
+}
+
+/// Parse a JSON path string into segments for navigation.
+///
+/// Converts paths like "data[0].faculty[1].displayName" into a sequence of
+/// object keys and array indices.
+fn parse_path_segments(path: &str) -> Vec<PathSegment> {
+    let mut segments = Vec::new();
+    let mut current = String::new();
+    let mut in_bracket = false;
+
+    for ch in path.chars() {
+        match ch {
+            '.' if !in_bracket => {
+                if !current.is_empty() {
+                    segments.push(PathSegment::Key(current.clone()));
+                    current.clear();
+                }
+            }
+            '[' => {
+                if !current.is_empty() {
+                    segments.push(PathSegment::Key(current.clone()));
+                    current.clear();
+                }
+                in_bracket = true;
+            }
+            ']' => {
+                if in_bracket && !current.is_empty() {
+                    if let Ok(index) = current.parse::<usize>() {
+                        segments.push(PathSegment::Index(index));
+                    }
+                    current.clear();
+                }
+                in_bracket = false;
+            }
+            _ => current.push(ch),
+        }
+    }
+
+    if !current.is_empty() {
+        segments.push(PathSegment::Key(current));
+    }
+
+    segments
+}
+
+/// Represents a segment in a JSON path (either an object key or array index).
+#[derive(Debug)]
+enum PathSegment {
+    Key(String),
+    Index(usize),
+}
+
+/// Navigate through a JSON value using path segments and return the appropriate context.
+///
+/// This function walks the JSON structure and returns the parent object/array that
+/// contains the error, providing meaningful context for debugging.
+///
+/// # Returns
+/// A tuple of (context_value, description) where context_value is the JSON to display
+/// and description is a human-readable name for what we're showing.
+fn navigate_to_context<'a>(
+    mut current: &'a Value,
+    segments: &[PathSegment],
+) -> (&'a Value, &'static str) {
+    // If path is empty or just root, return the whole value
+    if segments.is_empty() {
+        return (current, "Root object");
+    }
+
+    // Try to navigate to the parent of the error location
+    // We want to show the containing object/array, not just the failing field
+    let parent_depth = segments.len().saturating_sub(1);
+
+    for (i, segment) in segments.iter().enumerate() {
+        // Stop one level before the end to show the parent context
+        if i >= parent_depth {
+            break;
+        }
+
+        match segment {
+            PathSegment::Key(key) => {
+                if let Some(next) = current.get(key) {
+                    current = next;
+                } else {
+                    // Can't navigate further, return what we have
+                    return (current, "Partial context (navigation stopped)");
+                }
+            }
+            PathSegment::Index(idx) => {
+                if let Some(next) = current.get(idx) {
+                    current = next;
+                } else {
+                    return (current, "Partial context (index out of bounds)");
+                }
+            }
+        }
+    }
+
+    (current, "Object containing error")
+}
 
 fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String {
     let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
     if target_line.is_empty() {
@@ -53,3 +274,139 @@ fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usiz
 
     format!("...{slice}...\n {indicator}")
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde::Deserialize;
+
+    #[test]
+    fn test_parse_type_mismatch_invalid_type() {
+        let msg = "invalid type: null, expected a string at line 45 column 29";
+        let result = parse_type_mismatch(msg);
+        assert_eq!(result, "expected a string, got null");
+    }
+
+    #[test]
+    fn test_parse_type_mismatch_expected() {
+        let msg = "expected value at line 1 column 1";
+        let result = parse_type_mismatch(msg);
+        assert_eq!(result, "expected value");
+    }
+
+    #[test]
+    fn test_parse_path_segments_simple() {
+        let segments = parse_path_segments("data.name");
+        assert_eq!(segments.len(), 2);
+        match &segments[0] {
+            PathSegment::Key(k) => assert_eq!(k, "data"),
+            _ => panic!("Expected Key segment"),
+        }
+    }
+
+    #[test]
+    fn test_parse_path_segments_with_array() {
+        let segments = parse_path_segments("data[0].faculty[1].displayName");
+        assert_eq!(segments.len(), 5);
+        match &segments[0] {
+            PathSegment::Key(k) => assert_eq!(k, "data"),
+            _ => panic!("Expected Key segment"),
+        }
+        match &segments[1] {
+            PathSegment::Index(i) => assert_eq!(*i, 0),
+            _ => panic!("Expected Index segment"),
+        }
+    }
+
+    #[test]
+    fn test_parse_json_with_context_null_value() {
+        #[derive(Debug, Deserialize)]
+        struct TestStruct {
+            name: String,
+        }
+
+        let json = r#"{"name": null}"#;
+        let result: Result<TestStruct> = parse_json_with_context(json);
+
+        assert!(result.is_err());
+        let err_msg = result.unwrap_err().to_string();
+
+        // Should contain path info
+        assert!(err_msg.contains("name"));
+
+        // In debug mode, should contain detailed context
+        if cfg!(debug_assertions) {
+            assert!(err_msg.contains("expected"));
+        }
+    }
+
+    #[test]
+    fn test_navigate_to_context() {
+        let json = r#"{"data": [{"faculty": [{"name": "John"}]}]}"#;
+        let value: Value = serde_json::from_str(json).unwrap();
+
+        let segments = parse_path_segments("data[0].faculty[0].name");
+        let (context, _) = navigate_to_context(&value, &segments);
+
+        // Should return the faculty[0] object (parent of 'name')
+        assert!(context.is_object());
+        assert!(context.get("name").is_some());
+    }
+
+    #[test]
+    fn test_realistic_banner_error() {
+        #[derive(Debug, Deserialize)]
+        struct Course {
+            #[allow(dead_code)]
+            #[serde(rename = "courseTitle")]
+            course_title: String,
+            faculty: Vec<Faculty>,
+        }
+
+        #[derive(Debug, Deserialize)]
+        struct Faculty {
+            #[serde(rename = "displayName")]
+            display_name: String,
+            #[allow(dead_code)]
+            email: String,
+        }
+
+        #[derive(Debug, Deserialize)]
+        struct SearchResult {
+            data: Vec<Course>,
+        }
+
+        // Simulate Banner API response with null faculty displayName
+        // This mimics the actual error from SPN subject scrape
+        let json = r#"{
+            "data": [
+                {
+                    "courseTitle": "Spanish Conversation",
+                    "faculty": [
+                        {
+                            "displayName": null,
+                            "email": "instructor@utsa.edu"
+                        }
+                    ]
+                }
+            ]
+        }"#;
+
+        let result: Result<SearchResult> = parse_json_with_context(json);
+        assert!(result.is_err());
+
+        let err_msg = result.unwrap_err().to_string();
+        println!("\n=== Error output in debug mode ===\n{}\n", err_msg);
+
+        // Verify error contains key information
+        assert!(err_msg.contains("data[0].faculty[0].displayName"));
+
+        // In debug mode, should show detailed context
+        if cfg!(debug_assertions) {
+            // Should show type mismatch info
+            assert!(err_msg.contains("expected") && err_msg.contains("got"));
+            // Should show surrounding JSON context with the faculty object
+            assert!(err_msg.contains("email"));
+        }
+    }
+}
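A hypothetical caller sketch (the struct and its field are stand-ins, not Banner types) showing what the helper added above produces on a type mismatch:

#[derive(Debug, serde::Deserialize)]
struct Probe {
    // Hypothetical field; a JSON null here triggers the contextual error path.
    label: String,
}

fn demo() {
    let body = r#"{"label": null}"#;
    let err = parse_json_with_context::<Probe>(body).unwrap_err();
    // The message names the failing path ("label") plus either the pretty-printed
    // parent object (debug builds) or a short line snippet (release builds).
    eprintln!("{err}");
}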
@@ -3,10 +3,13 @@
 use http::Extensions;
 use reqwest::{Request, Response};
 use reqwest_middleware::{Middleware, Next};
-use tracing::{trace, warn};
+use tracing::{debug, trace, warn};
 
 pub struct TransparentMiddleware;
 
+/// Threshold for logging slow requests at DEBUG level (in milliseconds)
+const SLOW_REQUEST_THRESHOLD_MS: u128 = 1000;
+
 #[async_trait::async_trait]
 impl Middleware for TransparentMiddleware {
     async fn handle(
@@ -15,33 +18,56 @@ impl Middleware for TransparentMiddleware {
         extensions: &mut Extensions,
         next: Next<'_>,
     ) -> std::result::Result<Response, reqwest_middleware::Error> {
-        trace!(
-            domain = req.url().domain(),
-            headers = ?req.headers(),
-            "{method} {path}",
-            method = req.method().to_string(),
-            path = req.url().path(),
-        );
+        let method = req.method().to_string();
+        let path = req.url().path().to_string();
+
+        let start = std::time::Instant::now();
         let response_result = next.run(req, extensions).await;
+        let duration = start.elapsed();
+
         match response_result {
             Ok(response) => {
                 if response.status().is_success() {
-                    trace!(
-                        "{code} {reason} {path}",
-                        code = response.status().as_u16(),
-                        reason = response.status().canonical_reason().unwrap_or("??"),
-                        path = response.url().path(),
-                    );
+                    let duration_ms = duration.as_millis();
+                    if duration_ms >= SLOW_REQUEST_THRESHOLD_MS {
+                        debug!(
+                            method = method,
+                            path = path,
+                            status = response.status().as_u16(),
+                            duration_ms = duration_ms,
+                            "Request completed (slow)"
+                        );
+                    } else {
+                        trace!(
+                            method = method,
+                            path = path,
+                            status = response.status().as_u16(),
+                            duration_ms = duration_ms,
+                            "Request completed"
+                        );
+                    }
                     Ok(response)
                 } else {
                     let e = response.error_for_status_ref().unwrap_err();
-                    warn!(error = ?e, "Request failed (server)");
+                    warn!(
+                        method = method,
+                        path = path,
+                        error = ?e,
+                        status = response.status().as_u16(),
+                        duration_ms = duration.as_millis(),
+                        "Request failed"
+                    );
                     Ok(response)
                 }
             }
             Err(error) => {
-                warn!(error = ?error, "Request failed (middleware)");
+                warn!(
+                    method = method,
+                    path = path,
+                    error = ?error,
+                    duration_ms = duration.as_millis(),
+                    "Request failed"
+                );
                 Err(error)
             }
         }
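A sketch of how such a middleware is attached with reqwest_middleware (the actual client construction elsewhere in this codebase may differ):

use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};

fn build_client() -> ClientWithMiddleware {
    // Middlewares run in the order they are added.
    ClientBuilder::new(reqwest::Client::new())
        .with(TransparentMiddleware)
        .build()
}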
@@ -4,7 +4,7 @@ use crate::banner::rate_limiter::{RequestType, SharedRateLimiter};
 use http::Extensions;
 use reqwest::{Request, Response};
 use reqwest_middleware::{Middleware, Next};
-use tracing::{debug, trace, warn};
+use tracing::debug;
 use url::Url;
 
 /// Middleware that enforces rate limiting based on request URL patterns
@@ -18,6 +18,16 @@ impl RateLimitMiddleware {
         Self { rate_limiter }
     }
 
+    /// Returns a human-readable description of the rate limit for a request type
+    fn get_rate_limit_description(request_type: RequestType) -> &'static str {
+        match request_type {
+            RequestType::Session => "6 rpm (~10s interval)",
+            RequestType::Search => "30 rpm (~2s interval)",
+            RequestType::Metadata => "20 rpm (~3s interval)",
+            RequestType::Reset => "10 rpm (~6s interval)",
+        }
+    }
+
     /// Determines the request type based on the URL path
     fn get_request_type(url: &Url) -> RequestType {
         let path = url.path();
@@ -53,49 +63,22 @@ impl Middleware for RateLimitMiddleware {
     ) -> std::result::Result<Response, reqwest_middleware::Error> {
         let request_type = Self::get_request_type(req.url());
 
-        trace!(
-            url = %req.url(),
-            request_type = ?request_type,
-            "Rate limiting request"
-        );
-
-        // Wait for permission to make the request
+        let start = std::time::Instant::now();
+
         self.rate_limiter.wait_for_permission(request_type).await;
+        let wait_duration = start.elapsed();
 
-        trace!(
-            url = %req.url(),
-            request_type = ?request_type,
-            "Rate limit permission granted, making request"
-        );
+        // Only log if rate limiting caused significant delay (>= 500ms)
+        if wait_duration.as_millis() >= 500 {
+            let limit_desc = Self::get_rate_limit_description(request_type);
+            debug!(
+                request_type = ?request_type,
+                wait_ms = wait_duration.as_millis(),
+                rate_limit = limit_desc,
+                "Rate limit caused delay"
+            );
+        }
 
         // Make the actual request
-        let response_result = next.run(req, extensions).await;
-
-        match response_result {
-            Ok(response) => {
-                if response.status().is_success() {
-                    trace!(
-                        url = %response.url(),
-                        status = response.status().as_u16(),
-                        "Request completed successfully"
-                    );
-                } else {
-                    warn!(
-                        url = %response.url(),
-                        status = response.status().as_u16(),
-                        "Request completed with error status"
-                    );
-                }
-                Ok(response)
-            }
-            Err(error) => {
-                warn!(
-                    url = ?error.url(),
-                    error = ?error,
-                    "Request failed"
-                );
-                Err(error)
-            }
-        }
+        next.run(req, extensions).await
     }
 }
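The descriptions above are only labels; assuming the underlying limiters are built with the governor crate (already used by this module's rate limiter), a "30 rpm (~2s interval)" limiter would look roughly like this sketch:

use governor::{Quota, RateLimiter};
use std::num::NonZeroU32;

fn search_limiter() -> governor::DefaultDirectRateLimiter {
    // 30 permits per minute; callers await until_ready() before sending a request.
    RateLimiter::direct(Quota::per_minute(NonZeroU32::new(30).unwrap()))
}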
@@ -8,7 +8,6 @@ use governor::{
 use std::num::NonZeroU32;
 use std::sync::Arc;
 use std::time::Duration;
-use tracing::{debug, trace, warn};
 
 /// Different types of Banner API requests with different rate limits
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -99,12 +98,8 @@ impl BannerRateLimiter {
             RequestType::Reset => &self.reset_limiter,
         };
 
-        trace!(request_type = ?request_type, "Waiting for rate limit permission");
-
-        // Wait until we can make the request
+        // Wait until we can make the request (logging handled by middleware)
         limiter.until_ready().await;
-
-        trace!(request_type = ?request_type, "Rate limit permission granted");
     }
 }
@@ -82,7 +82,6 @@ impl BannerSession {
 
     /// Updates the last activity timestamp
     pub fn touch(&mut self) {
-        trace!(id = self.unique_session_id, "Session was used");
         self.last_activity = Some(Instant::now());
     }
 
@@ -162,7 +161,7 @@ impl TermPool {
     async fn release(&self, session: BannerSession) {
         let id = session.unique_session_id.clone();
         if session.is_expired() {
-            trace!(id = id, "Session is now expired, dropping.");
+            debug!(id = id, "Session expired, dropping");
             // Wake up a waiter, as it might need to create a new session
             // if this was the last one.
             self.notifier.notify_one();
@@ -171,10 +170,8 @@ impl TermPool {
 
         let mut queue = self.sessions.lock().await;
         queue.push_back(session);
-        let queue_size = queue.len();
         drop(queue); // Release lock before notifying
 
-        trace!(id = id, queue_size, "Session returned to pool");
         self.notifier.notify_one();
     }
 }
@@ -204,22 +201,21 @@ impl SessionPool {
             .or_insert_with(|| Arc::new(TermPool::new()))
             .clone();
 
+        let start = Instant::now();
+        let mut waited_for_creation = false;
+
         loop {
             // Fast path: Try to get an existing, non-expired session.
             {
                 let mut queue = term_pool.sessions.lock().await;
                 if let Some(session) = queue.pop_front() {
                     if !session.is_expired() {
-                        trace!(id = session.unique_session_id, "Reusing session from pool");
                         return Ok(PooledSession {
                             session: Some(session),
                             pool: Arc::clone(&term_pool),
                         });
                     } else {
-                        trace!(
-                            id = session.unique_session_id,
-                            "Popped an expired session, discarding."
-                        );
+                        debug!(id = session.unique_session_id, "Discarded expired session");
                     }
                 }
             } // MutexGuard is dropped, lock is released.
@@ -229,7 +225,10 @@ impl SessionPool {
             if *is_creating_guard {
                 // Another task is already creating a session. Release the lock and wait.
                 drop(is_creating_guard);
-                trace!("Another task is creating a session, waiting for notification...");
+                if !waited_for_creation {
+                    trace!("Waiting for another task to create session");
+                    waited_for_creation = true;
+                }
                 term_pool.notifier.notified().await;
                 // Loop back to the top to try the fast path again.
                 continue;
@@ -240,12 +239,11 @@ impl SessionPool {
             drop(is_creating_guard);
 
             // Race: wait for a session to be returned OR for the rate limiter to allow a new one.
-            trace!("Pool empty, racing notifier vs rate limiter...");
+            trace!("Pool empty, creating new session");
             tokio::select! {
                 _ = term_pool.notifier.notified() => {
                     // A session was returned while we were waiting!
                     // We are no longer the creator. Reset the flag and loop to race for the new session.
-                    trace!("Notified that a session was returned. Looping to retry.");
                     let mut guard = term_pool.is_creating.lock().await;
                     *guard = false;
                     drop(guard);
@@ -253,7 +251,6 @@ impl SessionPool {
                 }
                 _ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
                     // The rate limit has elapsed. It's our job to create the session.
-                    trace!("Rate limiter ready. Proceeding to create a new session.");
                     let new_session_result = self.create_session(&term).await;
 
                     // After creation, we are no longer the creator. Reset the flag
|
 
                     match new_session_result {
                         Ok(new_session) => {
-                            debug!(id = new_session.unique_session_id, "Successfully created new session");
+                            let elapsed = start.elapsed();
+                            debug!(
+                                id = new_session.unique_session_id,
+                                elapsed_ms = elapsed.as_millis(),
+                                "Created new session"
+                            );
                             return Ok(PooledSession {
                                 session: Some(new_session),
                                 pool: term_pool,
|
             .get_all("Set-Cookie")
             .iter()
             .filter_map(|header_value| {
-                if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) {
-                    Some((cookie.name().to_string(), cookie.value().to_string()))
+                if let Ok(cookie_str) = header_value.to_str() {
+                    if let Ok(cookie) = Cookie::parse(cookie_str) {
+                        Some((cookie.name().to_string(), cookie.value().to_string()))
+                    } else {
+                        None
+                    }
                 } else {
                     None
                 }
|
             return Err(anyhow::anyhow!("Failed to get cookies"));
         }
 
-        let jsessionid = cookies.get("JSESSIONID").unwrap();
-        let ssb_cookie = cookies.get("SSB_COOKIE").unwrap();
+        let jsessionid = cookies.get("JSESSIONID")
+            .ok_or_else(|| anyhow::anyhow!("JSESSIONID cookie missing after validation"))?;
+        let ssb_cookie = cookies.get("SSB_COOKIE")
+            .ok_or_else(|| anyhow::anyhow!("SSB_COOKIE cookie missing after validation"))?;
         let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);
 
-        trace!(
-            jsessionid = jsessionid,
-            ssb_cookie = ssb_cookie,
-            "New session cookies acquired"
-        );
-
         self.http
             .get(format!("{}/selfServiceMenu/data", self.base_url))
             .header("Cookie", &cookie_header)
@@ -435,8 +437,15 @@ impl SessionPool {
 
         let redirect: RedirectResponse = response.json().await?;
 
-        let base_url_path = self.base_url.parse::<Url>().unwrap().path().to_string();
-        let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap();
+        let base_url_path = self.base_url.parse::<Url>()
+            .context("Failed to parse base URL")?
+            .path()
+            .to_string();
+        let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path)
+            .ok_or_else(|| anyhow::anyhow!(
+                "Redirect URL '{}' does not start with expected prefix '{}'",
+                redirect.fwd_url, base_url_path
+            ))?;
 
         // Follow the redirect
         let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect);
@@ -454,7 +463,6 @@ impl SessionPool {
             ));
         }
 
-        trace!(term = term, "successfully selected term");
         Ok(())
     }
 }
src/data/batch.rs (new file, 138 lines)
@@ -0,0 +1,138 @@
//! Batch database operations for improved performance.

use crate::banner::Course;
use crate::error::Result;
use sqlx::PgPool;
use std::time::Instant;
use tracing::info;

/// Batch upsert courses in a single database query.
///
/// This function performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses
/// in a single round-trip to the database, significantly reducing overhead compared
/// to individual inserts.
///
/// # Performance
/// - Reduces N database round-trips to 1
/// - Typical usage: 50-200 courses per batch
/// - PostgreSQL parameter limit: 65,535 (we use ~10 per course)
///
/// # Arguments
/// * `courses` - Slice of Course structs from the Banner API
/// * `db_pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(())` on success
/// * `Err(_)` if the database operation fails
///
/// # Example
/// ```no_run
/// use banner::data::batch::batch_upsert_courses;
/// use banner::banner::Course;
/// use sqlx::PgPool;
///
/// async fn example(courses: &[Course], pool: &PgPool) -> anyhow::Result<()> {
///     batch_upsert_courses(courses, pool).await?;
///     Ok(())
/// }
/// ```
pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
    // Early return for empty batches
    if courses.is_empty() {
        info!("No courses to upsert, skipping batch operation");
        return Ok(());
    }

    let start = Instant::now();
    let course_count = courses.len();

    // Extract course fields into vectors for UNNEST
    let crns: Vec<&str> = courses
        .iter()
        .map(|c| c.course_reference_number.as_str())
        .collect();
    let subjects: Vec<&str> = courses.iter().map(|c| c.subject.as_str()).collect();
    let course_numbers: Vec<&str> = courses
        .iter()
        .map(|c| c.course_number.as_str())
        .collect();
    let titles: Vec<&str> = courses.iter().map(|c| c.course_title.as_str()).collect();
    let term_codes: Vec<&str> = courses.iter().map(|c| c.term.as_str()).collect();
    let enrollments: Vec<i32> = courses.iter().map(|c| c.enrollment).collect();
    let max_enrollments: Vec<i32> = courses.iter().map(|c| c.maximum_enrollment).collect();
    let wait_counts: Vec<i32> = courses.iter().map(|c| c.wait_count).collect();
    let wait_capacities: Vec<i32> = courses.iter().map(|c| c.wait_capacity).collect();

    // Perform batch upsert using UNNEST for efficient bulk insertion
    let result = sqlx::query(
        r#"
        INSERT INTO courses (
            crn, subject, course_number, title, term_code,
            enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
        )
        SELECT * FROM UNNEST(
            $1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
            $6::int4[], $7::int4[], $8::int4[], $9::int4[],
            array_fill(NOW()::timestamptz, ARRAY[$10])
        ) AS t(
            crn, subject, course_number, title, term_code,
            enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
        )
        ON CONFLICT (crn, term_code)
        DO UPDATE SET
            subject = EXCLUDED.subject,
            course_number = EXCLUDED.course_number,
            title = EXCLUDED.title,
            enrollment = EXCLUDED.enrollment,
            max_enrollment = EXCLUDED.max_enrollment,
            wait_count = EXCLUDED.wait_count,
            wait_capacity = EXCLUDED.wait_capacity,
            last_scraped_at = EXCLUDED.last_scraped_at
        "#,
    )
    .bind(&crns)
    .bind(&subjects)
    .bind(&course_numbers)
    .bind(&titles)
    .bind(&term_codes)
    .bind(&enrollments)
    .bind(&max_enrollments)
    .bind(&wait_counts)
    .bind(&wait_capacities)
    .bind(course_count as i32)
    .execute(db_pool)
    .await
    .map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?;

    let duration = start.elapsed();

    info!(
        courses_count = course_count,
        rows_affected = result.rows_affected(),
        duration_ms = duration.as_millis(),
        "Batch upserted courses"
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_empty_batch_returns_ok() {
        // This is a basic compile-time test
        // Runtime tests would require sqlx::test macro and a test database
        let courses: Vec<Course> = vec![];
        assert_eq!(courses.len(), 0);
    }
}
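Not part of the diff — a sketch of a guard against the parameter limit mentioned in the doc comment above: at roughly 10 bind parameters per course, PostgreSQL's 65,535-parameter ceiling allows about 6,500 courses per statement, so an unusually large scrape could be chunked before calling the helper. The wrapper name is hypothetical.

use crate::banner::Course;
use crate::data::batch::batch_upsert_courses;
use crate::error::Result;
use sqlx::PgPool;

/// Hypothetical wrapper: split very large batches so each statement stays well
/// under the parameter limit (~65,535 total / ~10 params per course).
pub async fn batch_upsert_courses_chunked(courses: &[Course], db_pool: &PgPool) -> Result<()> {
    for chunk in courses.chunks(5_000) {
        batch_upsert_courses(chunk, db_pool).await?;
    }
    Ok(())
}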
@@ -1,3 +1,4 @@
 //! Database models and schema.
 
+pub mod batch;
 pub mod models;
@@ -72,4 +72,8 @@ pub struct ScrapeJob {
     pub execute_at: DateTime<Utc>,
     pub created_at: DateTime<Utc>,
     pub locked_at: Option<DateTime<Utc>>,
+    /// Number of retry attempts for this job (non-negative, enforced by CHECK constraint)
+    pub retry_count: i32,
+    /// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint)
+    pub max_retries: i32,
 }
@@ -7,10 +7,13 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber};
 /// Configure and initialize logging for the application
 pub fn setup_logging(config: &Config, tracing_format: TracingFormat) {
     // Configure logging based on config
+    // Note: Even when base_level is trace or debug, we suppress trace logs from noisy
+    // infrastructure modules to keep output readable. These modules use debug for important
+    // events and trace only for very detailed debugging.
     let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
         let base_level = &config.log_level;
         EnvFilter::new(format!(
-            "warn,banner={},banner::rate_limiter=warn,banner::session=warn,banner::rate_limit_middleware=warn",
+            "warn,banner={},banner::rate_limiter=warn,banner::session=debug,banner::rate_limit_middleware=warn,banner::middleware=debug",
             base_level
         ))
     });
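Because the filter is built with EnvFilter::try_from_default_env(), these defaults can still be overridden at runtime through RUST_LOG; a sketch of an equivalent directive built in code (the module names are the ones listed in the default filter above):

use tracing_subscriber::EnvFilter;

fn verbose_filter() -> EnvFilter {
    // Equivalent to exporting RUST_LOG="warn,banner=trace,banner::middleware=trace";
    // when that variable is set, try_from_default_env() uses it and skips the default.
    EnvFilter::new("warn,banner=trace,banner::middleware=trace")
}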
@@ -1,10 +1,11 @@
|
|||||||
use super::Job;
|
use super::Job;
|
||||||
use crate::banner::{BannerApi, Course, SearchQuery, Term};
|
use crate::banner::{BannerApi, SearchQuery, Term};
|
||||||
|
use crate::data::batch::batch_upsert_courses;
|
||||||
use crate::data::models::TargetType;
|
use crate::data::models::TargetType;
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use tracing::{debug, info, trace};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
/// Job implementation for scraping subject data
|
/// Job implementation for scraping subject data
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
@@ -24,9 +25,9 @@ impl Job for SubjectJob {
         TargetType::Subject
     }
 
+    #[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))]
     async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
         let subject_code = &self.subject;
-        debug!(subject = subject_code, "Processing subject job");
 
         // Get the current term
         let term = Term::get_current().inner().to_string();
@@ -42,9 +43,7 @@ impl Job for SubjectJob {
                 count = courses_from_api.len(),
                 "Found courses"
             );
-            for course in courses_from_api {
-                self.upsert_course(&course, db_pool).await?;
-            }
+            batch_upsert_courses(&courses_from_api, db_pool).await?;
         }
 
         debug!(subject = subject_code, "Subject job completed");
@@ -55,39 +54,3 @@ impl Job for SubjectJob {
         format!("Scrape subject: {}", self.subject)
     }
 }
-
-impl SubjectJob {
-    async fn upsert_course(&self, course: &Course, db_pool: &PgPool) -> Result<()> {
-        sqlx::query(
-            r#"
-            INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
-            ON CONFLICT (crn, term_code) DO UPDATE SET
-                subject = EXCLUDED.subject,
-                course_number = EXCLUDED.course_number,
-                title = EXCLUDED.title,
-                enrollment = EXCLUDED.enrollment,
-                max_enrollment = EXCLUDED.max_enrollment,
-                wait_count = EXCLUDED.wait_count,
-                wait_capacity = EXCLUDED.wait_capacity,
-                last_scraped_at = EXCLUDED.last_scraped_at
-            "#,
-        )
-        .bind(&course.course_reference_number)
-        .bind(&course.subject)
-        .bind(&course.course_number)
-        .bind(&course.course_title)
-        .bind(&course.term)
-        .bind(course.enrollment)
-        .bind(course.maximum_enrollment)
-        .bind(course.wait_count)
-        .bind(course.wait_capacity)
-        .bind(chrono::Utc::now())
-        .execute(db_pool)
-        .await
-        .map(|result| {
-            trace!(subject = course.subject, crn = course.course_reference_number, result = ?result, "Course upserted");
-        })
-        .map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
-    }
-}
@@ -9,7 +9,7 @@ use std::time::Duration;
 use tokio::sync::broadcast;
 use tokio::time;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info, warn};
 
 /// Periodically analyzes data and enqueues prioritized scrape jobs.
 pub struct Scheduler {
|
     /// 3. Create jobs only for subjects that don't have pending jobs
     ///
     /// This is a static method (not &self) to allow it to be called from spawned tasks.
+    #[tracing::instrument(skip_all, fields(term))]
     async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
         // For now, we will implement a simple baseline scheduling strategy:
         // 1. Get a list of all subjects from the Banner API.
|
         // 3. Create new jobs only for subjects that don't have existing jobs.
         let term = Term::get_current().inner().to_string();
 
+        tracing::Span::current().record("term", term.as_str());
         debug!(term = term, "Enqueuing subject jobs");
 
         let subjects = banner_api.get_subjects("", &term, 1, 500).await?;
|
             .collect();
 
         // Filter out subjects that already have jobs and prepare new jobs
+        let mut skipped_count = 0;
         let new_jobs: Vec<_> = subjects
             .into_iter()
             .filter_map(|subject| {
|
|||||||
let payload_str = payload.to_string();
|
let payload_str = payload.to_string();
|
||||||
|
|
||||||
if existing_payloads.contains(&payload_str) {
|
if existing_payloads.contains(&payload_str) {
|
||||||
trace!(subject = subject.code, "Job already exists, skipping");
|
skipped_count += 1;
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some((payload, subject.code))
|
Some((payload, subject.code))
|
||||||
@@ -153,6 +156,10 @@ impl Scheduler {
             })
             .collect();
 
+        if skipped_count > 0 {
+            debug!(count = skipped_count, "Skipped subjects with existing jobs");
+        }
+
         // Insert all new jobs in a single batch
         if !new_jobs.is_empty() {
             let now = chrono::Utc::now();
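The change above trades a per-subject trace! for a single summary debug!: skips are counted inside the filter_map closure and reported once after collection. A toy version of the same aggregation (the data is made up):

use std::collections::HashSet;

fn main() {
    let existing_payloads: HashSet<&str> = ["CS", "MAT"].into_iter().collect();
    let subjects = vec!["CS", "MAT", "PHY", "BIO"];

    let mut skipped_count = 0;
    let new_jobs: Vec<&str> = subjects
        .into_iter()
        .filter_map(|code| {
            if existing_payloads.contains(code) {
                skipped_count += 1; // count instead of logging every skip
                None
            } else {
                Some(code)
            }
        })
        .collect();

    if skipped_count > 0 {
        println!("skipped {skipped_count}, queued {}", new_jobs.len());
    }
}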
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::broadcast;
 use tokio::time;
-use tracing::{debug, error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn, Instrument};
 
 /// A single worker instance.
 ///
@@ -57,7 +57,9 @@ impl Worker {
             };
 
             let job_id = job.id;
-            debug!(worker_id = self.id, job_id, "Processing job");
+            let retry_count = job.retry_count;
+            let max_retries = job.max_retries;
+            let start = std::time::Instant::now();
 
             // Process the job, racing against shutdown signal
             let process_result = tokio::select! {
@@ -68,8 +70,10 @@ impl Worker {
                 result = self.process_job(job) => result
             };
 
+            let duration = start.elapsed();
+
             // Handle the job processing result
-            self.handle_job_result(job_id, process_result).await;
+            self.handle_job_result(job_id, retry_count, max_retries, process_result, duration).await;
         }
     }
 
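The two worker hunks above start a timer before racing the job against the shutdown signal and take the elapsed duration afterwards so it can be attached to the completion and failure logs. A small standalone sketch of that select-and-time pattern; the worker's actual shutdown plumbing is not shown in this extract, so a CancellationToken stands in for it:

use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;

async fn run_one_job(shutdown: CancellationToken) {
    let start = Instant::now();

    // Race the job against shutdown, as the worker loop does.
    let result: Result<(), ()> = tokio::select! {
        _ = shutdown.cancelled() => return, // shutdown won; the job is abandoned here
        res = async {
            tokio::time::sleep(Duration::from_millis(50)).await; // stand-in for real work
            Ok(())
        } => res,
    };

    let duration = start.elapsed();
    println!("job finished in {} ms: {:?}", duration.as_millis(), result);
}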
@@ -106,20 +110,31 @@ impl Worker {
         // Get the job implementation
         let job_impl = job_type.boxed();
 
-        debug!(
-            worker_id = self.id,
+        // Create span with job context
+        let span = tracing::debug_span!(
+            "process_job",
             job_id = job.id,
-            description = job_impl.description(),
-            "Processing job"
+            job_type = job_impl.description()
         );
 
-        // Process the job - API errors are recoverable
-        job_impl
-            .process(&self.banner_api, &self.db_pool)
-            .await
-            .map_err(JobError::Recoverable)?;
+        async move {
+            debug!(
+                worker_id = self.id,
+                job_id = job.id,
+                description = job_impl.description(),
+                "Processing job"
+            );
 
-        Ok(())
+            // Process the job - API errors are recoverable
+            job_impl
+                .process(&self.banner_api, &self.db_pool)
+                .await
+                .map_err(JobError::Recoverable)?;
+
+            Ok(())
+        }
+        .instrument(span)
+        .await
     }
 
     async fn delete_job(&self, job_id: i32) -> Result<()> {
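The key move in the hunk above is wrapping the job body in an async move { ... } block and attaching the span with Instrument::instrument, so every event emitted inside (including from the job implementation itself) is recorded within the job_id/job_type span. A minimal sketch of the same pattern with hypothetical field values:

use tracing::{debug, debug_span, Instrument};

async fn process_job_sketch(job_id: i32) {
    let span = debug_span!("process_job", job_id, job_type = "subject_scrape"); // values are hypothetical
    async move {
        debug!("Processing job"); // recorded inside the span above
        // ... the actual work would run here ...
        debug!("Done");
    }
    .instrument(span)
    .await;
}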
@@ -135,10 +150,24 @@ impl Worker {
             .bind(job_id)
             .execute(&self.db_pool)
             .await?;
-        info!(worker_id = self.id, job_id, "Job unlocked for retry");
         Ok(())
     }
 
+    async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result<bool> {
+        let result = sqlx::query_scalar::<_, Option<i32>>(
+            "UPDATE scrape_jobs
+             SET locked_at = NULL, retry_count = retry_count + 1
+             WHERE id = $1
+             RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END"
+        )
+        .bind(job_id)
+        .bind(max_retries)
+        .fetch_one(&self.db_pool)
+        .await?;
+
+        Ok(result.is_some())
+    }
+
     /// Handle shutdown signal received during job processing
     async fn handle_shutdown_during_processing(&self, job_id: i32) {
         info!(worker_id = self.id, job_id, "Shutdown received during job processing");
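The new unlock_and_increment_retry folds the unlock, the retry-count increment, and the "are retries left?" check into one UPDATE ... RETURNING statement, so there is no read-then-write window for two workers to race through. The CASE yields NULL once the incremented count reaches max_retries; query_scalar::<_, Option<i32>> maps that NULL to None, which the caller reads as "do not retry". To make the cut-off concrete, here is plain Rust mirroring the SQL condition (not the query itself):

fn can_retry_after_failure(retry_count: i32, max_retries: i32) -> bool {
    // Mirrors: CASE WHEN retry_count + 1 < max_retries THEN ... ELSE NULL END
    retry_count + 1 < max_retries
}

fn main() {
    let max_retries = 5; // matches the migration's default
    for retry_count in 0..=4 {
        println!(
            "failure with retry_count={retry_count}: retry again = {}",
            can_retry_after_failure(retry_count, max_retries)
        );
    }
    // Prints true for counts 0 through 3 and false for 4: the job is retried
    // after its first four failures and deleted after the fifth.
}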
@@ -158,19 +187,30 @@ impl Worker {
     }
 
     /// Handle the result of job processing
-    async fn handle_job_result(&self, job_id: i32, result: Result<(), JobError>) {
+    async fn handle_job_result(&self, job_id: i32, retry_count: i32, max_retries: i32, result: Result<(), JobError>, duration: std::time::Duration) {
         match result {
             Ok(()) => {
-                debug!(worker_id = self.id, job_id, "Job completed successfully");
+                debug!(
+                    worker_id = self.id,
+                    job_id,
+                    duration_ms = duration.as_millis(),
+                    "Job completed successfully"
+                );
                 if let Err(e) = self.delete_job(job_id).await {
                     error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
                 }
             }
             Err(JobError::Recoverable(e)) => {
-                self.handle_recoverable_error(job_id, e).await;
+                self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration).await;
             }
             Err(JobError::Unrecoverable(e)) => {
-                error!(worker_id = self.id, job_id, error = ?e, "Job corrupted, deleting");
+                error!(
+                    worker_id = self.id,
+                    job_id,
+                    duration_ms = duration.as_millis(),
+                    error = ?e,
+                    "Job corrupted, deleting"
+                );
                 if let Err(e) = self.delete_job(job_id).await {
                     error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
                 }
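handle_job_result branches on whether a failure is worth retrying. The enum it matches on is defined elsewhere in the crate; its rough shape, as assumed here from how it is used:

// Assumed shape only; the crate's actual definition may carry more context.
#[derive(Debug)]
enum JobError {
    /// Transient failure (network, expired session): unlock the job and retry it.
    Recoverable(anyhow::Error),
    /// Corrupt or unprocessable job: delete it immediately.
    Unrecoverable(anyhow::Error),
}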
@@ -179,18 +219,63 @@ impl Worker {
     }
 
     /// Handle recoverable errors by logging appropriately and unlocking the job
-    async fn handle_recoverable_error(&self, job_id: i32, e: anyhow::Error) {
+    async fn handle_recoverable_error(&self, job_id: i32, retry_count: i32, max_retries: i32, e: anyhow::Error, duration: std::time::Duration) {
+        let next_attempt = retry_count.saturating_add(1);
+        let remaining_retries = max_retries.saturating_sub(next_attempt);
+
+        // Log the error appropriately based on type
         if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() {
             warn!(
                 worker_id = self.id,
-                job_id, "Invalid session detected, forcing session refresh"
+                job_id,
+                duration_ms = duration.as_millis(),
+                retry_attempt = next_attempt,
+                max_retries = max_retries,
+                remaining_retries = remaining_retries,
+                "Invalid session detected, will retry"
             );
         } else {
-            error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
+            error!(
+                worker_id = self.id,
+                job_id,
+                duration_ms = duration.as_millis(),
+                retry_attempt = next_attempt,
+                max_retries = max_retries,
+                remaining_retries = remaining_retries,
+                error = ?e,
+                "Failed to process job, will retry"
+            );
         }
 
-        if let Err(e) = self.unlock_job(job_id).await {
-            error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock job");
+        // Atomically unlock and increment retry count, checking if retry is allowed
+        match self.unlock_and_increment_retry(job_id, max_retries).await {
+            Ok(can_retry) if can_retry => {
+                info!(
+                    worker_id = self.id,
+                    job_id,
+                    retry_attempt = next_attempt,
+                    remaining_retries = remaining_retries,
+                    "Job unlocked for retry"
+                );
+            }
+            Ok(_) => {
+                // Max retries exceeded (detected atomically)
+                error!(
+                    worker_id = self.id,
+                    job_id,
+                    duration_ms = duration.as_millis(),
+                    retry_count = next_attempt,
+                    max_retries = max_retries,
+                    error = ?e,
+                    "Job failed permanently (max retries exceeded), deleting"
+                );
+                if let Err(e) = self.delete_job(job_id).await {
+                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
+                }
+            }
+            Err(e) => {
+                error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
+            }
         }
     }
 }
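The retry bookkeeping above only feeds the log fields: next_attempt is the failure number being reported and remaining_retries is max_retries minus that number, computed with saturating arithmetic so the fields never go negative even if a counter drifts. A quick check of the numbers (values chosen to match the migration's default of 5):

fn retry_fields(retry_count: i32, max_retries: i32) -> (i32, i32) {
    let next_attempt = retry_count.saturating_add(1);
    let remaining_retries = max_retries.saturating_sub(next_attempt);
    (next_attempt, remaining_retries)
}

fn main() {
    assert_eq!(retry_fields(0, 5), (1, 4)); // first failure
    assert_eq!(retry_fields(4, 5), (5, 0)); // fifth failure: no retries left, job gets deleted
    assert_eq!(retry_fields(7, 5), (8, 0)); // a drifted counter still clamps at zero
    println!("retry bookkeeping holds");
}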