diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 9ae8f87..d797014 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -1,5 +1,5 @@ -use crate::app_state::AppState; use crate::error::Error; +use crate::state::AppState; pub mod commands; pub mod utils; diff --git a/src/config/mod.rs b/src/config/mod.rs index 62433c1..90ab3a0 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -11,6 +11,14 @@ use std::time::Duration; /// Application configuration loaded from environment variables #[derive(Deserialize)] pub struct Config { + /// Log level for the application + /// + /// This value is used to set the log level for this application's target specifically. + /// e.g. "debug" would be similar to "warn,banner=debug,..." + /// + /// Valid values are: "trace", "debug", "info", "warn", "error" + /// Defaults to "info" if not specified + pub log_level: String, /// Discord bot token for authentication pub bot_token: String, /// Port for the web server @@ -24,8 +32,6 @@ pub struct Config { pub banner_base_url: String, /// Target Discord guild ID where the bot operates pub bot_target_guild: u64, - /// Discord application ID - pub bot_app_id: u64, /// Graceful shutdown timeout duration /// /// Accepts both numeric values (seconds) and duration strings diff --git a/src/data/mod.rs b/src/data/mod.rs index b2f4caa..ded2bd5 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,4 +1,3 @@ //! Database models and schema. pub mod models; -pub mod schema; diff --git a/src/lib.rs b/src/lib.rs index 382ce16..e26d55e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -pub mod app_state; pub mod banner; pub mod bot; pub mod config; @@ -6,4 +5,5 @@ pub mod data; pub mod error; pub mod scraper; pub mod services; +pub mod state; pub mod web; diff --git a/src/main.rs b/src/main.rs index 8b7ca7e..398bc51 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,28 @@ -use serenity::all::{CacheHttp, ClientBuilder, GatewayIntents}; +use serenity::all::{ClientBuilder, GatewayIntents}; use tokio::signal; use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, FmtSubscriber}; -use crate::app_state::AppState; use crate::banner::BannerApi; use crate::bot::{Data, get_commands}; use crate::config::Config; +use crate::scraper::ScraperService; use crate::services::manager::ServiceManager; use crate::services::{ServiceResult, bot::BotService, web::WebService}; +use crate::state::AppState; use crate::web::routes::BannerState; use figment::{Figment, providers::Env}; +use sqlx::postgres::PgPoolOptions; use std::sync::Arc; -mod app_state; mod banner; mod bot; mod config; mod data; mod error; +mod scraper; mod services; +mod state; mod web; #[tokio::main] @@ -52,14 +55,22 @@ async fn main() { } else { "production" }, - "starting banner system" + "starting banner" ); let config: Config = Figment::new() + .merge(Env::raw().only(&["DATABASE_URL"])) .merge(Env::prefixed("APP_")) .extract() .expect("Failed to load config"); + // Create database connection pool + let db_pool = PgPoolOptions::new() + .max_connections(10) + .connect(&config.database_url) + .await + .expect("Failed to create database pool"); + info!( port = config.port, shutdown_timeout = format!("{:.2?}", config.shutdown_timeout), @@ -70,10 +81,6 @@ async fn main() { // Create BannerApi and AppState let banner_api = BannerApi::new(config.banner_base_url.clone()).expect("Failed to create BannerApi"); - banner_api - .setup() - .await - .expect("Failed to set up BannerApi session"); let banner_api_arc = Arc::new(banner_api); let app_state = AppState::new(banner_api_arc.clone(), &config.redis_url) @@ -81,7 +88,7 @@ async fn main() { // Create BannerState for web service let banner_state = BannerState { - api: banner_api_arc, + api: banner_api_arc.clone(), }; // Configure the client with your Discord bot token in the environment @@ -168,9 +175,11 @@ async fn main() { // Register services with the manager let bot_service = Box::new(BotService::new(client)); let web_service = Box::new(WebService::new(port, banner_state)); + let scraper_service = Box::new(ScraperService::new(db_pool.clone(), banner_api_arc.clone())); service_manager.register_service("bot", bot_service); service_manager.register_service("web", web_service); + service_manager.register_service("scraper", scraper_service); // Spawn all registered services service_manager.spawn_all(); diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs new file mode 100644 index 0000000..16c3281 --- /dev/null +++ b/src/scraper/mod.rs @@ -0,0 +1,87 @@ +pub mod scheduler; +pub mod worker; + +use crate::banner::BannerApi; +use sqlx::PgPool; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tracing::info; + +use self::scheduler::Scheduler; +use self::worker::Worker; +use crate::services::Service; + +/// The main service that will be managed by the application's `ServiceManager`. +/// +/// It holds the shared resources (database pool, API client) and manages the +/// lifecycle of the Scheduler and Worker tasks. +pub struct ScraperService { + db_pool: PgPool, + banner_api: Arc, + scheduler_handle: Option>, + worker_handles: Vec>, +} + +impl ScraperService { + /// Creates a new `ScraperService`. + pub fn new(db_pool: PgPool, banner_api: Arc) -> Self { + Self { + db_pool, + banner_api, + scheduler_handle: None, + worker_handles: Vec::new(), + } + } + + /// Starts the scheduler and a pool of workers. + pub fn start(&mut self) { + info!("ScraperService starting..."); + + let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone()); + let scheduler_handle = tokio::spawn(async move { + scheduler.run().await; + }); + self.scheduler_handle = Some(scheduler_handle); + info!("Scheduler task spawned."); + + let worker_count = 4; // This could be configurable + for i in 0..worker_count { + let worker = Worker::new(i, self.db_pool.clone(), self.banner_api.clone()); + let worker_handle = tokio::spawn(async move { + worker.run().await; + }); + self.worker_handles.push(worker_handle); + } + info!("Spawned {} worker tasks.", self.worker_handles.len()); + } + + /// Signals all child tasks to gracefully shut down. + pub async fn shutdown(&mut self) { + info!("Shutting down scraper service..."); + if let Some(handle) = self.scheduler_handle.take() { + handle.abort(); + } + for handle in self.worker_handles.drain(..) { + handle.abort(); + } + info!("Scraper service shutdown."); + } +} + +#[async_trait::async_trait] +impl Service for ScraperService { + fn name(&self) -> &'static str { + "scraper" + } + + async fn run(&mut self) -> Result<(), anyhow::Error> { + self.start(); + std::future::pending::<()>().await; + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), anyhow::Error> { + self.shutdown().await; + Ok(()) + } +} diff --git a/src/scraper/scheduler.rs b/src/scraper/scheduler.rs new file mode 100644 index 0000000..58c272e --- /dev/null +++ b/src/scraper/scheduler.rs @@ -0,0 +1,85 @@ +use crate::banner::{BannerApi, Term}; +use crate::data::models::{ScrapePriority, TargetType}; +use crate::error::Result; +use serde_json::json; +use sqlx::PgPool; +use std::sync::Arc; +use std::time::Duration; +use tokio::time; +use tracing::{error, info}; + +/// Periodically analyzes data and enqueues prioritized scrape jobs. +pub struct Scheduler { + db_pool: PgPool, + banner_api: Arc, +} + +impl Scheduler { + pub fn new(db_pool: PgPool, banner_api: Arc) -> Self { + Self { + db_pool, + banner_api, + } + } + + /// Runs the scheduler's main loop. + pub async fn run(&self) { + info!("Scheduler service started."); + let mut interval = time::interval(Duration::from_secs(60)); // Runs every minute + + loop { + interval.tick().await; + info!("Scheduler waking up to analyze and schedule jobs..."); + if let Err(e) = self.schedule_jobs().await { + error!(error = ?e, "Failed to schedule jobs"); + } + } + } + + /// The core logic for deciding what jobs to create. + async fn schedule_jobs(&self) -> Result<()> { + // For now, we will implement a simple baseline scheduling strategy: + // 1. Get a list of all subjects from the Banner API. + // 2. For each subject, check if an active (not locked, not completed) job already exists. + // 3. If no job exists, create a new, low-priority job to be executed in the near future. + let term = Term::get_current().inner().to_string(); + + info!( + term = term, + "[Scheduler] Enqueuing baseline subject scrape jobs..." + ); + + let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?; + + for subject in subjects { + let payload = json!({ "subject": subject.code }); + + let existing_job: Option<(i32,)> = sqlx::query_as( + "SELECT id FROM scrape_jobs WHERE target_type = $1 AND target_payload = $2 AND locked_at IS NULL" + ) + .bind(TargetType::Subject) + .bind(&payload) + .fetch_optional(&self.db_pool) + .await?; + + if existing_job.is_some() { + continue; + } + + sqlx::query( + "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) VALUES ($1, $2, $3, $4)" + ) + .bind(TargetType::Subject) + .bind(&payload) + .bind(ScrapePriority::Low) + .bind(chrono::Utc::now()) + .execute(&self.db_pool) + .await?; + + info!(subject = subject.code, "[Scheduler] Enqueued new job"); + } + + info!("[Scheduler] Job scheduling complete."); + Ok(()) + } +} diff --git a/src/scraper/worker.rs b/src/scraper/worker.rs new file mode 100644 index 0000000..d2994d2 --- /dev/null +++ b/src/scraper/worker.rs @@ -0,0 +1,205 @@ +use crate::banner::{BannerApi, BannerApiError, Course, SearchQuery, Term}; +use crate::data::models::ScrapeJob; +use crate::error::Result; +use serde_json::Value; +use sqlx::PgPool; +use std::sync::Arc; +use std::time::Duration; +use tokio::time; +use tracing::{error, info, warn}; + +/// A single worker instance. +/// +/// Each worker runs in its own asynchronous task and continuously polls the +/// database for scrape jobs to execute. +pub struct Worker { + id: usize, // For logging purposes + db_pool: PgPool, + banner_api: Arc, +} + +impl Worker { + pub fn new(id: usize, db_pool: PgPool, banner_api: Arc) -> Self { + Self { + id, + db_pool, + banner_api, + } + } + + /// Runs the worker's main loop. + pub async fn run(&self) { + info!(worker_id = self.id, "Worker started."); + loop { + match self.fetch_and_lock_job().await { + Ok(Some(job)) => { + let job_id = job.id; + info!(worker_id = self.id, job_id = job.id, "Processing job"); + if let Err(e) = self.process_job(job).await { + // Check if the error is due to an invalid session + if let Some(BannerApiError::InvalidSession) = + e.downcast_ref::() + { + warn!( + worker_id = self.id, + job_id, "Invalid session detected. Forcing session refresh." + ); + } else { + error!(worker_id = self.id, job_id, error = ?e, "Failed to process job"); + } + + // Unlock the job so it can be retried + if let Err(unlock_err) = self.unlock_job(job_id).await { + error!( + worker_id = self.id, + job_id, + ?unlock_err, + "Failed to unlock job" + ); + } + } else { + info!(worker_id = self.id, job_id, "Job processed successfully"); + // If successful, delete the job. + if let Err(delete_err) = self.delete_job(job_id).await { + error!( + worker_id = self.id, + job_id, + ?delete_err, + "Failed to delete job" + ); + } + } + } + Ok(None) => { + // No job found, wait for a bit before polling again. + time::sleep(Duration::from_secs(5)).await; + } + Err(e) => { + warn!(worker_id = self.id, error = ?e, "Failed to fetch job"); + // Wait before retrying to avoid spamming errors. + time::sleep(Duration::from_secs(10)).await; + } + } + } + } + + /// Atomically fetches a job from the queue, locking it for processing. + /// + /// This uses a `FOR UPDATE SKIP LOCKED` query to ensure that multiple + /// workers can poll the queue concurrently without conflicts. + async fn fetch_and_lock_job(&self) -> Result> { + let mut tx = self.db_pool.begin().await?; + + let job = sqlx::query_as::<_, ScrapeJob>( + "SELECT * FROM scrape_jobs WHERE locked_at IS NULL AND execute_at <= NOW() ORDER BY priority DESC, execute_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED" + ) + .fetch_optional(&mut *tx) + .await?; + + if let Some(ref job) = job { + sqlx::query("UPDATE scrape_jobs SET locked_at = NOW() WHERE id = $1") + .bind(job.id) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + + Ok(job) + } + + async fn process_job(&self, job: ScrapeJob) -> Result<()> { + match job.target_type { + crate::data::models::TargetType::Subject => { + self.process_subject_job(&job.target_payload).await + } + _ => { + warn!(worker_id = self.id, job_id = job.id, "unhandled job type"); + Ok(()) + } + } + } + + async fn process_subject_job(&self, payload: &Value) -> Result<()> { + let subject_code = payload["subject"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Invalid subject payload"))?; + info!( + worker_id = self.id, + subject = subject_code, + "Processing subject job" + ); + + let term = Term::get_current().inner().to_string(); + let query = SearchQuery::new().subject(subject_code).max_results(500); + + let search_result = self + .banner_api + .search(&term, &query, "subjectDescription", false) + .await?; + + if let Some(courses_from_api) = search_result.data { + info!( + worker_id = self.id, + subject = subject_code, + count = courses_from_api.len(), + "Found courses to upsert" + ); + for course in courses_from_api { + self.upsert_course(&course).await?; + } + } + + Ok(()) + } + + async fn upsert_course(&self, course: &Course) -> Result<()> { + sqlx::query( + r#" + INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (crn, term_code) DO UPDATE SET + subject = EXCLUDED.subject, + course_number = EXCLUDED.course_number, + title = EXCLUDED.title, + enrollment = EXCLUDED.enrollment, + max_enrollment = EXCLUDED.max_enrollment, + wait_count = EXCLUDED.wait_count, + wait_capacity = EXCLUDED.wait_capacity, + last_scraped_at = EXCLUDED.last_scraped_at + "#, + ) + .bind(&course.course_reference_number) + .bind(&course.subject) + .bind(&course.course_number) + .bind(&course.course_title) + .bind(&course.term) + .bind(course.enrollment) + .bind(course.maximum_enrollment) + .bind(course.wait_count) + .bind(course.wait_capacity) + .bind(chrono::Utc::now()) + .execute(&self.db_pool) + .await?; + + Ok(()) + } + + async fn delete_job(&self, job_id: i32) -> Result<()> { + sqlx::query("DELETE FROM scrape_jobs WHERE id = $1") + .bind(job_id) + .execute(&self.db_pool) + .await?; + info!(worker_id = self.id, job_id, "Job deleted"); + Ok(()) + } + + async fn unlock_job(&self, job_id: i32) -> Result<()> { + sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE id = $1") + .bind(job_id) + .execute(&self.db_pool) + .await?; + info!(worker_id = self.id, job_id, "Job unlocked after failure"); + Ok(()) + } +} diff --git a/src/services/manager.rs b/src/services/manager.rs index 0dc85a5..ce350b3 100644 --- a/src/services/manager.rs +++ b/src/services/manager.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::time::Duration; use tokio::sync::broadcast; use tokio::task::JoinHandle; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use crate::services::{Service, ServiceResult, run_service}; @@ -36,6 +36,7 @@ impl ServiceManager { for (name, service) in self.registered_services.drain() { let shutdown_rx = self.shutdown_tx.subscribe(); let handle = tokio::spawn(run_service(service, shutdown_rx)); + trace!(service = name, id = ?handle.id(), "service spawned",); self.running_services.insert(name, handle); } diff --git a/src/app_state.rs b/src/state.rs similarity index 100% rename from src/app_state.rs rename to src/state.rs