From 9d720bb0a73f8ca71d84a564ed2f0b82b78fab6f Mon Sep 17 00:00:00 2001
From: Xevion
Date: Sat, 13 Sep 2025 00:17:53 -0500
Subject: [PATCH] feat: implement common job trait & better interface for
 scheduler & workers

---
 src/scraper/jobs/mod.rs     | 52 +++++++++++++++++++++
 src/scraper/jobs/subject.rs | 93 +++++++++++++++++++++++++++++++++++++
 src/scraper/mod.rs          |  1 +
 src/scraper/scheduler.rs    |  4 +-
 src/scraper/worker.rs       | 90 +++++++----------------------------
 5 files changed, 167 insertions(+), 73 deletions(-)
 create mode 100644 src/scraper/jobs/mod.rs
 create mode 100644 src/scraper/jobs/subject.rs

diff --git a/src/scraper/jobs/mod.rs b/src/scraper/jobs/mod.rs
new file mode 100644
index 0000000..4537f11
--- /dev/null
+++ b/src/scraper/jobs/mod.rs
@@ -0,0 +1,52 @@
+pub mod subject;
+
+use crate::data::models::TargetType;
+use crate::error::Result;
+use crate::{banner::BannerApi, scraper::jobs::subject::SubjectJob};
+use serde::{Deserialize, Serialize};
+use sqlx::PgPool;
+
+/// Common trait interface for all job types
+#[async_trait::async_trait]
+pub trait Job: Send + Sync {
+    /// The target type this job handles
+    fn target_type(&self) -> TargetType;
+
+    /// Process the job with the given API client and database pool
+    async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()>;
+
+    /// Get a human-readable description of the job
+    fn description(&self) -> String;
+}
+
+/// Main job enum that dispatches to specific job implementations
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum JobType {
+    Subject(SubjectJob),
+}
+
+impl JobType {
+    /// Create a job from the target type and payload
+    pub fn from_target_type_and_payload(
+        target_type: TargetType,
+        payload: serde_json::Value,
+    ) -> Result<Self> {
+        match target_type {
+            TargetType::Subject => {
+                let subject_payload: SubjectJob = serde_json::from_value(payload)?;
+                Ok(JobType::Subject(subject_payload))
+            }
+            _ => Err(anyhow::anyhow!(
+                "Unsupported target type: {:?}",
+                target_type
+            )),
+        }
+    }
+
+    /// Convert to a Job trait object
+    pub fn as_job(self) -> Box<dyn Job> {
+        match self {
+            JobType::Subject(payload) => Box::new(payload),
+        }
+    }
+}
diff --git a/src/scraper/jobs/subject.rs b/src/scraper/jobs/subject.rs
new file mode 100644
index 0000000..7d6ef8c
--- /dev/null
+++ b/src/scraper/jobs/subject.rs
@@ -0,0 +1,93 @@
+use super::Job;
+use crate::banner::{BannerApi, Course, SearchQuery, Term};
+use crate::data::models::TargetType;
+use crate::error::Result;
+use serde::{Deserialize, Serialize};
+use sqlx::PgPool;
+use tracing::{debug, info, trace};
+
+/// Job implementation for scraping subject data
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SubjectJob {
+    pub subject: String,
+}
+
+impl SubjectJob {
+    pub fn new(subject: String) -> Self {
+        Self { subject }
+    }
+}
+
+#[async_trait::async_trait]
+impl Job for SubjectJob {
+    fn target_type(&self) -> TargetType {
+        TargetType::Subject
+    }
+
+    async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
+        let subject_code = &self.subject;
+        debug!(subject = subject_code, "Processing subject job");
+
+        // Get the current term
+        let term = Term::get_current().inner().to_string();
+        let query = SearchQuery::new().subject(subject_code).max_results(500);
+
+        let search_result = banner_api
+            .search(&term, &query, "subjectDescription", false)
+            .await?;
+
+        if let Some(courses_from_api) = search_result.data {
+            info!(
+                subject = subject_code,
+                count = courses_from_api.len(),
+                "Found courses"
+            );
+            for course in courses_from_api {
+                self.upsert_course(&course, db_pool).await?;
+            }
+        }
+
+        debug!(subject = subject_code, "Subject job completed");
+        Ok(())
+    }
+
+    fn description(&self) -> String {
+        format!("Scrape subject: {}", self.subject)
+    }
+}
+
+impl SubjectJob {
+    async fn upsert_course(&self, course: &Course, db_pool: &PgPool) -> Result<()> {
+        sqlx::query(
+            r#"
+            INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+            ON CONFLICT (crn, term_code) DO UPDATE SET
+                subject = EXCLUDED.subject,
+                course_number = EXCLUDED.course_number,
+                title = EXCLUDED.title,
+                enrollment = EXCLUDED.enrollment,
+                max_enrollment = EXCLUDED.max_enrollment,
+                wait_count = EXCLUDED.wait_count,
+                wait_capacity = EXCLUDED.wait_capacity,
+                last_scraped_at = EXCLUDED.last_scraped_at
+            "#,
+        )
+        .bind(&course.course_reference_number)
+        .bind(&course.subject)
+        .bind(&course.course_number)
+        .bind(&course.course_title)
+        .bind(&course.term)
+        .bind(course.enrollment)
+        .bind(course.maximum_enrollment)
+        .bind(course.wait_count)
+        .bind(course.wait_capacity)
+        .bind(chrono::Utc::now())
+        .execute(db_pool)
+        .await
+        .map(|result| {
+            trace!(result = ?result, "Course upserted");
+        })
+        .map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
+    }
+}
diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs
index 1daa864..6d05867 100644
--- a/src/scraper/mod.rs
+++ b/src/scraper/mod.rs
@@ -1,3 +1,4 @@
+pub mod jobs;
 pub mod scheduler;
 pub mod worker;
diff --git a/src/scraper/scheduler.rs b/src/scraper/scheduler.rs
index baecc03..f15637b 100644
--- a/src/scraper/scheduler.rs
+++ b/src/scraper/scheduler.rs
@@ -1,6 +1,7 @@
 use crate::banner::{BannerApi, Term};
 use crate::data::models::{ScrapePriority, TargetType};
 use crate::error::Result;
+use crate::scraper::jobs::subject::SubjectJob;
 use serde_json::json;
 use sqlx::PgPool;
 use std::sync::Arc;
@@ -78,7 +79,8 @@ impl Scheduler {
         let new_jobs: Vec<_> = subjects
             .into_iter()
             .filter_map(|subject| {
-                let payload = json!({ "subject": subject.code });
+                let job = SubjectJob::new(subject.code.clone());
+                let payload = serde_json::to_value(&job).unwrap();
                 let payload_str = payload.to_string();
 
                 if existing_payloads.contains(&payload_str) {
diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs
index 43420f2..60e8f4f 100644
--- a/src/scraper/worker.rs
+++ b/src/scraper/worker.rs
@@ -1,7 +1,7 @@
-use crate::banner::{BannerApi, BannerApiError, Course, SearchQuery, Term};
+use crate::banner::{BannerApi, BannerApiError};
 use crate::data::models::ScrapeJob;
 use crate::error::Result;
-use serde_json::Value;
+use crate::scraper::jobs::JobType;
 use sqlx::PgPool;
 use std::sync::Arc;
 use std::time::Duration;
@@ -110,81 +110,27 @@ impl Worker {
     }
 
     async fn process_job(&self, job: ScrapeJob) -> Result<()> {
-        match job.target_type {
-            crate::data::models::TargetType::Subject => {
-                self.process_subject_job(&job.target_payload).await
-            }
-            _ => {
-                warn!(worker_id = self.id, job_id = job.id, "unhandled job type");
-                Ok(())
-            }
-        }
-    }
-
-    async fn process_subject_job(&self, payload: &Value) -> Result<()> {
-        let subject_code = payload["subject"]
-            .as_str()
-            .ok_or_else(|| anyhow::anyhow!("Invalid subject payload"))?;
-        info!(
+        // Convert the database job to our job type
+        let job_type = JobType::from_target_type_and_payload(
+            job.target_type,
+            job.target_payload,
+        )?;
+
+        // Get the job implementation
+        let job_impl = job_type.as_job();
+
+        debug!(
             worker_id = self.id,
-            subject = subject_code,
-            "Scraping subject"
+            job_id = job.id,
+            description = job_impl.description(),
+            "Processing job"
         );
-
-        let term = Term::get_current().inner().to_string();
-        let query = SearchQuery::new().subject(subject_code).max_results(500);
-
-        let search_result = self
-            .banner_api
-            .search(&term, &query, "subjectDescription", false)
-            .await?;
-
-        if let Some(courses_from_api) = search_result.data {
-            info!(
-                worker_id = self.id,
-                subject = subject_code,
-                count = courses_from_api.len(),
-                "Found courses"
-            );
-            for course in courses_from_api {
-                self.upsert_course(&course).await?;
-            }
-        }
-
-        Ok(())
+
+        // Process the job
+        job_impl.process(&self.banner_api, &self.db_pool).await
     }
 
-    async fn upsert_course(&self, course: &Course) -> Result<()> {
-        sqlx::query(
-            r#"
-            INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
-            ON CONFLICT (crn, term_code) DO UPDATE SET
-                subject = EXCLUDED.subject,
-                course_number = EXCLUDED.course_number,
-                title = EXCLUDED.title,
-                enrollment = EXCLUDED.enrollment,
-                max_enrollment = EXCLUDED.max_enrollment,
-                wait_count = EXCLUDED.wait_count,
-                wait_capacity = EXCLUDED.wait_capacity,
-                last_scraped_at = EXCLUDED.last_scraped_at
-            "#,
-        )
-        .bind(&course.course_reference_number)
-        .bind(&course.subject)
-        .bind(&course.course_number)
-        .bind(&course.course_title)
-        .bind(&course.term)
-        .bind(course.enrollment)
-        .bind(course.maximum_enrollment)
-        .bind(course.wait_count)
-        .bind(course.wait_capacity)
-        .bind(chrono::Utc::now())
-        .execute(&self.db_pool)
-        .await?;
-        Ok(())
-    }
-
     async fn delete_job(&self, job_id: i32) -> Result<()> {
         sqlx::query("DELETE FROM scrape_jobs WHERE id = $1")
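
A minimal end-to-end sketch of the interface this patch introduces, not taken from the patch itself: it assumes the crate's `Result` converts from `serde_json::Error` (which `from_target_type_and_payload` already relies on), and the `"CS"` subject code is a made-up example value.

use crate::banner::BannerApi;
use crate::data::models::TargetType;
use crate::error::Result;
use crate::scraper::jobs::{subject::SubjectJob, JobType};
use sqlx::PgPool;

async fn dispatch_example(banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
    // Scheduler side: the payload is now the serialized job itself, so the
    // scheduler and worker share one schema instead of a hand-built json!().
    let payload = serde_json::to_value(SubjectJob::new("CS".to_string()))?;

    // Worker side: rebuild the typed job from the stored (target_type, payload)
    // pair and dispatch through the trait object. Adding a new job type means a
    // new Job impl plus a JobType variant, not another match arm in the worker.
    let job = JobType::from_target_type_and_payload(TargetType::Subject, payload)?;
    job.as_job().process(banner_api, db_pool).await
}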