feat: implement comprehensive course data model with reference cache and search

This commit is contained in:
2026-01-28 21:06:29 -06:00
parent e3b855b956
commit 6df4303bd6
16 changed files with 1121 additions and 76 deletions
+17 -4
View File
@@ -4,10 +4,11 @@ pub mod worker;
use crate::banner::BannerApi;
use crate::services::Service;
use crate::state::ReferenceCache;
use crate::status::{ServiceStatus, ServiceStatusRegistry};
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::{RwLock, broadcast};
use tokio::task::JoinHandle;
use tracing::{info, warn};
@@ -21,6 +22,7 @@ use self::worker::Worker;
pub struct ScraperService {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
service_statuses: ServiceStatusRegistry,
scheduler_handle: Option<JoinHandle<()>>,
worker_handles: Vec<JoinHandle<()>>,
@@ -29,10 +31,16 @@ pub struct ScraperService {
impl ScraperService {
/// Creates a new `ScraperService`.
pub fn new(db_pool: PgPool, banner_api: Arc<BannerApi>, service_statuses: ServiceStatusRegistry) -> Self {
pub fn new(
db_pool: PgPool,
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
service_statuses: ServiceStatusRegistry,
) -> Self {
Self {
db_pool,
banner_api,
reference_cache,
service_statuses,
scheduler_handle: None,
worker_handles: Vec::new(),
@@ -48,7 +56,11 @@ impl ScraperService {
let (shutdown_tx, _) = broadcast::channel(1);
self.shutdown_tx = Some(shutdown_tx.clone());
let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone());
let scheduler = Scheduler::new(
self.db_pool.clone(),
self.banner_api.clone(),
self.reference_cache.clone(),
);
let shutdown_rx = shutdown_tx.subscribe();
let scheduler_handle = tokio::spawn(async move {
scheduler.run(shutdown_rx).await;
@@ -86,7 +98,8 @@ impl Service for ScraperService {
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
self.service_statuses.set("scraper", ServiceStatus::Disabled);
self.service_statuses
.set("scraper", ServiceStatus::Disabled);
info!("Shutting down scraper service");
// Send shutdown signal to all tasks
+122 -8
View File
@@ -1,28 +1,38 @@
use crate::banner::{BannerApi, Term};
use crate::data::models::{ScrapePriority, TargetType};
use crate::data::models::{ReferenceData, ScrapePriority, TargetType};
use crate::data::scrape_jobs;
use crate::error::Result;
use crate::scraper::jobs::subject::SubjectJob;
use crate::state::ReferenceCache;
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, broadcast};
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// How often reference data is re-scraped (6 hours).
const REFERENCE_DATA_INTERVAL: Duration = Duration::from_secs(6 * 60 * 60);
/// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
}
impl Scheduler {
pub fn new(db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
pub fn new(
db_pool: PgPool,
banner_api: Arc<BannerApi>,
reference_cache: Arc<RwLock<ReferenceCache>>,
) -> Self {
Self {
db_pool,
banner_api,
reference_cache,
}
}
@@ -41,26 +51,35 @@ impl Scheduler {
let work_interval = Duration::from_secs(60);
let mut next_run = time::Instant::now();
let mut current_work: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = None;
// Scrape reference data immediately on first cycle
let mut last_ref_scrape = Instant::now() - REFERENCE_DATA_INTERVAL;
loop {
tokio::select! {
_ = time::sleep_until(next_run) => {
let cancel_token = CancellationToken::new();
let should_scrape_ref = last_ref_scrape.elapsed() >= REFERENCE_DATA_INTERVAL;
// Spawn work in separate task to allow graceful cancellation during shutdown.
// Without this, shutdown would have to wait for the full scheduling cycle.
let work_handle = tokio::spawn({
let db_pool = self.db_pool.clone();
let banner_api = self.banner_api.clone();
let cancel_token = cancel_token.clone();
let reference_cache = self.reference_cache.clone();
async move {
tokio::select! {
result = Self::schedule_jobs_impl(&db_pool, &banner_api) => {
if let Err(e) = result {
_ = async {
if should_scrape_ref
&& let Err(e) = Self::scrape_reference_data(&db_pool, &banner_api, &reference_cache).await
{
error!(error = ?e, "Failed to scrape reference data");
}
if let Err(e) = Self::schedule_jobs_impl(&db_pool, &banner_api).await {
error!(error = ?e, "Failed to schedule jobs");
}
}
} => {}
_ = cancel_token.cancelled() => {
debug!("Scheduling work cancelled gracefully");
}
@@ -68,6 +87,10 @@ impl Scheduler {
}
});
if should_scrape_ref {
last_ref_scrape = Instant::now();
}
current_work = Some((work_handle, cancel_token));
next_run = time::Instant::now() + work_interval;
}
@@ -170,4 +193,95 @@ impl Scheduler {
debug!("Job scheduling complete");
Ok(())
}
/// Scrape all reference data categories from Banner and upsert to DB, then refresh cache.
#[tracing::instrument(skip_all)]
async fn scrape_reference_data(
db_pool: &PgPool,
banner_api: &BannerApi,
reference_cache: &Arc<RwLock<ReferenceCache>>,
) -> Result<()> {
let term = Term::get_current().inner().to_string();
info!(term = %term, "Scraping reference data");
let mut all_entries = Vec::new();
// Subjects
match banner_api.get_subjects("", &term, 1, 500).await {
Ok(pairs) => {
debug!(count = pairs.len(), "Fetched subjects");
all_entries.extend(pairs.into_iter().map(|p| ReferenceData {
category: "subject".to_string(),
code: p.code,
description: p.description,
}));
}
Err(e) => warn!(error = ?e, "Failed to fetch subjects"),
}
// Campuses
match banner_api.get_campuses(&term).await {
Ok(pairs) => {
debug!(count = pairs.len(), "Fetched campuses");
all_entries.extend(pairs.into_iter().map(|p| ReferenceData {
category: "campus".to_string(),
code: p.code,
description: p.description,
}));
}
Err(e) => warn!(error = ?e, "Failed to fetch campuses"),
}
// Instructional methods
match banner_api.get_instructional_methods(&term).await {
Ok(pairs) => {
debug!(count = pairs.len(), "Fetched instructional methods");
all_entries.extend(pairs.into_iter().map(|p| ReferenceData {
category: "instructional_method".to_string(),
code: p.code,
description: p.description,
}));
}
Err(e) => warn!(error = ?e, "Failed to fetch instructional methods"),
}
// Parts of term
match banner_api.get_parts_of_term(&term).await {
Ok(pairs) => {
debug!(count = pairs.len(), "Fetched parts of term");
all_entries.extend(pairs.into_iter().map(|p| ReferenceData {
category: "part_of_term".to_string(),
code: p.code,
description: p.description,
}));
}
Err(e) => warn!(error = ?e, "Failed to fetch parts of term"),
}
// Attributes
match banner_api.get_attributes(&term).await {
Ok(pairs) => {
debug!(count = pairs.len(), "Fetched attributes");
all_entries.extend(pairs.into_iter().map(|p| ReferenceData {
category: "attribute".to_string(),
code: p.code,
description: p.description,
}));
}
Err(e) => warn!(error = ?e, "Failed to fetch attributes"),
}
// Batch upsert all entries
let total = all_entries.len();
crate::data::reference::batch_upsert(&all_entries, db_pool).await?;
info!(total_entries = total, "Reference data upserted to DB");
// Refresh in-memory cache
let all = crate::data::reference::get_all(db_pool).await?;
let count = all.len();
*reference_cache.write().await = ReferenceCache::from_entries(all);
info!(entries = count, "Reference cache refreshed");
Ok(())
}
}