diff --git a/Cargo.lock b/Cargo.lock
index 68c11c6..f01f477 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -175,6 +175,8 @@ dependencies = [
  "chrono-tz",
  "compile-time",
  "diesel",
+ "diesel-derive-enum",
+ "diesel_derives",
  "dotenvy",
  "figment",
  "fundu",
@@ -573,9 +575,23 @@ dependencies = [
  "diesel_derives",
  "itoa",
  "pq-sys",
+ "r2d2",
+ "serde_json",
  "uuid",
 ]
 
+[[package]]
+name = "diesel-derive-enum"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81c5131a2895ef64741dad1d483f358c2a229a3a2d1b256778cdc5e146db64d4"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "diesel_derives"
 version = "2.2.7"
@@ -642,7 +658,7 @@ checksum = "139ae9aca7527f85f26dd76483eb38533fd84bd571065da1739656ef71c5ff5b"
 dependencies = [
  "darling",
  "either",
- "heck",
+ "heck 0.5.0",
  "proc-macro2",
  "quote",
  "syn 2.0.106",
@@ -986,6 +1002,12 @@ dependencies = [
  "foldhash",
 ]
 
+[[package]]
+name = "heck"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
+
 [[package]]
 name = "heck"
 version = "0.5.0"
@@ -1896,6 +1918,17 @@ version = "5.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
 
+[[package]]
+name = "r2d2"
+version = "0.8.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
+dependencies = [
+ "log",
+ "parking_lot",
+ "scheduled-thread-pool",
+]
+
 [[package]]
 name = "rand"
 version = "0.8.5"
@@ -2286,6 +2319,15 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "scheduled-thread-pool"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
+dependencies = [
+ "parking_lot",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index 089424f..6eda174 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ tokio = { version = "1.47.1", features = ["full"] }
 axum = "0.8.4"
 serenity = { version = "0.12.4", features = ["rustls_backend"] }
 reqwest = { version = "0.12.23", features = ["json", "cookies"] }
-diesel = { version = "2.2.12", features = ["chrono", "postgres", "uuid"] }
+diesel = { version = "2.2.12", features = ["chrono", "postgres", "r2d2", "uuid", "serde_json"] }
 redis = { version = "0.32.5", features = ["tokio-comp"] }
 figment = { version = "0.10.19", features = ["toml", "env"] }
 serde_json = "1.0.143"
@@ -30,3 +30,5 @@ url = "2.5"
 compile-time = "0.2.0"
 time = "0.3.41"
 bitflags = { version = "2.9.3", features = ["serde"] }
+diesel_derives = "2.2.7"
+diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
diff --git a/src/banner/api.rs b/src/banner/api.rs
index 67521f3..604d4af 100644
--- a/src/banner/api.rs
+++ b/src/banner/api.rs
@@ -11,15 +11,15 @@ use tracing::{error, info};
 /// Main Banner API client.
 #[derive(Debug)]
 pub struct BannerApi {
-    session_manager: SessionManager,
-    client: Client,
+    sessions: SessionManager,
+    http: Client,
     base_url: String,
 }
 
 impl BannerApi {
     /// Creates a new Banner API client.
     pub fn new(base_url: String) -> Result<Self> {
-        let client = Client::builder()
+        let http = Client::builder()
             .cookie_store(true)
             .user_agent(user_agent())
             .tcp_keepalive(Some(std::time::Duration::from_secs(60 * 5)))
@@ -29,11 +29,11 @@ impl BannerApi {
             .build()
             .context("Failed to create HTTP client")?;
 
-        let session_manager = SessionManager::new(base_url.clone(), client.clone());
+        let session_manager = SessionManager::new(base_url.clone(), http.clone());
 
         Ok(Self {
-            session_manager,
-            client,
+            sessions: session_manager,
+            http,
             base_url,
         })
     }
 
@@ -41,7 +41,7 @@ impl BannerApi {
     /// Sets up the API client by initializing session cookies.
     pub async fn setup(&self) -> Result<()> {
         info!(base_url = self.base_url, "setting up banner api client");
-        let result = self.session_manager.setup().await;
+        let result = self.sessions.setup().await;
         match &result {
             Ok(()) => info!("banner api client setup completed successfully"),
             Err(e) => error!(error = ?e, "banner api client setup failed"),
         }
@@ -69,7 +69,7 @@ impl BannerApi {
         ];
 
         let response = self
-            .client
+            .http
             .get(&url)
             .query(&params)
             .send()
@@ -96,7 +96,7 @@ impl BannerApi {
             return Err(anyhow::anyhow!("Offset must be greater than 0"));
         }
 
-        let session_id = self.session_manager.ensure_session()?;
+        let session_id = self.sessions.ensure_session()?;
         let url = format!("{}/classSearch/get_subject", self.base_url);
         let params = [
             ("searchTerm", search),
             ("term", term),
             ("offset", &offset.to_string()),
             ("max", &max.to_string()),
             ("uniqueSessionId", &session_id),
             ("_", &timestamp()),
         ];
 
         let response = self
-            .client
+            .http
             .get(&url)
             .query(&params)
             .send()
@@ -135,7 +135,7 @@ impl BannerApi {
             return Err(anyhow::anyhow!("Offset must be greater than 0"));
         }
 
-        let session_id = self.session_manager.ensure_session()?;
+        let session_id = self.sessions.ensure_session()?;
         let url = format!("{}/classSearch/get_instructor", self.base_url);
         let params = [
             ("searchTerm", search),
             ("term", term),
             ("offset", &offset.to_string()),
             ("max", &max.to_string()),
             ("uniqueSessionId", &session_id),
             ("_", &timestamp()),
         ];
 
         let response = self
-            .client
+            .http
             .get(&url)
             .query(&params)
             .send()
@@ -174,7 +174,7 @@ impl BannerApi {
             return Err(anyhow::anyhow!("Offset must be greater than 0"));
         }
 
-        let session_id = self.session_manager.ensure_session()?;
+        let session_id = self.sessions.ensure_session()?;
         let url = format!("{}/classSearch/get_campus", self.base_url);
         let params = [
             ("searchTerm", search),
             ("term", term),
             ("offset", &offset.to_string()),
             ("max", &max.to_string()),
             ("uniqueSessionId", &session_id),
             ("_", &timestamp()),
         ];
 
         let response = self
-            .client
+            .http
             .get(&url)
             .query(&params)
             .send()
@@ -211,7 +211,7 @@ impl BannerApi {
         let params = [("term", term), ("courseReferenceNumber", crn)];
 
         let response = self
-            .client
+            .http
             .get(&url)
             .query(&params)
             .send()
@@ -260,9 +260,9 @@ impl BannerApi {
         sort: &str,
         sort_descending: bool,
     ) -> Result {
-        self.session_manager.reset_data_form().await?;
+        self.sessions.reset_data_form().await?;
 
-        let session_id = self.session_manager.ensure_session()?;
+        let session_id = self.sessions.ensure_session()?;
         let mut params = query.to_params();
 
         // Add additional parameters
@@ -278,7 +278,7 @@ impl BannerApi {
         let url = format!("{}/searchResults/searchResults", self.base_url);
 
         let response = self
-            .client
+            .http
             .get(&url)
             .query(&params)
             .send()
@@ -301,16 +301,16 @@ impl BannerApi {
     /// Selects a term for the current session.
     pub async fn select_term(&self, term: &str) -> Result<()> {
-        self.session_manager.select_term(term).await
+        self.sessions.select_term(term).await
     }
 
     /// Retrieves a single course by CRN by issuing a minimal search
     pub async fn get_course_by_crn(&self, term: &str, crn: &str) -> Result<Option<Course>> {
-        self.session_manager.reset_data_form().await?;
+        self.sessions.reset_data_form().await?;
 
         // Ensure session is configured for this term
         self.select_term(term).await?;
-        let session_id = self.session_manager.ensure_session()?;
+        let session_id = self.sessions.ensure_session()?;
 
         let query = SearchQuery::new()
             .course_reference_number(crn)
@@ -326,7 +326,7 @@ impl BannerApi {
         let url = format!("{}/searchResults/searchResults", self.base_url);
 
         let response = self
-            .client
+            .http
             .get(&url)
             .query(&params)
             .send()
@@ -366,7 +366,7 @@ impl BannerApi {
         let url = format!("{}/searchResults/getClassDetails", self.base_url);
 
         let response = self
-            .client
+            .http
             .post(&url)
             .json(&body)
             .send()
diff --git a/src/banner/mod.rs b/src/banner/mod.rs
index ba2a728..43f5d06 100644
--- a/src/banner/mod.rs
+++ b/src/banner/mod.rs
@@ -11,7 +11,6 @@
 pub mod api;
 pub mod models;
 pub mod query;
-pub mod scraper;
 pub mod session;
 pub mod util;
 
diff --git a/src/banner/scraper.rs b/src/banner/scraper.rs
deleted file mode 100644
index e322a48..0000000
--- a/src/banner/scraper.rs
+++ /dev/null
@@ -1,292 +0,0 @@
-//! Course scraping functionality for the Banner API.
-
-use crate::banner::{api::BannerApi, models::*, query::SearchQuery};
-use anyhow::{Context, Result};
-use redis::AsyncCommands;
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::time;
-use tracing::{debug, error, info, warn};
-
-/// Priority majors that should be scraped more frequently
-const PRIORITY_MAJORS: &[&str] = &["CS", "CPE", "MAT", "EE", "IS"];
-
-/// Maximum number of courses to fetch per page
-const MAX_PAGE_SIZE: i32 = 500;
-
-/// Course scraper for Banner API
-pub struct CourseScraper {
-    api: Arc<BannerApi>,
-    redis_client: redis::Client,
-}
-
-impl CourseScraper {
-    /// Creates a new course scraper
-    pub fn new(api: Arc<BannerApi>, redis_url: &str) -> Result<Self> {
-        let redis_client =
-            redis::Client::open(redis_url).context("Failed to create Redis client")?;
-
-        Ok(Self { api, redis_client })
-    }
-
-    /// Scrapes all courses and stores them in Redis
-    pub async fn scrape_all(&self, term: &str) -> Result<()> {
-        // Get all subjects
-        let subjects = self
-            .api
-            .get_subjects("", term, 1, 100)
-            .await
-            .context("Failed to get subjects for scraping")?;
-
-        if subjects.is_empty() {
-            return Err(anyhow::anyhow!("no subjects found for term {term}"));
-        }
-
-        // Categorize subjects
-        let (priority_subjects, other_subjects): (Vec<_>, Vec<_>) = subjects
-            .into_iter()
-            .partition(|subject| PRIORITY_MAJORS.contains(&subject.code.as_str()));
-
-        // Get expired subjects that need scraping
-        let mut expired_subjects = Vec::new();
-        expired_subjects.extend(self.get_expired_subjects(&priority_subjects, term).await?);
-        expired_subjects.extend(self.get_expired_subjects(&other_subjects, term).await?);
-
-        if expired_subjects.is_empty() {
-            info!("no expired subjects found, skipping scrape");
-            return Ok(());
-        }
-
-        info!(
-            "scraping {count} subjects for term {term}",
-            count = expired_subjects.len()
-        );
-
-        // Scrape each expired subject
-        for subject in expired_subjects {
-            if let Err(e) = self.scrape_subject(&subject.code, term).await {
-                error!(
-                    "failed to scrape subject {subject}: {e}",
-                    subject = subject.code
-                );
-            }
-
-            // Rate limiting between subjects
-            time::sleep(Duration::from_secs(2)).await;
-        }
-
-        Ok(())
-    }
-
-    /// Gets subjects that have expired and need to be scraped
-    async fn get_expired_subjects(&self, subjects: &[Pair], term: &str) -> Result<Vec<Pair>> {
-        let mut conn = self
-            .redis_client
-            .get_multiplexed_async_connection()
-            .await
-            .context("Failed to get Redis connection")?;
-
-        let mut expired = Vec::new();
-
-        for subject in subjects {
-            let key = format!("scraped:{code}:{term}", code = subject.code);
-            let scraped: Option<String> = conn
-                .get(&key)
-                .await
-                .context("Failed to check scrape status in Redis")?;
-
-            // If not scraped or marked as expired (empty/0), add to list
-            if scraped.is_none() || scraped.as_deref() == Some("0") {
-                expired.push(subject.clone());
-            }
-        }
-
-        Ok(expired)
-    }
-
-    /// Scrapes all courses for a specific subject
-    pub async fn scrape_subject(&self, subject: &str, term: &str) -> Result<()> {
-        let mut offset = 0;
-        let mut total_courses = 0;
-
-        loop {
-            let query = SearchQuery::new()
-                .subject(subject)
-                .offset(offset)
-                .max_results(MAX_PAGE_SIZE * 2);
-
-            // Ensure session term is selected before searching
-            self.api.select_term(term).await?;
-
-            let result = self
-                .api
-                .search(term, &query, "subjectDescription", false)
-                .await
-                .with_context(|| {
-                    format!("failed to search for subject {subject} at offset {offset}")
-                })?;
-
-            if !result.success {
-                return Err(anyhow::anyhow!(
-                    "search marked unsuccessful for subject {subject}"
-                ));
-            }
-
-            let course_count = result.data.as_ref().map(|v| v.len() as i32).unwrap_or(0);
-            total_courses += course_count;
-
-            debug!(
-                "retrieved {count} courses for subject {subject} at offset {offset}",
-                count = course_count
-            );
-
-            // Store each course in Redis
-            for course in result.data.unwrap_or_default() {
-                if let Err(e) = self.store_course(&course).await {
-                    error!(
-                        "failed to store course {crn}: {e}",
-                        crn = course.course_reference_number
-                    );
-                }
-            }
-
-            // Check if we got a full page and should continue
-            if course_count >= MAX_PAGE_SIZE {
-                if course_count > MAX_PAGE_SIZE {
-                    warn!(
-                        "course count {count} exceeds max page size {max_page_size}",
-                        count = course_count,
-                        max_page_size = MAX_PAGE_SIZE
-                    );
-                }
-
-                offset += MAX_PAGE_SIZE;
-                debug!("continuing to next page for subject {subject} at offset {offset}");
-
-                // Rate limiting between pages
-                time::sleep(Duration::from_secs(3)).await;
-                continue;
-            }
-
-            break;
-        }
-
-        info!(
-            "scraped {count} total courses for subject {subject}",
-            count = total_courses
-        );
-
-        // Mark subject as scraped with expiry
-        self.mark_subject_scraped(subject, term, total_courses)
-            .await?;
-
-        Ok(())
-    }
-
-    /// Stores a course in Redis
-    async fn store_course(&self, course: &Course) -> Result<()> {
-        let mut conn = self
-            .redis_client
-            .get_multiplexed_async_connection()
-            .await
-            .context("Failed to get Redis connection")?;
-
-        let key = format!("class:{crn}", crn = course.course_reference_number);
-        let serialized = serde_json::to_string(course).context("Failed to serialize course")?;
-
-        let _: () = conn
-            .set(&key, serialized)
-            .await
-            .context("Failed to store course in Redis")?;
-
-        Ok(())
-    }
-
-    /// Marks a subject as scraped with appropriate expiry time
-    async fn mark_subject_scraped(
-        &self,
-        subject: &str,
-        term: &str,
-        course_count: i32,
-    ) -> Result<()> {
-        let mut conn = self
-            .redis_client
-            .get_multiplexed_async_connection()
-            .await
-            .context("Failed to get Redis connection")?;
-
-        let key = format!("scraped:{subject}:{term}", subject = subject);
-        let expiry = self.calculate_expiry(subject, course_count);
-
-        let value = if course_count == 0 { -1 } else { course_count };
-
-        let _: () = conn
-            .set_ex(&key, value, expiry.as_secs())
-            .await
-            .context("Failed to mark subject as scraped")?;
-
-        debug!(
-            "marked subject {subject} as scraped with {count} courses, expiry: {expiry:?}",
-            subject = subject,
-            count = course_count,
-            expiry = expiry
-        );
-
-        Ok(())
-    }
-
-    /// Calculates expiry time for a scraped subject based on various factors
-    fn calculate_expiry(&self, subject: &str, course_count: i32) -> Duration {
-        // Base calculation: 1 hour per 100 courses
-        let mut base_expiry = Duration::from_secs(3600 * (course_count as u64 / 100).max(1));
-
-        // Special handling for subjects with few courses
-        if course_count < 50 {
-            // Linear interpolation: 1 course = 12 hours, 49 courses = 1 hour
-            let hours = 12.0 - ((course_count as f64 - 1.0) / 48.0) * 11.0;
-            base_expiry = Duration::from_secs((hours * 3600.0) as u64);
-        }
-
-        // Priority subjects get shorter expiry (more frequent updates)
-        if PRIORITY_MAJORS.contains(&subject) {
-            base_expiry /= 3;
-        }
-
-        // Add random variance (±15%)
-        let variance = (base_expiry.as_secs() as f64 * 0.15) as u64;
-        let random_offset = (rand::random::<f64>() - 0.5) * 2.0 * variance as f64;
-
-        let final_expiry = if random_offset > 0.0 {
-            base_expiry + Duration::from_secs(random_offset as u64)
-        } else {
-            base_expiry.saturating_sub(Duration::from_secs((-random_offset) as u64))
-        };
-
-        // Ensure minimum of 1 hour
-        final_expiry.max(Duration::from_secs(3600))
-    }
-
-    /// Gets a course from Redis cache
-    pub async fn get_course(&self, crn: &str) -> Result<Option<Course>> {
-        let mut conn = self
-            .redis_client
-            .get_multiplexed_async_connection()
-            .await
-            .context("Failed to get Redis connection")?;
-
-        let key = format!("class:{crn}");
-        let serialized: Option<String> = conn
-            .get(&key)
-            .await
-            .context("Failed to get course from Redis")?;
-
-        match serialized {
-            Some(data) => {
-                let course: Course = serde_json::from_str(&data)
-                    .context("Failed to deserialize course from Redis")?;
-                Ok(Some(course))
-            }
-            None => Ok(None),
-        }
-    }
-}
diff --git a/src/data/models.rs b/src/data/models.rs
index 0fbd31f..449556a 100644
--- a/src/data/models.rs
+++ b/src/data/models.rs
@@ -1,8 +1,9 @@
 //! Diesel models for the database schema.
 
-use crate::data::schema::{course_audits, course_metrics, courses};
+use crate::data::schema::{course_audits, course_metrics, courses, scrape_jobs};
 use chrono::{DateTime, Utc};
-use diesel::{Insertable, Queryable, Selectable};
+use diesel::{Insertable, Queryable, QueryableByName, Selectable};
+use serde_json::Value;
 
 #[derive(Queryable, Selectable)]
 #[diesel(table_name = courses)]
@@ -78,3 +79,45 @@ pub struct NewCourseAudit<'a> {
     pub old_value: &'a str,
     pub new_value: &'a str,
 }
+
+/// The priority level of a scrape job.
+#[derive(diesel_derive_enum::DbEnum, Copy, Debug, Clone)]
+pub enum ScrapePriority {
+    Low,
+    Medium,
+    High,
+    Critical,
+}
+
+/// The type of target for a scrape job, determining how the payload is interpreted.
+#[derive(diesel_derive_enum::DbEnum, Copy, Debug, Clone)]
+pub enum TargetType {
+    Subject,
+    CourseRange,
+    CrnList,
+    SingleCrn,
+}
+
+/// Represents a queryable job from the database.
+#[derive(Debug, Clone, Queryable, QueryableByName)]
+#[diesel(table_name = scrape_jobs)]
+pub struct ScrapeJob {
+    pub id: i32,
+    pub target_type: TargetType,
+    pub target_payload: Value,
+    pub priority: ScrapePriority,
+    pub execute_at: DateTime<Utc>,
+    pub created_at: DateTime<Utc>,
+    pub locked_at: Option<DateTime<Utc>>,
+}
+
+/// Represents a new job to be inserted into the database.
+#[derive(Debug, Clone, Insertable)]
+#[diesel(table_name = scrape_jobs)]
+pub struct NewScrapeJob {
+    pub target_type: TargetType,
+    #[diesel(sql_type = diesel::sql_types::Jsonb)]
+    pub target_payload: Value,
+    pub priority: ScrapePriority,
+    pub execute_at: DateTime<Utc>,
+}
diff --git a/src/data/schema.rs b/src/data/schema.rs
index 0575af9..ee4998a 100644
--- a/src/data/schema.rs
+++ b/src/data/schema.rs
@@ -1,3 +1,30 @@
+pub mod sql_types {
+    #[derive(diesel::sql_types::SqlType)]
+    #[diesel(postgres_type(name = "scrape_priority"))]
+    pub struct ScrapePriority;
+
+    #[derive(diesel::sql_types::SqlType)]
+    #[diesel(postgres_type(name = "target_type"))]
+    pub struct TargetType;
+}
+
+use super::models::{ScrapePriorityMapping, TargetTypeMapping};
+
+diesel::table! {
+    use diesel::sql_types::*;
+    use super::{ScrapePriorityMapping, TargetTypeMapping};
+
+    scrape_jobs (id) {
+        id -> Int4,
+        target_type -> TargetTypeMapping,
+        target_payload -> Jsonb,
+        priority -> ScrapePriorityMapping,
+        execute_at -> Timestamptz,
+        created_at -> Timestamptz,
+        locked_at -> Nullable<Timestamptz>,
+    }
+}
+
 diesel::table! {
     courses (id) {
         id -> Int4,
@@ -39,4 +66,4 @@ diesel::table! {
 diesel::joinable!(course_metrics -> courses (course_id));
 diesel::joinable!(course_audits -> courses (course_id));
 
-diesel::allow_tables_to_appear_in_same_query!(courses, course_metrics, course_audits,);
+diesel::allow_tables_to_appear_in_same_query!(courses, course_metrics, course_audits, scrape_jobs,);
diff --git a/src/lib.rs b/src/lib.rs
index ad75f5d..d096e02 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,5 +3,6 @@ pub mod banner;
 pub mod bot;
 pub mod data;
 pub mod error;
+pub mod scraper;
 pub mod services;
 pub mod web;
diff --git a/src/main.rs b/src/main.rs
index e5b0426..3cb78fd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,7 +5,6 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber};
 
 use crate::app_state::AppState;
 use crate::banner::BannerApi;
-use crate::banner::scraper::CourseScraper;
 use crate::bot::{Data, get_commands};
 use crate::config::Config;
 use crate::services::manager::ServiceManager;
@@ -80,14 +79,9 @@ async fn main() {
     let app_state = AppState::new(banner_api_arc.clone(), &config.redis_url)
         .expect("Failed to create AppState");
 
-    // Create CourseScraper for web service
-    let scraper = CourseScraper::new(banner_api_arc.clone(), &config.redis_url)
-        .expect("Failed to create CourseScraper");
-
     // Create BannerState for web service
     let banner_state = BannerState {
         api: banner_api_arc,
-        scraper: Arc::new(scraper),
     };
 
     // Configure the client with your Discord bot token in the environment
diff --git a/src/web/routes.rs b/src/web/routes.rs
index b866272..4bd1db4 100644
--- a/src/web/routes.rs
+++ b/src/web/routes.rs
@@ -5,11 +5,12 @@ use serde_json::{Value, json};
 use std::sync::Arc;
 use tracing::info;
 
+use crate::banner::BannerApi;
+
 /// Shared application state for web server
 #[derive(Clone)]
 pub struct BannerState {
-    pub api: Arc<BannerApi>,
-    pub scraper: Arc<CourseScraper>,
+    pub api: Arc<BannerApi>,
 }
 
 /// Creates the web server router
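
---

Reviewer sketch, not part of the diff: taken together, the new `scrape_jobs` table, the `DbEnum` mappings, and the `r2d2` feature on `diesel` describe a Postgres-backed job queue replacing the deleted Redis-expiry scraper. A minimal example of enqueueing a job with the new models follows; the pool wiring, the `enqueue_subject_scrape` helper, and the payload shape are illustrative assumptions (the PR only fixes `target_payload` as `Jsonb`), not code from this change set.

```rust
use chrono::Utc;
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, Pool};

use crate::data::models::{NewScrapeJob, ScrapePriority, TargetType};
use crate::data::schema::scrape_jobs;

/// Hypothetical helper: queue a high-priority scrape of one subject.
pub fn enqueue_subject_scrape(
    pool: &Pool<ConnectionManager<PgConnection>>,
    subject: &str,
) -> anyhow::Result<()> {
    let job = NewScrapeJob {
        target_type: TargetType::Subject,
        // Assumed payload shape; interpreted according to target_type.
        target_payload: serde_json::json!({ "subject": subject }),
        priority: ScrapePriority::High,
        execute_at: Utc::now(),
    };

    // Check out a pooled connection and insert the row; the DbEnum
    // mappings let the enum fields serialize to the Postgres types
    // declared in schema.rs.
    let mut conn = pool.get()?;
    diesel::insert_into(scrape_jobs::table)
        .values(&job)
        .execute(&mut conn)?;
    Ok(())
}
```

The `locked_at` column reads like a claim marker for workers (set when a job is picked up, cleared or deleted on completion), presumably the job of the new top-level `scraper` module added to `src/lib.rs`.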