feat: add scrape job result persistence for effectiveness tracking

2026-01-30 01:37:41 -06:00
parent 857ceabcca
commit 75a99c10ea
7 changed files with 242 additions and 33 deletions
@@ -0,0 +1,31 @@
-- Scrape job results log: one row per completed (or failed) job for effectiveness tracking.
CREATE TABLE scrape_job_results (
id BIGSERIAL PRIMARY KEY,
target_type target_type NOT NULL,
payload JSONB NOT NULL,
priority scrape_priority NOT NULL,
-- Timing
queued_at TIMESTAMPTZ NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
duration_ms INT NOT NULL,
-- Outcome
success BOOLEAN NOT NULL,
error_message TEXT,
retry_count INT NOT NULL DEFAULT 0,
-- Effectiveness (NULL when success = false)
courses_fetched INT,
courses_changed INT,
courses_unchanged INT,
audits_generated INT,
metrics_generated INT
);
CREATE INDEX idx_scrape_job_results_target_time
ON scrape_job_results (target_type, completed_at);
CREATE INDEX idx_scrape_job_results_completed
ON scrape_job_results (completed_at);
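Nothing in this commit consumes the new table yet; the indexes above exist so that effectiveness rollups over a time window stay cheap. A minimal sketch of such a rollup with sqlx, assuming a hypothetical EffectivenessRow struct and effectiveness_last_day helper that are not part of this change:

use sqlx::PgPool;

// Illustrative only: summarize the last day of results per target type,
// filtering on completed_at so idx_scrape_job_results_completed can be used.
#[derive(Debug, sqlx::FromRow)]
struct EffectivenessRow {
    target_type: String,
    jobs: i64,
    failures: i64,
    avg_changed: Option<f64>,
}

async fn effectiveness_last_day(pool: &PgPool) -> Result<Vec<EffectivenessRow>, sqlx::Error> {
    sqlx::query_as::<_, EffectivenessRow>(
        r#"
        SELECT target_type::text AS target_type,
               COUNT(*) AS jobs,
               COUNT(*) FILTER (WHERE NOT success) AS failures,
               AVG(courses_changed)::float8 AS avg_changed
        FROM scrape_job_results
        WHERE completed_at > NOW() - INTERVAL '1 day'
        GROUP BY target_type
        ORDER BY target_type
        "#,
    )
    .fetch_all(pool)
    .await
}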
+22 -7
@@ -1,11 +1,11 @@
//! Batch database operations for improved performance.
use crate::banner::Course;
-use crate::data::models::DbMeetingTime;
+use crate::data::models::{DbMeetingTime, UpsertCounts};
use crate::error::Result;
use sqlx::PgConnection;
use sqlx::PgPool;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::time::Instant;
use tracing::info;
@@ -368,10 +368,10 @@ async fn insert_metrics(metrics: &[MetricEntry], conn: &mut PgConnection) -> Res
/// # Performance
/// - Reduces N database round-trips to 5 (old-data CTE + upsert, audits, metrics, instructors, junction)
/// - Typical usage: 50-200 courses per batch
-pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
+pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<UpsertCounts> {
if courses.is_empty() {
info!("No courses to upsert, skipping batch operation");
-return Ok(());
+return Ok(UpsertCounts::default());
}
let start = Instant::now();
@@ -388,6 +388,19 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul
// Step 3: Compute audit/metric diffs
let (audits, metrics) = compute_diffs(&diff_rows);
// Count courses that had at least one field change (existing rows only)
let changed_ids: HashSet<i32> = audits.iter().map(|a| a.course_id).collect();
let existing_count = diff_rows.iter().filter(|r| r.old_id.is_some()).count() as i32;
let courses_changed = changed_ids.len() as i32;
let counts = UpsertCounts {
courses_fetched: course_count as i32,
courses_changed,
courses_unchanged: existing_count - courses_changed,
audits_generated: audits.len() as i32,
metrics_generated: metrics.len() as i32,
};
// Step 4: Insert audits and metrics
insert_audits(&audits, &mut tx).await?;
insert_metrics(&metrics, &mut tx).await?;
@@ -403,13 +416,15 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul
let duration = start.elapsed();
info!(
courses_count = course_count,
-audit_entries = audits.len(),
-metric_entries = metrics.len(),
+courses_changed = counts.courses_changed,
+courses_unchanged = counts.courses_unchanged,
+audit_entries = counts.audits_generated,
+metric_entries = counts.metrics_generated,
duration_ms = duration.as_millis(),
"Batch upserted courses with instructors, audits, and metrics"
);
-Ok(())
+Ok(counts)
}
// ---------------------------------------------------------------------------
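One property of the counts computed above, useful when reading them downstream: courses_changed plus courses_unchanged covers exactly the rows that already existed, so whatever remains of courses_fetched was newly inserted. A rough sketch of a caller relying on that, where upsert_and_report is a hypothetical helper and not part of this commit:

use crate::banner::Course;
use crate::data::batch::batch_upsert_courses;
use crate::error::Result;
use sqlx::PgPool;

// Hypothetical helper: run the batch upsert and derive the number of brand-new
// courses from the returned UpsertCounts.
async fn upsert_and_report(courses: &[Course], pool: &PgPool) -> Result<()> {
    let counts = batch_upsert_courses(courses, pool).await?;
    // changed + unchanged = previously existing rows; the rest were inserts.
    let existing = counts.courses_changed + counts.courses_unchanged;
    let newly_inserted = counts.courses_fetched - existing;
    tracing::info!(
        changed = counts.courses_changed,
        unchanged = counts.courses_unchanged,
        new = newly_inserted,
        "batch effectiveness"
    );
    Ok(())
}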
+10
@@ -159,6 +159,16 @@ pub struct CourseAudit {
pub new_value: String,
}
/// Aggregate counts returned by batch upsert, used for scrape job result logging.
#[derive(Debug, Clone, Default)]
pub struct UpsertCounts {
pub courses_fetched: i32,
pub courses_changed: i32,
pub courses_unchanged: i32,
pub audits_generated: i32,
pub metrics_generated: i32,
}
/// The priority level of a scrape job.
#[derive(sqlx::Type, Copy, Debug, Clone)]
#[sqlx(type_name = "scrape_priority", rename_all = "PascalCase")]
+48 -1
@@ -1,7 +1,8 @@
//! Database operations for scrape job queue management.
-use crate::data::models::{ScrapeJob, ScrapePriority, TargetType};
+use crate::data::models::{ScrapeJob, ScrapePriority, TargetType, UpsertCounts};
use crate::error::Result;
use chrono::{DateTime, Utc};
use sqlx::PgPool;
use std::collections::HashSet;
@@ -166,6 +167,52 @@ pub async fn find_existing_job_payloads(
Ok(existing_payloads)
}
/// Insert a scrape job result log entry.
#[allow(clippy::too_many_arguments)]
pub async fn insert_job_result(
target_type: TargetType,
payload: serde_json::Value,
priority: ScrapePriority,
queued_at: DateTime<Utc>,
started_at: DateTime<Utc>,
duration_ms: i32,
success: bool,
error_message: Option<&str>,
retry_count: i32,
counts: Option<&UpsertCounts>,
db_pool: &PgPool,
) -> Result<()> {
sqlx::query(
r#"
INSERT INTO scrape_job_results (
target_type, payload, priority,
queued_at, started_at, duration_ms,
success, error_message, retry_count,
courses_fetched, courses_changed, courses_unchanged,
audits_generated, metrics_generated
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
"#,
)
.bind(target_type)
.bind(&payload)
.bind(priority)
.bind(queued_at)
.bind(started_at)
.bind(duration_ms)
.bind(success)
.bind(error_message)
.bind(retry_count)
.bind(counts.map(|c| c.courses_fetched))
.bind(counts.map(|c| c.courses_changed))
.bind(counts.map(|c| c.courses_unchanged))
.bind(counts.map(|c| c.audits_generated))
.bind(counts.map(|c| c.metrics_generated))
.execute(db_pool)
.await?;
Ok(())
}
/// Batch insert scrape jobs using UNNEST for a single round-trip.
///
/// All jobs are inserted with `execute_at` set to the current time.
+4 -3
@@ -1,7 +1,7 @@
pub mod subject;
use crate::banner::BannerApi;
-use crate::data::models::TargetType;
+use crate::data::models::{TargetType, UpsertCounts};
use crate::error::Result;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@@ -32,8 +32,9 @@ pub trait Job: Send + Sync {
#[allow(dead_code)]
fn target_type(&self) -> TargetType;
-/// Process the job with the given API client and database pool
-async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()>;
+/// Process the job with the given API client and database pool.
+/// Returns upsert effectiveness counts on success.
+async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<UpsertCounts>;
/// Get a human-readable description of the job
fn description(&self) -> String;
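The SubjectJob diff below is the real adaptation to this signature. As a second, minimal sketch of the contract for implementors that do not touch courses (this HealthCheckJob, the TargetType variant it picks, and any trait items not visible in this hunk are assumptions, not code from the repository), returning the default counts is enough:

use super::Job;
use crate::banner::BannerApi;
use crate::data::models::{TargetType, UpsertCounts};
use crate::error::Result;
use sqlx::PgPool;

// Hypothetical job type, used only to illustrate the updated trait contract.
// If the Job trait is declared with #[async_trait], this impl needs the same attribute.
struct HealthCheckJob;

impl Job for HealthCheckJob {
    fn target_type(&self) -> TargetType {
        // Assumes a Subject variant exists; any real variant would do for the sketch.
        TargetType::Subject
    }

    async fn process(&self, _banner_api: &BannerApi, _db_pool: &PgPool) -> Result<UpsertCounts> {
        // Nothing fetched or upserted, so every effectiveness count stays at its zero default.
        Ok(UpsertCounts::default())
    }

    fn description(&self) -> String {
        "health check (illustrative only)".to_string()
    }
}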
+8 -6
@@ -1,7 +1,7 @@
use super::Job;
use crate::banner::{BannerApi, SearchQuery, Term};
use crate::data::batch::batch_upsert_courses;
-use crate::data::models::TargetType;
+use crate::data::models::{TargetType, UpsertCounts};
use crate::error::Result;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@@ -26,7 +26,7 @@ impl Job for SubjectJob {
}
#[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))]
-async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
+async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<UpsertCounts> {
let subject_code = &self.subject;
// Get the current term
@@ -37,17 +37,19 @@ impl Job for SubjectJob {
.search(&term, &query, "subjectDescription", false)
.await?;
-if let Some(courses_from_api) = search_result.data {
+let counts = if let Some(courses_from_api) = search_result.data {
info!(
subject = %subject_code,
count = courses_from_api.len(),
"Found courses"
);
-batch_upsert_courses(&courses_from_api, db_pool).await?;
-}
+batch_upsert_courses(&courses_from_api, db_pool).await?
+} else {
+UpsertCounts::default()
+};
debug!(subject = %subject_code, "Subject job completed");
-Ok(())
+Ok(counts)
}
fn description(&self) -> String {
+117 -14
@@ -1,10 +1,10 @@
use crate::banner::{BannerApi, BannerApiError};
-use crate::data::models::{ScrapeJob, ScrapeJobStatus};
+use crate::data::models::{ScrapeJob, ScrapeJobStatus, UpsertCounts};
use crate::data::scrape_jobs;
use crate::error::Result;
use crate::scraper::jobs::{JobError, JobType};
use crate::web::ws::ScrapeJobEvent;
-use chrono::Utc;
+use chrono::{DateTime, Utc};
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
@@ -72,10 +72,15 @@ impl Worker {
let job_id = job.id;
let retry_count = job.retry_count;
let max_retries = job.max_retries;
let target_type = job.target_type;
let payload = job.target_payload.clone();
let priority = job.priority;
let queued_at = job.queued_at;
let started_at = Utc::now();
let start = std::time::Instant::now();
// Emit JobLocked event
-let locked_at = Utc::now().to_rfc3339();
+let locked_at = started_at.to_rfc3339();
debug!(job_id, "Emitting JobLocked event");
let _ = self.job_events_tx.send(ScrapeJobEvent::JobLocked {
id: job_id,
@@ -105,7 +110,18 @@ impl Worker {
let duration = start.elapsed();
// Handle the job processing result
-self.handle_job_result(job_id, retry_count, max_retries, process_result, duration)
+self.handle_job_result(
job_id,
retry_count,
max_retries,
process_result,
duration,
target_type,
payload,
priority,
queued_at,
started_at,
)
.await;
}
}
@@ -118,7 +134,7 @@ impl Worker {
scrape_jobs::fetch_and_lock_job(&self.db_pool).await
}
-async fn process_job(&self, job: ScrapeJob) -> Result<(), JobError> {
+async fn process_job(&self, job: ScrapeJob) -> Result<UpsertCounts, JobError> {
// Convert the database job to our job type
let job_type = JobType::from_target_type_and_payload(job.target_type, job.target_payload)
.map_err(|e| JobError::Unrecoverable(anyhow::anyhow!(e)))?; // Parse errors are unrecoverable
@@ -145,9 +161,7 @@ impl Worker {
job_impl
.process(&self.banner_api, &self.db_pool)
.await
-.map_err(JobError::Recoverable)?;
-Ok(())
+.map_err(JobError::Recoverable)
}
.instrument(span)
.await
@@ -191,22 +205,53 @@ impl Worker {
}
/// Handle the result of job processing
#[allow(clippy::too_many_arguments)]
async fn handle_job_result(
&self,
job_id: i32,
retry_count: i32,
max_retries: i32,
-result: Result<(), JobError>,
+result: Result<UpsertCounts, JobError>,
duration: std::time::Duration,
target_type: crate::data::models::TargetType,
payload: serde_json::Value,
priority: crate::data::models::ScrapePriority,
queued_at: DateTime<Utc>,
started_at: DateTime<Utc>,
) {
let duration_ms = duration.as_millis() as i32;
match result {
-Ok(()) => {
+Ok(counts) => {
debug!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
courses_fetched = counts.courses_fetched,
courses_changed = counts.courses_changed,
courses_unchanged = counts.courses_unchanged,
"Job completed successfully" "Job completed successfully"
); );
// Log the result
if let Err(e) = scrape_jobs::insert_job_result(
target_type,
payload,
priority,
queued_at,
started_at,
duration_ms,
true,
None,
retry_count,
Some(&counts),
&self.db_pool,
)
.await
{
error!(worker_id = self.id, job_id, error = ?e, "Failed to insert job result");
}
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
}
@@ -216,10 +261,41 @@ impl Worker {
.send(ScrapeJobEvent::JobCompleted { id: job_id });
}
Err(JobError::Recoverable(e)) => {
-self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration)
+self.handle_recoverable_error(
job_id,
retry_count,
max_retries,
e,
duration,
target_type,
payload,
priority,
queued_at,
started_at,
)
.await;
}
Err(JobError::Unrecoverable(e)) => {
// Log the failed result
let err_msg = format!("{e:#}");
if let Err(log_err) = scrape_jobs::insert_job_result(
target_type,
payload,
priority,
queued_at,
started_at,
duration_ms,
false,
Some(&err_msg),
retry_count,
None,
&self.db_pool,
)
.await
{
error!(worker_id = self.id, job_id, error = ?log_err, "Failed to insert job result");
}
error!(
worker_id = self.id,
job_id,
@@ -239,6 +315,7 @@ impl Worker {
}
/// Handle recoverable errors by logging appropriately and unlocking the job
#[allow(clippy::too_many_arguments)]
async fn handle_recoverable_error(
&self,
job_id: i32,
@@ -246,6 +323,11 @@ impl Worker {
max_retries: i32,
e: anyhow::Error,
duration: std::time::Duration,
target_type: crate::data::models::TargetType,
payload: serde_json::Value,
priority: crate::data::models::ScrapePriority,
queued_at: DateTime<Utc>,
started_at: DateTime<Utc>,
) {
let next_attempt = retry_count.saturating_add(1);
let remaining_retries = max_retries.saturating_sub(next_attempt);
@@ -276,7 +358,7 @@ impl Worker {
// Atomically unlock and increment retry count, checking if retry is allowed
match self.unlock_and_increment_retry(job_id, max_retries).await {
-Ok(Some(queued_at)) => {
+Ok(Some(new_queued_at)) => {
debug!(
worker_id = self.id,
job_id,
@@ -288,12 +370,33 @@ impl Worker {
let _ = self.job_events_tx.send(ScrapeJobEvent::JobRetried {
id: job_id,
retry_count: next_attempt,
-queued_at: queued_at.to_rfc3339(),
+queued_at: new_queued_at.to_rfc3339(),
status: ScrapeJobStatus::Pending,
});
// Don't log a result yet — the job will be retried
}
Ok(None) => {
-// Max retries exceeded (detected atomically)
+// Max retries exceeded — log final failure result
let duration_ms = duration.as_millis() as i32;
let err_msg = format!("{e:#}");
if let Err(log_err) = scrape_jobs::insert_job_result(
target_type,
payload,
priority,
queued_at,
started_at,
duration_ms,
false,
Some(&err_msg),
next_attempt,
None,
&self.db_pool,
)
.await
{
error!(worker_id = self.id, job_id, error = ?log_err, "Failed to insert job result");
}
error!(
worker_id = self.id,
job_id,