diff --git a/migrations/20260131100000_add_scrape_job_results.sql b/migrations/20260131100000_add_scrape_job_results.sql
new file mode 100644
index 0000000..b046643
--- /dev/null
+++ b/migrations/20260131100000_add_scrape_job_results.sql
@@ -0,0 +1,31 @@
+-- Scrape job results log: one row per completed (or failed) job for effectiveness tracking.
+CREATE TABLE scrape_job_results (
+    id BIGSERIAL PRIMARY KEY,
+    target_type target_type NOT NULL,
+    payload JSONB NOT NULL,
+    priority scrape_priority NOT NULL,
+
+    -- Timing
+    queued_at TIMESTAMPTZ NOT NULL,
+    started_at TIMESTAMPTZ NOT NULL,
+    completed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    duration_ms INT NOT NULL,
+
+    -- Outcome
+    success BOOLEAN NOT NULL,
+    error_message TEXT,
+    retry_count INT NOT NULL DEFAULT 0,
+
+    -- Effectiveness (NULL when success = false)
+    courses_fetched INT,
+    courses_changed INT,
+    courses_unchanged INT,
+    audits_generated INT,
+    metrics_generated INT
+);
+
+CREATE INDEX idx_scrape_job_results_target_time
+    ON scrape_job_results (target_type, completed_at);
+
+CREATE INDEX idx_scrape_job_results_completed
+    ON scrape_job_results (completed_at);
diff --git a/src/data/batch.rs b/src/data/batch.rs
index 8847261..d1e0811 100644
--- a/src/data/batch.rs
+++ b/src/data/batch.rs
@@ -1,11 +1,11 @@
 //! Batch database operations for improved performance.

 use crate::banner::Course;
-use crate::data::models::DbMeetingTime;
+use crate::data::models::{DbMeetingTime, UpsertCounts};
 use crate::error::Result;
 use sqlx::PgConnection;
 use sqlx::PgPool;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::time::Instant;
 use tracing::info;

@@ -368,10 +368,10 @@ async fn insert_metrics(metrics: &[MetricEntry], conn: &mut PgConnection) -> Res
 /// # Performance
 /// - Reduces N database round-trips to 5 (old-data CTE + upsert, audits, metrics, instructors, junction)
 /// - Typical usage: 50-200 courses per batch
-pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
+pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<UpsertCounts> {
     if courses.is_empty() {
         info!("No courses to upsert, skipping batch operation");
-        return Ok(());
+        return Ok(UpsertCounts::default());
     }

     let start = Instant::now();
@@ -388,6 +388,19 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul
     // Step 3: Compute audit/metric diffs
     let (audits, metrics) = compute_diffs(&diff_rows);

+    // Count courses that had at least one field change (existing rows only)
+    let changed_ids: HashSet<_> = audits.iter().map(|a| a.course_id).collect();
+    let existing_count = diff_rows.iter().filter(|r| r.old_id.is_some()).count() as i32;
+    let courses_changed = changed_ids.len() as i32;
+
+    let counts = UpsertCounts {
+        courses_fetched: course_count as i32,
+        courses_changed,
+        courses_unchanged: existing_count - courses_changed,
+        audits_generated: audits.len() as i32,
+        metrics_generated: metrics.len() as i32,
+    };
+
     // Step 4: Insert audits and metrics
     insert_audits(&audits, &mut tx).await?;
     insert_metrics(&metrics, &mut tx).await?;
@@ -403,13 +416,15 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul
     let duration = start.elapsed();
     info!(
         courses_count = course_count,
-        audit_entries = audits.len(),
-        metric_entries = metrics.len(),
+        courses_changed = counts.courses_changed,
+        courses_unchanged = counts.courses_unchanged,
+        audit_entries = counts.audits_generated,
+        metric_entries = counts.metrics_generated,
         duration_ms = duration.as_millis(),
         "Batch upserted courses with instructors, audits, and metrics"
     );

-    Ok(())
+    Ok(counts)
 }

 // ---------------------------------------------------------------------------
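The table added above is written once per finished job. A typical way to consume it is an aggregate over completed_at, which the (target_type, completed_at) index serves; the helper below is only an illustration (the function name and return shape are not part of this PR):

use sqlx::PgPool;

/// Illustrative only: per-target success/failure totals over the last day.
pub async fn effectiveness_last_day(
    db_pool: &PgPool,
) -> Result<Vec<(String, i64, i64)>, sqlx::Error> {
    sqlx::query_as(
        r#"
        SELECT target_type::TEXT,
               COUNT(*) FILTER (WHERE success)     AS succeeded,
               COUNT(*) FILTER (WHERE NOT success) AS failed
        FROM scrape_job_results
        WHERE completed_at > NOW() - INTERVAL '1 day'
        GROUP BY target_type
        "#,
    )
    .fetch_all(db_pool)
    .await
}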
diff --git a/src/data/models.rs b/src/data/models.rs
index f32a1f6..37abd64 100644
--- a/src/data/models.rs
+++ b/src/data/models.rs
@@ -159,6 +159,16 @@ pub struct CourseAudit {
     pub new_value: String,
 }

+/// Aggregate counts returned by batch upsert, used for scrape job result logging.
+#[derive(Debug, Clone, Default)]
+pub struct UpsertCounts {
+    pub courses_fetched: i32,
+    pub courses_changed: i32,
+    pub courses_unchanged: i32,
+    pub audits_generated: i32,
+    pub metrics_generated: i32,
+}
+
 /// The priority level of a scrape job.
 #[derive(sqlx::Type, Copy, Debug, Clone)]
 #[sqlx(type_name = "scrape_priority", rename_all = "PascalCase")]
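For reference, Default here means an all-zero record: that is what a job reports when there was nothing to upsert (see the else branch in subject.rs further down), as opposed to a failed job, which stores NULL counts. A minimal check, assuming only the fields shown above:

#[cfg(test)]
mod upsert_counts_default_tests {
    use crate::data::models::UpsertCounts;

    #[test]
    fn default_is_all_zero() {
        // "No data returned" and "empty batch" both surface as zeroes,
        // while a failed job stores NULL counts instead (see the migration).
        let zero = UpsertCounts::default();
        assert_eq!(zero.courses_fetched, 0);
        assert_eq!(zero.courses_changed, 0);
        assert_eq!(zero.courses_unchanged, 0);
        assert_eq!(zero.audits_generated, 0);
        assert_eq!(zero.metrics_generated, 0);
    }
}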
diff --git a/src/data/scrape_jobs.rs b/src/data/scrape_jobs.rs
index 79e4fe6..3f0cf93 100644
--- a/src/data/scrape_jobs.rs
+++ b/src/data/scrape_jobs.rs
@@ -1,7 +1,8 @@
 //! Database operations for scrape job queue management.

-use crate::data::models::{ScrapeJob, ScrapePriority, TargetType};
+use crate::data::models::{ScrapeJob, ScrapePriority, TargetType, UpsertCounts};
 use crate::error::Result;
+use chrono::{DateTime, Utc};
 use sqlx::PgPool;
 use std::collections::HashSet;

@@ -166,6 +167,52 @@ pub async fn find_existing_job_payloads(
     Ok(existing_payloads)
 }

+/// Insert a scrape job result log entry.
+#[allow(clippy::too_many_arguments)]
+pub async fn insert_job_result(
+    target_type: TargetType,
+    payload: serde_json::Value,
+    priority: ScrapePriority,
+    queued_at: DateTime<Utc>,
+    started_at: DateTime<Utc>,
+    duration_ms: i32,
+    success: bool,
+    error_message: Option<&str>,
+    retry_count: i32,
+    counts: Option<&UpsertCounts>,
+    db_pool: &PgPool,
+) -> Result<()> {
+    sqlx::query(
+        r#"
+        INSERT INTO scrape_job_results (
+            target_type, payload, priority,
+            queued_at, started_at, duration_ms,
+            success, error_message, retry_count,
+            courses_fetched, courses_changed, courses_unchanged,
+            audits_generated, metrics_generated
+        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
+        "#,
+    )
+    .bind(target_type)
+    .bind(&payload)
+    .bind(priority)
+    .bind(queued_at)
+    .bind(started_at)
+    .bind(duration_ms)
+    .bind(success)
+    .bind(error_message)
+    .bind(retry_count)
+    .bind(counts.map(|c| c.courses_fetched))
+    .bind(counts.map(|c| c.courses_changed))
+    .bind(counts.map(|c| c.courses_unchanged))
+    .bind(counts.map(|c| c.audits_generated))
+    .bind(counts.map(|c| c.metrics_generated))
+    .execute(db_pool)
+    .await?;
+
+    Ok(())
+}
+
 /// Batch insert scrape jobs using UNNEST for a single round-trip.
 ///
 /// All jobs are inserted with `execute_at` set to the current time.
diff --git a/src/scraper/jobs/mod.rs b/src/scraper/jobs/mod.rs
index c3dfcfc..1e0cde0 100644
--- a/src/scraper/jobs/mod.rs
+++ b/src/scraper/jobs/mod.rs
@@ -1,7 +1,7 @@
 pub mod subject;

 use crate::banner::BannerApi;
-use crate::data::models::TargetType;
+use crate::data::models::{TargetType, UpsertCounts};
 use crate::error::Result;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
@@ -32,8 +32,9 @@ pub trait Job: Send + Sync {
     #[allow(dead_code)]
     fn target_type(&self) -> TargetType;

-    /// Process the job with the given API client and database pool
-    async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()>;
+    /// Process the job with the given API client and database pool.
+    /// Returns upsert effectiveness counts on success.
+    async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<UpsertCounts>;

     /// Get a human-readable description of the job
     fn description(&self) -> String;
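For implementors, the only change to the Job contract is the return type. A minimal sketch of a hypothetical no-op implementor, assuming the trait has no other required items and the same async-fn-in-trait setup as SubjectJob below:

use super::Job;
use crate::banner::BannerApi;
use crate::data::models::{TargetType, UpsertCounts};
use crate::error::Result;
use sqlx::PgPool;

/// Hypothetical example, not part of this PR: the smallest possible implementor
/// under the new contract reports all-zero counts instead of returning `()`.
struct NoopJob;

impl Job for NoopJob {
    fn target_type(&self) -> TargetType {
        // Assumes a `Subject` variant exists; any real variant would do here.
        TargetType::Subject
    }

    async fn process(&self, _banner_api: &BannerApi, _db_pool: &PgPool) -> Result<UpsertCounts> {
        // A job that fetches and upserts nothing still has something to log:
        // zero fetched, zero changed, zero audits/metrics.
        Ok(UpsertCounts::default())
    }

    fn description(&self) -> String {
        "no-op job".to_string()
    }
}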
diff --git a/src/scraper/jobs/subject.rs b/src/scraper/jobs/subject.rs
index d4f079c..0a58ba1 100644
--- a/src/scraper/jobs/subject.rs
+++ b/src/scraper/jobs/subject.rs
@@ -1,7 +1,7 @@
 use super::Job;
 use crate::banner::{BannerApi, SearchQuery, Term};
 use crate::data::batch::batch_upsert_courses;
-use crate::data::models::TargetType;
+use crate::data::models::{TargetType, UpsertCounts};
 use crate::error::Result;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
@@ -26,7 +26,7 @@ impl Job for SubjectJob {
     }

     #[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))]
-    async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
+    async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<UpsertCounts> {
         let subject_code = &self.subject;

         // Get the current term
@@ -37,17 +37,19 @@ impl Job for SubjectJob {
             .search(&term, &query, "subjectDescription", false)
             .await?;

-        if let Some(courses_from_api) = search_result.data {
+        let counts = if let Some(courses_from_api) = search_result.data {
             info!(
                 subject = %subject_code,
                 count = courses_from_api.len(),
                 "Found courses"
             );
-            batch_upsert_courses(&courses_from_api, db_pool).await?;
-        }
+            batch_upsert_courses(&courses_from_api, db_pool).await?
+        } else {
+            UpsertCounts::default()
+        };

         debug!(subject = %subject_code, "Subject job completed");
-        Ok(())
+        Ok(counts)
     }

     fn description(&self) -> String {
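One consequence of the else branch above: a subject with no returned sections logs a successful row with all-zero counts, which is distinct from a failed row whose count columns are NULL. A hypothetical consumer distinguishing the two (names are illustrative, not from this PR):

/// Illustrative helper, not part of this PR: classify a scrape_job_results row.
fn outcome_label(success: bool, courses_fetched: Option<i32>) -> &'static str {
    match (success, courses_fetched) {
        // Failures store NULL counts and a populated error_message.
        (false, _) => "failed",
        // Successful run that found nothing to upsert (e.g. empty subject).
        (true, Some(0)) => "succeeded, no courses returned",
        // Successful run with work done; counts describe its effectiveness.
        (true, Some(_)) => "succeeded",
        // Shouldn't happen per this PR (success always logs counts); kept for totality.
        (true, None) => "succeeded, counts missing",
    }
}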
diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs
index c611a11..abe4d07 100644
--- a/src/scraper/worker.rs
+++ b/src/scraper/worker.rs
@@ -1,10 +1,10 @@
 use crate::banner::{BannerApi, BannerApiError};
-use crate::data::models::{ScrapeJob, ScrapeJobStatus};
+use crate::data::models::{ScrapeJob, ScrapeJobStatus, UpsertCounts};
 use crate::data::scrape_jobs;
 use crate::error::Result;
 use crate::scraper::jobs::{JobError, JobType};
 use crate::web::ws::ScrapeJobEvent;
-use chrono::Utc;
+use chrono::{DateTime, Utc};
 use sqlx::PgPool;
 use std::sync::Arc;
 use std::time::Duration;
@@ -72,10 +72,15 @@
            let job_id = job.id;
            let retry_count = job.retry_count;
            let max_retries = job.max_retries;
+           let target_type = job.target_type;
+           let payload = job.target_payload.clone();
+           let priority = job.priority;
+           let queued_at = job.queued_at;
+           let started_at = Utc::now();
            let start = std::time::Instant::now();

            // Emit JobLocked event
-           let locked_at = Utc::now().to_rfc3339();
+           let locked_at = started_at.to_rfc3339();
            debug!(job_id, "Emitting JobLocked event");
            let _ = self.job_events_tx.send(ScrapeJobEvent::JobLocked {
                id: job_id,
@@ -105,8 +110,19 @@
            let duration = start.elapsed();

            // Handle the job processing result
-           self.handle_job_result(job_id, retry_count, max_retries, process_result, duration)
-               .await;
+           self.handle_job_result(
+               job_id,
+               retry_count,
+               max_retries,
+               process_result,
+               duration,
+               target_type,
+               payload,
+               priority,
+               queued_at,
+               started_at,
+           )
+           .await;
        }
    }

@@ -118,7 +134,7 @@
        scrape_jobs::fetch_and_lock_job(&self.db_pool).await
    }

-    async fn process_job(&self, job: ScrapeJob) -> Result<(), JobError> {
+    async fn process_job(&self, job: ScrapeJob) -> Result<UpsertCounts, JobError> {
        // Convert the database job to our job type
        let job_type = JobType::from_target_type_and_payload(job.target_type, job.target_payload)
            .map_err(|e| JobError::Unrecoverable(anyhow::anyhow!(e)))?; // Parse errors are unrecoverable
@@ -145,9 +161,7 @@
            job_impl
                .process(&self.banner_api, &self.db_pool)
                .await
-                .map_err(JobError::Recoverable)?;
-
-            Ok(())
+                .map_err(JobError::Recoverable)
        }
        .instrument(span)
        .await
@@ -191,22 +205,53 @@
    }

    /// Handle the result of job processing
+    #[allow(clippy::too_many_arguments)]
    async fn handle_job_result(
        &self,
        job_id: i32,
        retry_count: i32,
        max_retries: i32,
-        result: Result<(), JobError>,
+        result: Result<UpsertCounts, JobError>,
        duration: std::time::Duration,
+        target_type: crate::data::models::TargetType,
+        payload: serde_json::Value,
+        priority: crate::data::models::ScrapePriority,
+        queued_at: DateTime<Utc>,
+        started_at: DateTime<Utc>,
    ) {
+        let duration_ms = duration.as_millis() as i32;
+
        match result {
-            Ok(()) => {
+            Ok(counts) => {
                debug!(
                    worker_id = self.id,
                    job_id,
                    duration_ms = duration.as_millis(),
+                    courses_fetched = counts.courses_fetched,
+                    courses_changed = counts.courses_changed,
+                    courses_unchanged = counts.courses_unchanged,
                    "Job completed successfully"
                );
+
+                // Log the result
+                if let Err(e) = scrape_jobs::insert_job_result(
+                    target_type,
+                    payload,
+                    priority,
+                    queued_at,
+                    started_at,
+                    duration_ms,
+                    true,
+                    None,
+                    retry_count,
+                    Some(&counts),
+                    &self.db_pool,
+                )
+                .await
+                {
+                    error!(worker_id = self.id, job_id, error = ?e, "Failed to insert job result");
+                }
+
                if let Err(e) = self.delete_job(job_id).await {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
                }
@@ -216,10 +261,41 @@
                    .send(ScrapeJobEvent::JobCompleted { id: job_id });
            }
            Err(JobError::Recoverable(e)) => {
-                self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration)
-                    .await;
+                self.handle_recoverable_error(
+                    job_id,
+                    retry_count,
+                    max_retries,
+                    e,
+                    duration,
+                    target_type,
+                    payload,
+                    priority,
+                    queued_at,
+                    started_at,
+                )
+                .await;
            }
            Err(JobError::Unrecoverable(e)) => {
+                // Log the failed result
+                let err_msg = format!("{e:#}");
+                if let Err(log_err) = scrape_jobs::insert_job_result(
+                    target_type,
+                    payload,
+                    priority,
+                    queued_at,
+                    started_at,
+                    duration_ms,
+                    false,
+                    Some(&err_msg),
+                    retry_count,
+                    None,
+                    &self.db_pool,
+                )
+                .await
+                {
+                    error!(worker_id = self.id, job_id, error = ?log_err, "Failed to insert job result");
+                }
+
                error!(
                    worker_id = self.id,
                    job_id,
@@ -239,6 +315,7 @@
    }

    /// Handle recoverable errors by logging appropriately and unlocking the job
+    #[allow(clippy::too_many_arguments)]
    async fn handle_recoverable_error(
        &self,
        job_id: i32,
@@ -246,6 +323,11 @@
        max_retries: i32,
        e: anyhow::Error,
        duration: std::time::Duration,
+        target_type: crate::data::models::TargetType,
+        payload: serde_json::Value,
+        priority: crate::data::models::ScrapePriority,
+        queued_at: DateTime<Utc>,
+        started_at: DateTime<Utc>,
    ) {
        let next_attempt = retry_count.saturating_add(1);
        let remaining_retries = max_retries.saturating_sub(next_attempt);
@@ -276,7 +358,7 @@

        // Atomically unlock and increment retry count, checking if retry is allowed
        match self.unlock_and_increment_retry(job_id, max_retries).await {
-            Ok(Some(queued_at)) => {
+            Ok(Some(new_queued_at)) => {
                debug!(
                    worker_id = self.id,
                    job_id,
@@ -288,12 +370,33 @@
                let _ = self.job_events_tx.send(ScrapeJobEvent::JobRetried {
                    id: job_id,
                    retry_count: next_attempt,
-                    queued_at: queued_at.to_rfc3339(),
+                    queued_at: new_queued_at.to_rfc3339(),
                    status: ScrapeJobStatus::Pending,
                });
+                // Don't log a result yet — the job will be retried
            }
            Ok(None) => {
-                // Max retries exceeded
+                // Max retries exceeded — log final failure result
+                let duration_ms = duration.as_millis() as i32;
+                let err_msg = format!("{e:#}");
+                if let Err(log_err) = scrape_jobs::insert_job_result(
+                    target_type,
+                    payload,
+                    priority,
+                    queued_at,
+                    started_at,
+                    duration_ms,
+                    false,
+                    Some(&err_msg),
+                    next_attempt,
+                    None,
+                    &self.db_pool,
+                )
+                .await
+                {
+                    error!(worker_id = self.id, job_id, error = ?log_err, "Failed to insert job result");
+                }
+
                error!(
                    worker_id = self.id,
                    job_id,
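Taken together, the worker changes write exactly one scrape_job_results row per final outcome; a recoverable failure that still has retries left produces no row until its last attempt. A sketch of that decision table (the enum is illustrative, not from this PR):

/// Illustrative only: when the worker above calls scrape_jobs::insert_job_result.
enum JobOutcome {
    Succeeded,
    Unrecoverable,
    RecoverableRetriesLeft,
    RecoverableRetriesExhausted,
}

fn writes_result_row(outcome: JobOutcome) -> bool {
    match outcome {
        // success = true, counts populated from the job's UpsertCounts.
        JobOutcome::Succeeded => true,
        // success = false, error_message set, counts NULL.
        JobOutcome::Unrecoverable => true,
        // Job is unlocked and re-queued; a row is only written on its final attempt.
        JobOutcome::RecoverableRetriesLeft => false,
        // Max retries exceeded: final failure row with retry_count = next_attempt.
        JobOutcome::RecoverableRetriesExhausted => true,
    }
}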