Compare commits

..

12 Commits

36 changed files with 2184 additions and 913 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.env
/target
/go/
/go/
.cargo/config.toml

934
Cargo.lock generated
View File

File diff suppressed because it is too large Load Diff

View File

@@ -2,33 +2,46 @@
name = "banner"
version = "0.1.0"
edition = "2024"
default-run = "banner"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
axum = "0.8.4"
serenity = { version = "0.12.4", features = ["rustls_backend"] }
reqwest = { version = "0.12.23", features = ["json", "cookies"] }
diesel = { version = "2.2.12", features = ["chrono", "postgres", "r2d2", "uuid", "serde_json"] }
redis = { version = "0.32.5", features = ["tokio-comp"] }
figment = { version = "0.10.19", features = ["toml", "env"] }
serde_json = "1.0.143"
serde = { version = "1.0.219", features = ["derive"] }
governor = "0.10.1"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
dotenvy = "0.15.7"
poise = "0.6.1"
async-trait = "0.1"
fundu = "2.0.1"
anyhow = "1.0.99"
thiserror = "2.0.16"
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.8"
rand = "0.8"
regex = "1.10"
url = "2.5"
async-trait = "0.1"
axum = "0.8.4"
bitflags = { version = "2.9.4", features = ["serde"] }
chrono = { version = "0.4.42", features = ["serde"] }
compile-time = "0.2.0"
cookie = "0.18.1"
dashmap = "6.1.0"
dotenvy = "0.15.7"
figment = { version = "0.10.19", features = ["toml", "env"] }
fundu = "2.0.1"
futures = "0.3"
http = "1.3.1"
poise = "0.6.1"
rand = "0.9.2"
redis = { version = "0.32.5", features = ["tokio-comp", "r2d2"] }
regex = "1.10"
reqwest = { version = "0.12.23", features = ["json", "cookies"] }
reqwest-middleware = { version = "0.4.2", features = ["json"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.143"
serenity = { version = "0.12.4", features = ["rustls_backend"] }
sqlx = { version = "0.8.6", features = [
"runtime-tokio-rustls",
"postgres",
"chrono",
"json",
"macros",
] }
thiserror = "2.0.16"
time = "0.3.41"
bitflags = { version = "2.9.3", features = ["serde"] }
diesel_derives = "2.2.7"
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
tokio = { version = "1.47.1", features = ["full"] }
tl = "0.7.8"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] }
url = "2.5"
governor = "0.10.1"
once_cell = "1.21.3"
[dev-dependencies]

77
Dockerfile Normal file
View File

@@ -0,0 +1,77 @@
# Build Stage
ARG RUST_VERSION=1.86.0
FROM rust:${RUST_VERSION}-bookworm AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /usr/src
RUN USER=root cargo new --bin banner
WORKDIR /usr/src/banner
# Copy dependency files for better layer caching
COPY ./Cargo.toml ./Cargo.lock* ./
# Build empty app with downloaded dependencies to produce a stable image layer for next build
RUN cargo build --release
# Build web app with own code
RUN rm src/*.rs
COPY ./src ./src
RUN rm ./target/release/deps/banner*
RUN cargo build --release
# Strip the binary to reduce size
RUN strip target/release/banner
# Runtime Stage - Debian slim for glibc compatibility
FROM debian:12-slim
ARG APP=/usr/src/app
ARG APP_USER=appuser
ARG UID=1000
ARG GID=1000
# Install runtime dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
tzdata \
wget \
&& rm -rf /var/lib/apt/lists/*
ARG TZ=Etc/UTC
ENV TZ=${TZ}
# Create user with specific UID/GID
RUN addgroup --gid $GID $APP_USER \
&& adduser --uid $UID --disabled-password --gecos "" --ingroup $APP_USER $APP_USER \
&& mkdir -p ${APP}
# Copy application files
COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/target/release/banner ${APP}/banner
COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/src/fonts ${APP}/fonts
# Set proper permissions
RUN chmod +x ${APP}/banner
USER $APP_USER
WORKDIR ${APP}
# Build-time arg for PORT, default to 8000
ARG PORT=8000
# Runtime environment var for PORT, default to build-time arg
ENV PORT=${PORT}
EXPOSE ${PORT}
# Add health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD wget --no-verbose --tries=1 --spider http://localhost:${PORT}/health || exit 1
# Can be explicitly overriden with different hosts & ports
ENV HOSTS=0.0.0.0,[::]
# Implicitly uses PORT environment variable
CMD ["sh", "-c", "exec ./banner --server ${HOSTS}"]

View File

View File

@@ -1,6 +0,0 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();

View File

@@ -1,36 +0,0 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View File

@@ -1,4 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS "courses";
DROP TABLE IF EXISTS "course_metrics";
DROP TABLE IF EXISTS "course_audits";

View File

@@ -1,35 +0,0 @@
-- Your SQL goes here
CREATE TABLE "courses"(
"id" INT4 NOT NULL PRIMARY KEY,
"crn" VARCHAR NOT NULL,
"subject" VARCHAR NOT NULL,
"course_number" VARCHAR NOT NULL,
"title" VARCHAR NOT NULL,
"term_code" VARCHAR NOT NULL,
"enrollment" INT4 NOT NULL,
"max_enrollment" INT4 NOT NULL,
"wait_count" INT4 NOT NULL,
"wait_capacity" INT4 NOT NULL,
"last_scraped_at" TIMESTAMPTZ NOT NULL
);
CREATE TABLE "course_metrics"(
"id" INT4 NOT NULL PRIMARY KEY,
"course_id" INT4 NOT NULL,
"timestamp" TIMESTAMPTZ NOT NULL,
"enrollment" INT4 NOT NULL,
"wait_count" INT4 NOT NULL,
"seats_available" INT4 NOT NULL,
FOREIGN KEY ("course_id") REFERENCES "courses"("id")
);
CREATE TABLE "course_audits"(
"id" INT4 NOT NULL PRIMARY KEY,
"course_id" INT4 NOT NULL,
"timestamp" TIMESTAMPTZ NOT NULL,
"field_changed" VARCHAR NOT NULL,
"old_value" TEXT NOT NULL,
"new_value" TEXT NOT NULL,
FOREIGN KEY ("course_id") REFERENCES "courses"("id")
);

View File

@@ -0,0 +1,56 @@
-- Drop all old tables
DROP TABLE IF EXISTS scrape_jobs;
DROP TABLE IF EXISTS course_metrics;
DROP TABLE IF EXISTS course_audits;
DROP TABLE IF EXISTS courses;
-- Enums for scrape_jobs
CREATE TYPE scrape_priority AS ENUM ('Low', 'Medium', 'High', 'Critical');
CREATE TYPE target_type AS ENUM ('Subject', 'CourseRange', 'CrnList', 'SingleCrn');
-- Main course data table
CREATE TABLE courses (
id SERIAL PRIMARY KEY,
crn VARCHAR NOT NULL,
subject VARCHAR NOT NULL,
course_number VARCHAR NOT NULL,
title VARCHAR NOT NULL,
term_code VARCHAR NOT NULL,
enrollment INTEGER NOT NULL,
max_enrollment INTEGER NOT NULL,
wait_count INTEGER NOT NULL,
wait_capacity INTEGER NOT NULL,
last_scraped_at TIMESTAMPTZ NOT NULL,
UNIQUE(crn, term_code)
);
-- Time-series data for course enrollment
CREATE TABLE course_metrics (
id SERIAL PRIMARY KEY,
course_id INTEGER NOT NULL REFERENCES courses(id) ON DELETE CASCADE,
timestamp TIMESTAMPTZ NOT NULL,
enrollment INTEGER NOT NULL,
wait_count INTEGER NOT NULL,
seats_available INTEGER NOT NULL
);
-- Audit trail for changes to course data
CREATE TABLE course_audits (
id SERIAL PRIMARY KEY,
course_id INTEGER NOT NULL REFERENCES courses(id) ON DELETE CASCADE,
timestamp TIMESTAMPTZ NOT NULL,
field_changed VARCHAR NOT NULL,
old_value TEXT NOT NULL,
new_value TEXT NOT NULL
);
-- Job queue for the scraper
CREATE TABLE scrape_jobs (
id SERIAL PRIMARY KEY,
target_type target_type NOT NULL,
target_payload JSONB NOT NULL,
priority scrape_priority NOT NULL,
execute_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
locked_at TIMESTAMPTZ
);

View File

@@ -1,72 +1,101 @@
//! Main Banner API client implementation.
use crate::banner::{models::*, query::SearchQuery, session::SessionManager, util::user_agent};
use anyhow::{Context, Result};
use axum::http::HeaderValue;
use reqwest::Client;
use serde_json;
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, Mutex},
time::Instant,
};
use tracing::{error, info};
use crate::banner::{
BannerSession, SessionPool, errors::BannerApiError, json::parse_json_with_context,
middleware::TransparentMiddleware, models::*, nonce, query::SearchQuery, util::user_agent,
};
use anyhow::{Context, Result, anyhow};
use cookie::Cookie;
use dashmap::DashMap;
use http::HeaderValue;
use reqwest::{Client, Request, Response};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use serde_json;
use tl;
use tracing::{Level, Metadata, Span, debug, error, field::ValueSet, info, span, trace, warn};
/// Main Banner API client.
#[derive(Debug)]
pub struct BannerApi {
sessions: SessionManager,
http: Client,
pub sessions: SessionPool,
http: ClientWithMiddleware,
base_url: String,
}
impl BannerApi {
/// Creates a new Banner API client.
pub fn new(base_url: String) -> Result<Self> {
let http = Client::builder()
.cookie_store(true)
.user_agent(user_agent())
.tcp_keepalive(Some(std::time::Duration::from_secs(60 * 5)))
.read_timeout(std::time::Duration::from_secs(10))
.connect_timeout(std::time::Duration::from_secs(10))
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to create HTTP client")?;
let session_manager = SessionManager::new(base_url.clone(), http.clone());
let http = ClientBuilder::new(
Client::builder()
.cookie_store(false)
.user_agent(user_agent())
.tcp_keepalive(Some(std::time::Duration::from_secs(60 * 5)))
.read_timeout(std::time::Duration::from_secs(10))
.connect_timeout(std::time::Duration::from_secs(10))
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to create HTTP client")?,
)
.with(TransparentMiddleware)
.build();
Ok(Self {
sessions: session_manager,
sessions: SessionPool::new(http.clone(), base_url.clone()),
http,
base_url,
})
}
/// Sets up the API client by initializing session cookies.
pub async fn setup(&self) -> Result<()> {
info!(base_url = self.base_url, "setting up banner api client");
let result = self.sessions.setup().await;
match &result {
Ok(()) => info!("banner api client setup completed successfully"),
Err(e) => error!(error = ?e, "banner api client setup failed"),
/// Validates offset parameter for search methods.
fn validate_offset(offset: i32) -> Result<()> {
if offset <= 0 {
Err(anyhow::anyhow!("Offset must be greater than 0"))
} else {
Ok(())
}
result
}
/// Retrieves a list of terms from the Banner API.
pub async fn get_terms(
/// Builds common search parameters for list endpoints.
fn build_list_params(
&self,
search: &str,
page: i32,
term: &str,
offset: i32,
max_results: i32,
) -> Result<Vec<BannerTerm>> {
if page <= 0 {
return Err(anyhow::anyhow!("Page must be greater than 0"));
}
session_id: &str,
) -> Vec<(&str, String)> {
vec![
("searchTerm", search.to_string()),
("term", term.to_string()),
("offset", offset.to_string()),
("max", max_results.to_string()),
("uniqueSessionId", session_id.to_string()),
("_", nonce()),
]
}
let url = format!("{}/classSearch/getTerms", self.base_url);
let params = [
("searchTerm", search),
("offset", &page.to_string()),
("max", &max_results.to_string()),
("_", &SessionManager::nonce()),
];
/// Makes a GET request to a list endpoint and parses JSON response.
async fn get_list_endpoint<T>(
&self,
endpoint: &str,
search: &str,
term: &str,
offset: i32,
max_results: i32,
) -> Result<Vec<T>>
where
T: for<'de> serde::Deserialize<'de>,
{
Self::validate_offset(offset)?;
let session = self.sessions.acquire(term.parse()?).await?;
let url = format!("{}/classSearch/{}", self.base_url, endpoint);
let params = self.build_list_params(search, term, offset, max_results, &session.id());
let response = self
.http
@@ -74,14 +103,109 @@ impl BannerApi {
.query(&params)
.send()
.await
.context("Failed to get terms")?;
.with_context(|| format!("Failed to get {}", endpoint))?;
let terms: Vec<BannerTerm> = response
let data: Vec<T> = response
.json()
.await
.context("Failed to parse terms response")?;
.with_context(|| format!("Failed to parse {} response", endpoint))?;
Ok(terms)
Ok(data)
}
/// Builds search parameters for course search methods.
fn build_search_params(
&self,
query: &SearchQuery,
term: &str,
session_id: &str,
sort: &str,
sort_descending: bool,
) -> HashMap<String, String> {
let mut params = query.to_params();
params.insert("txt_term".to_string(), term.to_string());
params.insert("uniqueSessionId".to_string(), session_id.to_string());
params.insert("sortColumn".to_string(), sort.to_string());
params.insert(
"sortDirection".to_string(),
if sort_descending { "desc" } else { "asc" }.to_string(),
);
params.insert("startDatepicker".to_string(), String::new());
params.insert("endDatepicker".to_string(), String::new());
params
}
/// Performs a course search and handles common response processing.
async fn perform_search(
&self,
term: &str,
query: &SearchQuery,
sort: &str,
sort_descending: bool,
) -> Result<SearchResult, BannerApiError> {
let mut session = self.sessions.acquire(term.parse()?).await?;
if session.been_used() {
self.http
.post(format!("{}/classSearch/resetDataForm", self.base_url))
.header("Cookie", session.cookie())
.send()
.await
.map_err(|e| BannerApiError::RequestFailed(e.into()))?;
}
session.touch();
let params = self.build_search_params(query, term, &session.id(), sort, sort_descending);
debug!(
term = term,
query = ?query,
sort = sort,
sort_descending = sort_descending,
"Searching for courses with params: {:?}", params
);
let response = self
.http
.get(format!("{}/searchResults/searchResults", self.base_url))
.header("Cookie", session.cookie())
.query(&params)
.send()
.await
.context("Failed to search courses")?;
let status = response.status();
let url = response.url().clone();
let body = response
.text()
.await
.with_context(|| format!("Failed to read body (status={status})"))?;
let search_result: SearchResult = parse_json_with_context(&body).map_err(|e| {
BannerApiError::RequestFailed(anyhow!(
"Failed to parse search response (status={status}, url={url}): {e}\nBody: {body}"
))
})?;
// Check for signs of an invalid session
if search_result.path_mode.is_none() {
return Err(BannerApiError::InvalidSession(
"Search result path mode is none".to_string(),
));
} else if search_result.data.is_none() {
return Err(BannerApiError::InvalidSession(
"Search result data is none".to_string(),
));
}
if !search_result.success {
return Err(BannerApiError::RequestFailed(anyhow!(
"Search marked as unsuccessful by Banner API"
)));
}
Ok(search_result)
}
/// Retrieves a list of subjects from the Banner API.
@@ -92,35 +216,8 @@ impl BannerApi {
offset: i32,
max_results: i32,
) -> Result<Vec<Pair>> {
if offset <= 0 {
return Err(anyhow::anyhow!("Offset must be greater than 0"));
}
let session_id = self.sessions.ensure_session()?;
let url = format!("{}/classSearch/get_subject", self.base_url);
let params = [
("searchTerm", search),
("term", term),
("offset", &offset.to_string()),
("max", &max_results.to_string()),
("uniqueSessionId", &session_id),
("_", &SessionManager::nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
self.get_list_endpoint("get_subject", search, term, offset, max_results)
.await
.context("Failed to get subjects")?;
let subjects: Vec<Pair> = response
.json()
.await
.context("Failed to parse subjects response")?;
Ok(subjects)
}
/// Retrieves a list of instructors from the Banner API.
@@ -131,35 +228,8 @@ impl BannerApi {
offset: i32,
max_results: i32,
) -> Result<Vec<Instructor>> {
if offset <= 0 {
return Err(anyhow::anyhow!("Offset must be greater than 0"));
}
let session_id = self.sessions.ensure_session()?;
let url = format!("{}/classSearch/get_instructor", self.base_url);
let params = [
("searchTerm", search),
("term", term),
("offset", &offset.to_string()),
("max", &max_results.to_string()),
("uniqueSessionId", &session_id),
("_", &SessionManager::nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
self.get_list_endpoint("get_instructor", search, term, offset, max_results)
.await
.context("Failed to get instructors")?;
let instructors: Vec<Instructor> = response
.json()
.await
.context("Failed to parse instructors response")?;
Ok(instructors)
}
/// Retrieves a list of campuses from the Banner API.
@@ -170,35 +240,8 @@ impl BannerApi {
offset: i32,
max_results: i32,
) -> Result<Vec<Pair>> {
if offset <= 0 {
return Err(anyhow::anyhow!("Offset must be greater than 0"));
}
let session_id = self.sessions.ensure_session()?;
let url = format!("{}/classSearch/get_campus", self.base_url);
let params = [
("searchTerm", search),
("term", term),
("offset", &offset.to_string()),
("max", &max_results.to_string()),
("uniqueSessionId", &session_id),
("_", &SessionManager::nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
self.get_list_endpoint("get_campus", search, term, offset, max_results)
.await
.context("Failed to get campuses")?;
let campuses: Vec<Pair> = response
.json()
.await
.context("Failed to parse campuses response")?;
Ok(campuses)
}
/// Retrieves meeting time information for a course.
@@ -259,95 +302,31 @@ impl BannerApi {
query: &SearchQuery,
sort: &str,
sort_descending: bool,
) -> Result<SearchResult> {
self.sessions.reset_data_form().await?;
let session_id = self.sessions.ensure_session()?;
let mut params = query.to_params();
// Add additional parameters
params.insert("txt_term".to_string(), term.to_string());
params.insert("uniqueSessionId".to_string(), session_id);
params.insert("sortColumn".to_string(), sort.to_string());
params.insert(
"sortDirection".to_string(),
if sort_descending { "desc" } else { "asc" }.to_string(),
);
params.insert("startDatepicker".to_string(), String::new());
params.insert("endDatepicker".to_string(), String::new());
let url = format!("{}/searchResults/searchResults", self.base_url);
let response = self
.http
.get(&url)
.query(&params)
.send()
) -> Result<SearchResult, BannerApiError> {
self.perform_search(term, query, sort, sort_descending)
.await
.context("Failed to search courses")?;
let search_result: SearchResult = response
.json()
.await
.context("Failed to parse search response")?;
if !search_result.success {
return Err(anyhow::anyhow!(
"Search marked as unsuccessful by Banner API"
));
}
Ok(search_result)
}
/// Selects a term for the current session.
pub async fn select_term(&self, term: &str) -> Result<()> {
self.sessions.select_term(term).await
}
/// Retrieves a single course by CRN by issuing a minimal search
pub async fn get_course_by_crn(&self, term: &str, crn: &str) -> Result<Option<Course>> {
self.sessions.reset_data_form().await?;
// Ensure session is configured for this term
self.select_term(term).await?;
let session_id = self.sessions.ensure_session()?;
pub async fn get_course_by_crn(
&self,
term: &str,
crn: &str,
) -> Result<Option<Course>, BannerApiError> {
let query = SearchQuery::new()
.course_reference_number(crn)
.max_results(1);
let mut params = query.to_params();
params.insert("txt_term".to_string(), term.to_string());
params.insert("uniqueSessionId".to_string(), session_id);
params.insert("sortColumn".to_string(), "subjectDescription".to_string());
params.insert("sortDirection".to_string(), "asc".to_string());
params.insert("startDatepicker".to_string(), String::new());
params.insert("endDatepicker".to_string(), String::new());
let search_result = self
.perform_search(term, &query, "subjectDescription", false)
.await?;
let url = format!("{}/searchResults/searchResults", self.base_url);
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.context("Failed to search course by CRN")?;
let status = response.status();
let body = response
.text()
.await
.with_context(|| format!("Failed to read body (status={status})"))?;
let search_result: SearchResult = parse_json_with_context(&body).map_err(|e| {
anyhow::anyhow!(
"Failed to parse search response for CRN (status={status}, url={url}): {e}",
)
})?;
if !search_result.success {
return Err(anyhow::anyhow!(
"Search marked as unsuccessful by Banner API"
// Additional validation for CRN search
if search_result.path_mode == Some("registration".to_string())
&& search_result.data.is_none()
{
return Err(BannerApiError::InvalidSession(
"Search result path mode is registration and data is none".to_string(),
));
}
@@ -381,36 +360,3 @@ impl BannerApi {
Ok(details)
}
}
/// Attempt to parse JSON and, on failure, include a contextual snippet around the error location
fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
match serde_json::from_str::<T>(body) {
Ok(value) => Ok(value),
Err(err) => {
let (line, column) = (err.line(), err.column());
let snippet = build_error_snippet(body, line, column, 120);
Err(anyhow::anyhow!(
"{err} at line {line}, column {column}\nSnippet:\n{snippet}",
))
}
}
}
fn build_error_snippet(body: &str, line: usize, column: usize, max_len: usize) -> String {
let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
if target_line.is_empty() {
return String::new();
}
let start = column.saturating_sub(max_len.min(column));
let end = (column + max_len).min(target_line.len());
let slice = &target_line[start..end];
let mut indicator = String::new();
if column > start {
indicator.push_str(&" ".repeat(column - start - 1));
indicator.push('^');
}
format!("{slice}\n{indicator}")
}

11
src/banner/errors.rs Normal file
View File

@@ -0,0 +1,11 @@
//! Error types for the Banner API client.
use thiserror::Error;
#[derive(Debug, thiserror::Error)]
pub enum BannerApiError {
#[error("Banner session is invalid or expired: {0}")]
InvalidSession(String),
#[error(transparent)]
RequestFailed(#[from] anyhow::Error),
}

39
src/banner/json.rs Normal file
View File

@@ -0,0 +1,39 @@
//! JSON parsing utilities for the Banner API client.
use anyhow::Result;
/// Attempt to parse JSON and, on failure, include a contextual snippet of the
/// line where the error occurred. This prevents dumping huge JSON bodies to logs.
pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
match serde_json::from_str::<T>(body) {
Ok(value) => Ok(value),
Err(err) => {
let (line, column) = (err.line(), err.column());
let snippet = build_error_snippet(body, line, column, 80);
Err(anyhow::anyhow!(
"{err} at line {line}, column {column}\nSnippet:\n{snippet}",
))
}
}
}
fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String {
let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
if target_line.is_empty() {
return "(empty line)".to_string();
}
// column is 1-based, convert to 0-based for slicing
let error_idx = column.saturating_sub(1);
let half_len = context_len / 2;
let start = error_idx.saturating_sub(half_len);
let end = (error_idx + half_len).min(target_line.len());
let slice = &target_line[start..end];
let indicator_pos = error_idx - start;
let indicator = " ".repeat(indicator_pos) + "^";
format!("...{slice}...\n {indicator}")
}

49
src/banner/middleware.rs Normal file
View File

@@ -0,0 +1,49 @@
//! HTTP middleware for the Banner API client.
use http::Extensions;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
use tracing::{trace, warn};
pub struct TransparentMiddleware;
#[async_trait::async_trait]
impl Middleware for TransparentMiddleware {
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> std::result::Result<Response, reqwest_middleware::Error> {
trace!(
domain = req.url().domain(),
headers = ?req.headers(),
"{method} {path}",
method = req.method().to_string(),
path = req.url().path(),
);
let response_result = next.run(req, extensions).await;
match response_result {
Ok(response) => {
if response.status().is_success() {
trace!(
"{code} {reason} {path}",
code = response.status().as_u16(),
reason = response.status().canonical_reason().unwrap_or("??"),
path = response.url().path(),
);
Ok(response)
} else {
let e = response.error_for_status_ref().unwrap_err();
warn!(error = ?e, "Request failed (server)");
Ok(response)
}
}
Err(error) => {
warn!(?error, "Request failed (middleware)");
Err(error)
}
}
}
}

View File

@@ -9,12 +9,16 @@
//! - Generate ICS files and calendar links
pub mod api;
pub mod errors;
pub mod json;
pub mod middleware;
pub mod models;
pub mod query;
pub mod session;
pub mod util;
pub use api::*;
pub use errors::*;
pub use models::*;
pub use query::*;
pub use session::*;

View File

@@ -42,11 +42,11 @@ pub struct FacultyItem {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MeetingTime {
pub start_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub end_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub begin_time: String, // HHMM, e.g 1000
pub end_time: String, // HHMM, e.g 1100
pub category: String, // unknown meaning, e.g. 01, 02, etc
pub start_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub end_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub begin_time: Option<String>, // HHMM, e.g 1000
pub end_time: Option<String>, // HHMM, e.g 1100
pub category: String, // unknown meaning, e.g. 01, 02, etc
pub class: String, // internal class name, e.g. net.hedtech.banner.general.overallMeetingTimeDecorator
pub monday: bool, // true if the meeting time occurs on Monday
pub tuesday: bool, // true if the meeting time occurs on Tuesday
@@ -55,13 +55,13 @@ pub struct MeetingTime {
pub friday: bool, // true if the meeting time occurs on Friday
pub saturday: bool, // true if the meeting time occurs on Saturday
pub sunday: bool, // true if the meeting time occurs on Sunday
pub room: String, // e.g. 1238
pub room: Option<String>, // e.g. 1.238
#[serde(deserialize_with = "deserialize_string_to_term")]
pub term: Term, // e.g 202510
pub building: String, // e.g NPB
pub building_description: String, // e.g North Paseo Building
pub campus: String, // campus code, e.g 11
pub campus_description: String, // name of campus, e.g Main Campus
pub building: Option<String>, // e.g NPB
pub building_description: Option<String>, // e.g North Paseo Building
pub campus: Option<String>, // campus code, e.g 11
pub campus_description: Option<String>, // name of campus, e.g Main Campus
pub course_reference_number: String, // CRN, e.g 27294
pub credit_hour_session: f64, // e.g. 30
pub hours_week: f64, // e.g. 30
@@ -347,42 +347,58 @@ impl MeetingType {
/// Meeting location information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeetingLocation {
pub campus: String,
pub building: String,
pub building_description: String,
pub room: String,
pub is_online: bool,
pub enum MeetingLocation {
Online,
InPerson {
campus: String,
campus_description: String,
building: String,
building_description: String,
room: String,
},
}
impl MeetingLocation {
/// Create from raw MeetingTime data
pub fn from_meeting_time(meeting_time: &MeetingTime) -> Self {
let is_online = meeting_time.room.is_empty();
if meeting_time.campus.is_none()
|| meeting_time.building.is_none()
|| meeting_time.building_description.is_none()
|| meeting_time.room.is_none()
|| meeting_time.campus_description.is_none()
|| meeting_time
.campus_description
.eq(&Some("Internet".to_string()))
{
return MeetingLocation::Online;
}
MeetingLocation {
campus: meeting_time.campus_description.clone(),
building: meeting_time.building.clone(),
building_description: meeting_time.building_description.clone(),
room: meeting_time.room.clone(),
is_online,
MeetingLocation::InPerson {
campus: meeting_time.campus.as_ref().unwrap().clone(),
campus_description: meeting_time.campus_description.as_ref().unwrap().clone(),
building: meeting_time.building.as_ref().unwrap().clone(),
building_description: meeting_time.building_description.as_ref().unwrap().clone(),
room: meeting_time.room.as_ref().unwrap().clone(),
}
}
}
impl Display for MeetingLocation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.is_online {
write!(f, "Online")
} else {
write!(
match self {
MeetingLocation::Online => write!(f, "Online"),
MeetingLocation::InPerson {
campus,
building,
building_description,
room,
..
} => write!(
f,
"{campus} | {building_name} | {building_code} {room}",
campus = self.campus,
building_name = self.building_description,
building_code = self.building,
room = self.room
)
building_name = building_description,
building_code = building,
),
}
}
}
@@ -402,7 +418,11 @@ impl MeetingScheduleInfo {
/// Create from raw MeetingTime data
pub fn from_meeting_time(meeting_time: &MeetingTime) -> Self {
let days = MeetingDays::from_meeting_time(meeting_time);
let time_range = TimeRange::from_hhmm(&meeting_time.begin_time, &meeting_time.end_time);
let time_range = match (&meeting_time.begin_time, &meeting_time.end_time) {
(Some(begin), Some(end)) => TimeRange::from_hhmm(begin, end),
_ => None,
};
let date_range =
DateRange::from_mm_dd_yyyy(&meeting_time.start_date, &meeting_time.end_date)
.unwrap_or_else(|| {
@@ -470,16 +490,18 @@ impl MeetingScheduleInfo {
/// Returns a formatted string representing the location of the meeting
pub fn place_string(&self) -> String {
if self.location.room.is_empty() {
"Online".to_string()
} else {
format!(
match &self.location {
MeetingLocation::Online => "Online".to_string(),
MeetingLocation::InPerson {
campus,
building,
building_description,
room,
..
} => format!(
"{} | {} | {} {}",
self.location.campus,
self.location.building_description,
self.location.building,
self.location.room
)
campus, building_description, building, room
),
}
}

View File

@@ -10,8 +10,8 @@ pub struct SearchResult {
pub total_count: i32,
pub page_offset: i32,
pub page_max_size: i32,
pub path_mode: String,
pub search_results_config: Vec<SearchResultConfig>,
pub path_mode: Option<String>,
pub search_results_config: Option<Vec<SearchResultConfig>>,
pub data: Option<Vec<Course>>,
}

View File

@@ -13,7 +13,7 @@ const CURRENT_YEAR: u32 = compile_time::date!().year() as u32;
const VALID_YEARS: RangeInclusive<u32> = 2007..=(CURRENT_YEAR + 10);
/// Represents a term in the Banner system
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Term {
pub year: u32, // 2024, 2025, etc
pub season: Season,
@@ -29,7 +29,7 @@ pub enum TermPoint {
}
/// Represents a season within a term
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum Season {
Fall,
Spring,
@@ -193,7 +193,7 @@ impl std::fmt::Display for Term {
impl Season {
/// Returns the season code as a string
fn to_str(&self) -> &'static str {
fn to_str(self) -> &'static str {
match self {
Season::Fall => "10",
Season::Spring => "20",

View File

@@ -1,133 +1,424 @@
//! Session management for Banner API.
use crate::banner::util::user_agent;
use anyhow::Result;
use rand::distributions::{Alphanumeric, DistString};
use reqwest::Client;
use std::sync::Mutex;
use crate::banner::BannerTerm;
use crate::banner::models::Term;
use anyhow::{Context, Result};
use cookie::Cookie;
use dashmap::DashMap;
use governor::state::InMemoryState;
use governor::{Quota, RateLimiter};
use once_cell::sync::Lazy;
use rand::distr::{Alphanumeric, SampleString};
use reqwest_middleware::ClientWithMiddleware;
use std::collections::{HashMap, VecDeque};
use std::num::NonZeroU32;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Notify};
use tracing::{debug, info};
use url::Url;
/// Session manager for Banner API interactions
#[derive(Debug)]
pub struct SessionManager {
current_session: Mutex<Option<SessionData>>,
base_url: String,
client: Client,
}
const SESSION_EXPIRY: Duration = Duration::from_secs(25 * 60); // 25 minutes
// A global rate limiter to ensure we only try to create one new session every 10 seconds,
// preventing us from overwhelming the server with session creation requests.
static SESSION_CREATION_RATE_LIMITER: Lazy<
RateLimiter<governor::state::direct::NotKeyed, InMemoryState, governor::clock::DefaultClock>,
> = Lazy::new(|| RateLimiter::direct(Quota::with_period(Duration::from_secs(10)).unwrap()));
/// Represents an active anonymous session within the Banner API.
/// Identified by multiple persistent cookies, as well as a client-generated "unique session ID".
#[derive(Debug, Clone)]
struct SessionData {
session_id: String,
pub struct BannerSession {
// Randomly generated
pub unique_session_id: String,
// Timestamp of creation
created_at: Instant,
// Timestamp of last activity
last_activity: Option<Instant>,
// Cookie values from initial registration page
jsessionid: String,
ssb_cookie: String,
}
impl SessionManager {
const SESSION_EXPIRY: Duration = Duration::from_secs(25 * 60); // 25 minutes
/// Generates a new session ID mimicking Banner's format
fn generate_session_id() -> String {
let random_part = Alphanumeric.sample_string(&mut rand::rng(), 5);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
format!("{}{}", random_part, timestamp)
}
/// Creates a new session manager
pub fn new(base_url: String, client: Client) -> Self {
/// Generates a timestamp-based nonce
pub fn nonce() -> String {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string()
}
impl BannerSession {
/// Creates a new session
pub async fn new(unique_session_id: &str, jsessionid: &str, ssb_cookie: &str) -> Result<Self> {
let now = Instant::now();
Ok(Self {
created_at: now,
last_activity: None,
unique_session_id: unique_session_id.to_string(),
jsessionid: jsessionid.to_string(),
ssb_cookie: ssb_cookie.to_string(),
})
}
/// Returns the unique session ID
pub fn id(&self) -> String {
self.unique_session_id.clone()
}
/// Updates the last activity timestamp
pub fn touch(&mut self) {
debug!(id = self.unique_session_id, "Session was used");
self.last_activity = Some(Instant::now());
}
/// Returns true if the session is expired
pub fn is_expired(&self) -> bool {
self.last_activity.unwrap_or(self.created_at).elapsed() > SESSION_EXPIRY
}
/// Returns a string used to for the "Cookie" header
pub fn cookie(&self) -> String {
format!(
"JSESSIONID={}; SSB_COOKIE={}",
self.jsessionid, self.ssb_cookie
)
}
pub fn been_used(&self) -> bool {
self.last_activity.is_some()
}
}
/// A smart pointer that returns a BannerSession to the pool when dropped.
pub struct PooledSession {
session: Option<BannerSession>,
// This Arc points directly to the term-specific pool.
pool: Arc<TermPool>,
}
impl PooledSession {
pub fn been_used(&self) -> bool {
self.session.as_ref().unwrap().been_used()
}
}
impl Deref for PooledSession {
type Target = BannerSession;
fn deref(&self) -> &Self::Target {
// The option is only ever None after drop is called, so this is safe.
self.session.as_ref().unwrap()
}
}
impl DerefMut for PooledSession {
fn deref_mut(&mut self) -> &mut Self::Target {
self.session.as_mut().unwrap()
}
}
/// The magic happens here: when the guard goes out of scope, this is called.
impl Drop for PooledSession {
fn drop(&mut self) {
if let Some(session) = self.session.take() {
let pool = self.pool.clone();
// Since drop() cannot be async, we spawn a task to return the session.
tokio::spawn(async move {
pool.release(session).await;
});
}
}
}
pub struct TermPool {
sessions: Mutex<VecDeque<BannerSession>>,
notifier: Notify,
is_creating: Mutex<bool>,
}
impl TermPool {
fn new() -> Self {
Self {
current_session: Mutex::new(None),
base_url,
client,
sessions: Mutex::new(VecDeque::new()),
notifier: Notify::new(),
is_creating: Mutex::new(false),
}
}
/// Ensures a valid session is available, creating one if necessary
pub fn ensure_session(&self) -> Result<String> {
let start_time = std::time::Instant::now();
let mut session_guard = self.current_session.lock().unwrap();
if let Some(ref session) = *session_guard
&& session.created_at.elapsed() < Self::SESSION_EXPIRY
{
let elapsed = start_time.elapsed();
debug!(
session_id = session.session_id,
elapsed = format!("{:.2?}", elapsed),
"reusing existing banner session"
);
return Ok(session.session_id.clone());
async fn release(&self, session: BannerSession) {
let id = session.unique_session_id.clone();
if session.is_expired() {
debug!(id = id, "Session is now expired, dropping.");
// Wake up a waiter, as it might need to create a new session
// if this was the last one.
self.notifier.notify_one();
return;
}
// Generate new session
let session_id = self.generate_session_id();
*session_guard = Some(SessionData {
session_id: session_id.clone(),
created_at: Instant::now(),
});
let mut queue = self.sessions.lock().await;
queue.push_back(session);
let queue_size = queue.len();
drop(queue); // Release lock before notifying
let elapsed = start_time.elapsed();
debug!(
session_id = session_id,
elapsed = format!("{:.2?}", elapsed),
"generated new banner session"
id = id,
"Session returned to pool. Queue size is now {queue_size}."
);
Ok(session_id)
self.notifier.notify_one();
}
}
pub struct SessionPool {
sessions: DashMap<Term, Arc<TermPool>>,
http: ClientWithMiddleware,
base_url: String,
}
impl SessionPool {
pub fn new(http: ClientWithMiddleware, base_url: String) -> Self {
Self {
sessions: DashMap::new(),
http,
base_url,
}
}
/// Generates a new session ID mimicking Banner's format
fn generate_session_id(&self) -> String {
let random_part = Alphanumeric.sample_string(&mut rand::thread_rng(), 5);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
format!("{}{}", random_part, timestamp)
/// Acquires a session from the pool.
/// If no sessions are available, a new one is created on demand,
/// respecting the global rate limit.
pub async fn acquire(&self, term: Term) -> Result<PooledSession> {
let term_pool = self
.sessions
.entry(term)
.or_insert_with(|| Arc::new(TermPool::new()))
.clone();
loop {
// Fast path: Try to get an existing, non-expired session.
{
let mut queue = term_pool.sessions.lock().await;
if let Some(session) = queue.pop_front() {
if !session.is_expired() {
debug!(id = session.unique_session_id, "Reusing session from pool");
return Ok(PooledSession {
session: Some(session),
pool: Arc::clone(&term_pool),
});
} else {
debug!(
id = session.unique_session_id,
"Popped an expired session, discarding."
);
}
}
} // MutexGuard is dropped, lock is released.
// Slow path: No sessions available. We must either wait or become the creator.
let mut is_creating_guard = term_pool.is_creating.lock().await;
if *is_creating_guard {
// Another task is already creating a session. Release the lock and wait.
drop(is_creating_guard);
debug!("Another task is creating a session, waiting for notification...");
term_pool.notifier.notified().await;
// Loop back to the top to try the fast path again.
continue;
}
// This task is now the designated creator.
*is_creating_guard = true;
drop(is_creating_guard);
// Race: wait for a session to be returned OR for the rate limiter to allow a new one.
debug!("Pool empty, racing notifier vs rate limiter...");
tokio::select! {
_ = term_pool.notifier.notified() => {
// A session was returned while we were waiting!
// We are no longer the creator. Reset the flag and loop to race for the new session.
debug!("Notified that a session was returned. Looping to retry.");
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
continue;
}
_ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
// The rate limit has elapsed. It's our job to create the session.
debug!("Rate limiter ready. Proceeding to create a new session.");
let new_session_result = self.create_session(&term).await;
// After creation, we are no longer the creator. Reset the flag
// and notify all other waiting tasks.
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
term_pool.notifier.notify_waiters();
match new_session_result {
Ok(new_session) => {
debug!(id = new_session.unique_session_id, "Successfully created new session");
return Ok(PooledSession {
session: Some(new_session),
pool: term_pool,
});
}
Err(e) => {
// Propagate the error if session creation failed.
return Err(e.context("Failed to create new session in pool"));
}
}
}
}
}
}
/// Sets up initial session cookies by making required Banner API requests
pub async fn setup(&self) -> Result<()> {
info!("setting up banner session...");
pub async fn create_session(&self, term: &Term) -> Result<BannerSession> {
info!("setting up banner session for term {term}");
let request_paths = ["/registration/registration", "/selfServiceMenu/data"];
// The 'register' or 'search' registration page
let initial_registration = self
.http
.get(format!("{}/registration", self.base_url))
.send()
.await?;
// TODO: Validate success
for path in &request_paths {
let url = format!("{}{}", self.base_url, path);
let response = self
.client
.get(&url)
.query(&[("_", Self::nonce())])
.header("User-Agent", user_agent())
.send()
.await?;
let cookies = initial_registration
.headers()
.get_all("Set-Cookie")
.iter()
.filter_map(|header_value| {
if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) {
Some((cookie.name().to_string(), cookie.value().to_string()))
} else {
None
}
})
.collect::<HashMap<String, String>>();
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Failed to setup session, request to {} returned {}",
path,
response.status()
));
}
if !cookies.contains_key("JSESSIONID") || !cookies.contains_key("SSB_COOKIE") {
return Err(anyhow::anyhow!("Failed to get cookies"));
}
// Note: Cookie validation would require additional setup in a real implementation
debug!("session setup complete");
Ok(())
let jsessionid = cookies.get("JSESSIONID").unwrap();
let ssb_cookie = cookies.get("SSB_COOKIE").unwrap();
let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);
debug!(
jsessionid = jsessionid,
ssb_cookie = ssb_cookie,
"New session cookies acquired"
);
self.http
.get(format!("{}/selfServiceMenu/data", self.base_url))
.header("Cookie", &cookie_header)
.send()
.await?
.error_for_status()
.context("Failed to get data page")?;
self.http
.get(format!("{}/term/termSelection", self.base_url))
.header("Cookie", &cookie_header)
.query(&[("mode", "search")])
.send()
.await?
.error_for_status()
.context("Failed to get term selection page")?;
// TOOD: Validate success
let terms = self.get_terms("", 1, 10).await?;
if !terms.iter().any(|t| t.code == term.to_string()) {
return Err(anyhow::anyhow!("Failed to get term search response"));
}
let specific_term_search_response = self.get_terms(&term.to_string(), 1, 10).await?;
if !specific_term_search_response
.iter()
.any(|t| t.code == term.to_string())
{
return Err(anyhow::anyhow!("Failed to get term search response"));
}
let unique_session_id = generate_session_id();
self.select_term(&term.to_string(), &unique_session_id, &cookie_header)
.await?;
BannerSession::new(&unique_session_id, jsessionid, ssb_cookie).await
}
/// Retrieves a list of terms from the Banner API.
pub async fn get_terms(
&self,
search: &str,
page: i32,
max_results: i32,
) -> Result<Vec<BannerTerm>> {
if page <= 0 {
return Err(anyhow::anyhow!("Page must be greater than 0"));
}
let url = format!("{}/classSearch/getTerms", self.base_url);
let params = [
("searchTerm", search),
("offset", &page.to_string()),
("max", &max_results.to_string()),
("_", &nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.with_context(|| "Failed to get terms".to_string())?;
let terms: Vec<BannerTerm> = response
.json()
.await
.context("Failed to parse terms response")?;
Ok(terms)
}
/// Selects a term for the current session
pub async fn select_term(&self, term: &str) -> Result<()> {
let session_id = self.ensure_session()?;
pub async fn select_term(
&self,
term: &str,
unique_session_id: &str,
cookie_header: &str,
) -> Result<()> {
let form_data = [
("term", term),
("studyPath", ""),
("studyPathText", ""),
("startDatepicker", ""),
("endDatepicker", ""),
("uniqueSessionId", &session_id),
("uniqueSessionId", unique_session_id),
];
let url = format!("{}/term/search", self.base_url);
let response = self
.client
.http
.post(&url)
.header("Cookie", cookie_header)
.query(&[("mode", "search")])
.form(&form_data)
.header("User-Agent", user_agent())
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await?;
@@ -141,18 +432,21 @@ impl SessionManager {
#[derive(serde::Deserialize)]
struct RedirectResponse {
#[serde(rename = "fwdUrl")]
#[serde(rename = "fwdURL")]
fwd_url: String,
}
let redirect: RedirectResponse = response.json().await?;
let base_url_path = self.base_url.parse::<Url>().unwrap().path().to_string();
let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap();
// Follow the redirect
let redirect_url = format!("{}{}", self.base_url, redirect.fwd_url);
let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect);
let redirect_response = self
.client
.http
.get(&redirect_url)
.header("User-Agent", user_agent())
.header("Cookie", cookie_header)
.send()
.await?;
@@ -163,36 +457,7 @@ impl SessionManager {
));
}
debug!("successfully selected term: {}", term);
debug!(term = term, "successfully selected term");
Ok(())
}
/// Resets the data form (required before new searches)
pub async fn reset_data_form(&self) -> Result<()> {
let url = format!("{}/classSearch/resetDataForm", self.base_url);
let response = self
.client
.post(&url)
.header("User-Agent", user_agent())
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Failed to reset data form: {}",
response.status()
));
}
Ok(())
}
/// Generates a timestamp-based nonce
pub fn nonce() -> String {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string()
}
}

131
src/bin/search.rs Normal file
View File

@@ -0,0 +1,131 @@
use banner::banner::{BannerApi, SearchQuery, Term};
use banner::config::Config;
use banner::error::Result;
use figment::{Figment, providers::Env};
use futures::future;
use tracing::{error, info};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
#[tokio::main]
async fn main() -> Result<()> {
// Configure logging
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,banner=trace,reqwest=debug,hyper=info"));
let subscriber = FmtSubscriber::builder()
.with_env_filter(filter)
.with_target(true)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
info!("Starting Banner search test");
dotenvy::dotenv().ok();
// Load configuration
let config: Config = Figment::new()
.merge(Env::raw().only(&["DATABASE_URL"]))
.merge(Env::prefixed("APP_"))
.extract()
.expect("Failed to load config");
info!(
banner_base_url = config.banner_base_url,
"Configuration loaded"
);
// Create Banner API client
let banner_api = BannerApi::new(config.banner_base_url).expect("Failed to create BannerApi");
// Get current term
let term = Term::get_current().inner().to_string();
info!(term = term, "Using current term");
// Define multiple search queries
let queries = vec![
(
"CS Courses",
SearchQuery::new().subject("CS").max_results(10),
),
(
"Math Courses",
SearchQuery::new().subject("MAT").max_results(10),
),
(
"3000-level CS",
SearchQuery::new()
.subject("CS")
.course_numbers(3000, 3999)
.max_results(8),
),
(
"High Credit Courses",
SearchQuery::new().credits(4, 6).max_results(8),
),
(
"Programming Courses",
SearchQuery::new().keyword("programming").max_results(6),
),
];
info!("Executing {} concurrent searches", queries.len());
// Execute all searches concurrently
let search_futures = queries.into_iter().map(|(label, query)| {
info!("Starting search: {}", label);
let banner_api = &banner_api;
let term = &term;
async move {
let result = banner_api
.search(term, &query, "subjectDescription", false)
.await;
(label, result)
}
});
// Wait for all searches to complete
let search_results = future::join_all(search_futures)
.await
.into_iter()
.filter_map(|(label, result)| match result {
Ok(search_result) => {
info!(
label = label,
success = search_result.success,
total_count = search_result.total_count,
"Search completed successfully"
);
Some((label, search_result))
}
Err(e) => {
error!(label = label, error = ?e, "Search failed");
None
}
})
.collect::<Vec<_>>();
// Process and display results
for (label, search_result) in search_results {
println!("\n=== {} ===", label);
if let Some(courses) = &search_result.data {
if courses.is_empty() {
println!(" No courses found");
} else {
println!(" Found {} courses:", courses.len());
for course in courses {
println!(
" {} {} - {} (CRN: {})",
course.subject,
course.course_number,
course.course_title,
course.course_reference_number
);
}
}
} else {
println!(" No courses found");
}
}
info!("Search test completed");
Ok(())
}

View File

@@ -1,13 +1,13 @@
//! Bot commands module.
pub mod gcal;
pub mod ics;
pub mod search;
pub mod terms;
pub mod time;
pub mod ics;
pub mod gcal;
pub use gcal::gcal;
pub use ics::ics;
pub use search::search;
pub use terms::terms;
pub use time::time;
pub use ics::ics;
pub use gcal::gcal;

View File

@@ -92,9 +92,7 @@ fn parse_course_code(input: &str) -> Result<(i32, i32), Error> {
};
if low > high {
return Err(anyhow!(
"Invalid range: low value greater than high value"
));
return Err(anyhow!("Invalid range: low value greater than high value"));
}
if low < 1000 || high > 9999 {

View File

@@ -21,6 +21,7 @@ pub async fn terms(
.data()
.app_state
.banner_api
.sessions
.get_terms(&search_term, page_number, max_results)
.await?;
@@ -46,7 +47,11 @@ fn format_term(term: &BannerTerm, current_term_code: &str) -> String {
} else {
""
};
let is_archived = if term.is_archived() { " (archived)" } else { "" };
let is_archived = if term.is_archived() {
" (archived)"
} else {
""
};
format!(
"- `{}`: {}{}{}",
term.code, term.description, is_current, is_archived

View File

@@ -1,6 +1,6 @@
//! Time command implementation for course meeting times.
use crate::bot::{utils, Context, Error};
use crate::bot::{Context, Error, utils};
use tracing::info;
/// Get meeting times for a specific course

View File

@@ -1,10 +1,9 @@
use crate::app_state::AppState;
use crate::error::Error;
use crate::state::AppState;
pub mod commands;
pub mod utils;
#[derive(Debug)]
pub struct Data {
pub app_state: AppState,
} // User data, which is stored and accessible in all command invocations

View File

@@ -11,6 +11,15 @@ use std::time::Duration;
/// Application configuration loaded from environment variables
#[derive(Deserialize)]
pub struct Config {
/// Log level for the application
///
/// This value is used to set the log level for this application's target specifically.
/// e.g. "debug" would be similar to "warn,banner=debug,..."
///
/// Valid values are: "trace", "debug", "info", "warn", "error"
/// Defaults to "info" if not specified
#[serde(default = "default_log_level")]
pub log_level: String,
/// Discord bot token for authentication
pub bot_token: String,
/// Port for the web server
@@ -24,8 +33,6 @@ pub struct Config {
pub banner_base_url: String,
/// Target Discord guild ID where the bot operates
pub bot_target_guild: u64,
/// Discord application ID
pub bot_app_id: u64,
/// Graceful shutdown timeout duration
///
/// Accepts both numeric values (seconds) and duration strings
@@ -37,6 +44,11 @@ pub struct Config {
pub shutdown_timeout: Duration,
}
/// Default log level of "info"
fn default_log_level() -> String {
"info".to_string()
}
/// Default port of 3000
fn default_port() -> u16 {
3000

View File

@@ -1,4 +1,3 @@
//! Database models and schema.
pub mod models;
pub mod schema;

View File

@@ -1,12 +1,9 @@
//! Diesel models for the database schema.
//! `sqlx` models for the database schema.
use crate::data::schema::{course_audits, course_metrics, courses, scrape_jobs};
use chrono::{DateTime, Utc};
use diesel::{Insertable, Queryable, QueryableByName, Selectable};
use serde_json::Value;
#[derive(Queryable, Selectable)]
#[diesel(table_name = courses)]
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct Course {
pub id: i32,
pub crn: String,
@@ -21,24 +18,7 @@ pub struct Course {
pub last_scraped_at: DateTime<Utc>,
}
#[derive(Insertable)]
#[diesel(table_name = courses)]
pub struct NewCourse<'a> {
pub crn: &'a str,
pub subject: &'a str,
pub course_number: &'a str,
pub title: &'a str,
pub term_code: &'a str,
pub enrollment: i32,
pub max_enrollment: i32,
pub wait_count: i32,
pub wait_capacity: i32,
pub last_scraped_at: DateTime<Utc>,
}
#[derive(Queryable, Selectable)]
#[diesel(table_name = course_metrics)]
#[diesel(belongs_to(Course))]
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct CourseMetric {
pub id: i32,
pub course_id: i32,
@@ -48,19 +28,7 @@ pub struct CourseMetric {
pub seats_available: i32,
}
#[derive(Insertable)]
#[diesel(table_name = course_metrics)]
pub struct NewCourseMetric {
pub course_id: i32,
pub timestamp: DateTime<Utc>,
pub enrollment: i32,
pub wait_count: i32,
pub seats_available: i32,
}
#[derive(Queryable, Selectable)]
#[diesel(table_name = course_audits)]
#[diesel(belongs_to(Course))]
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct CourseAudit {
pub id: i32,
pub course_id: i32,
@@ -70,18 +38,9 @@ pub struct CourseAudit {
pub new_value: String,
}
#[derive(Insertable)]
#[diesel(table_name = course_audits)]
pub struct NewCourseAudit<'a> {
pub course_id: i32,
pub timestamp: DateTime<Utc>,
pub field_changed: &'a str,
pub old_value: &'a str,
pub new_value: &'a str,
}
/// The priority level of a scrape job.
#[derive(diesel_derive_enum::DbEnum, Copy, Debug, Clone)]
#[derive(sqlx::Type, Copy, Debug, Clone)]
#[sqlx(type_name = "scrape_priority", rename_all = "PascalCase")]
pub enum ScrapePriority {
Low,
Medium,
@@ -90,7 +49,8 @@ pub enum ScrapePriority {
}
/// The type of target for a scrape job, determining how the payload is interpreted.
#[derive(diesel_derive_enum::DbEnum, Copy, Debug, Clone)]
#[derive(sqlx::Type, Copy, Debug, Clone)]
#[sqlx(type_name = "target_type", rename_all = "PascalCase")]
pub enum TargetType {
Subject,
CourseRange,
@@ -99,8 +59,7 @@ pub enum TargetType {
}
/// Represents a queryable job from the database.
#[derive(Debug, Clone, Queryable, QueryableByName)]
#[diesel(table_name = scrape_jobs)]
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct ScrapeJob {
pub id: i32,
pub target_type: TargetType,
@@ -110,14 +69,3 @@ pub struct ScrapeJob {
pub created_at: DateTime<Utc>,
pub locked_at: Option<DateTime<Utc>>,
}
/// Represents a new job to be inserted into the database.
#[derive(Debug, Clone, Insertable)]
#[diesel(table_name = scrape_jobs)]
pub struct NewScrapeJob {
pub target_type: TargetType,
#[diesel(sql_type = diesel::sql_types::Jsonb)]
pub target_payload: Value,
pub priority: ScrapePriority,
pub execute_at: DateTime<Utc>,
}

View File

@@ -1,69 +0,0 @@
pub mod sql_types {
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "scrape_priority"))]
pub struct ScrapePriority;
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "target_type"))]
pub struct TargetType;
}
use super::models::{ScrapePriorityMapping, TargetTypeMapping};
diesel::table! {
use diesel::sql_types::*;
use super::{ScrapePriorityMapping, TargetTypeMapping};
scrape_jobs (id) {
id -> Int4,
target_type -> TargetTypeMapping,
target_payload -> Jsonb,
priority -> ScrapePriorityMapping,
execute_at -> Timestamptz,
created_at -> Timestamptz,
locked_at -> Nullable<Timestamptz>,
}
}
diesel::table! {
courses (id) {
id -> Int4,
crn -> Varchar,
subject -> Varchar,
course_number -> Varchar,
title -> Varchar,
term_code -> Varchar,
enrollment -> Int4,
max_enrollment -> Int4,
wait_count -> Int4,
wait_capacity -> Int4,
last_scraped_at -> Timestamptz,
}
}
diesel::table! {
course_metrics (id) {
id -> Int4,
course_id -> Int4,
timestamp -> Timestamptz,
enrollment -> Int4,
wait_count -> Int4,
seats_available -> Int4,
}
}
diesel::table! {
course_audits (id) {
id -> Int4,
course_id -> Int4,
timestamp -> Timestamptz,
field_changed -> Varchar,
old_value -> Text,
new_value -> Text,
}
}
diesel::joinable!(course_metrics -> courses (course_id));
diesel::joinable!(course_audits -> courses (course_id));
diesel::allow_tables_to_appear_in_same_query!(courses, course_metrics, course_audits, scrape_jobs,);

View File

@@ -1,8 +1,9 @@
pub mod app_state;
pub mod banner;
pub mod bot;
pub mod config;
pub mod data;
pub mod error;
pub mod scraper;
pub mod services;
pub mod state;
pub mod web;

View File

@@ -1,25 +1,28 @@
use serenity::all::{CacheHttp, ClientBuilder, GatewayIntents};
use serenity::all::{ClientBuilder, GatewayIntents};
use tokio::signal;
use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use crate::app_state::AppState;
use crate::banner::BannerApi;
use crate::bot::{Data, get_commands};
use crate::config::Config;
use crate::scraper::ScraperService;
use crate::services::manager::ServiceManager;
use crate::services::{ServiceResult, bot::BotService, web::WebService};
use crate::state::AppState;
use crate::web::routes::BannerState;
use figment::{Figment, providers::Env};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
mod app_state;
mod banner;
mod bot;
mod config;
mod data;
mod error;
mod scraper;
mod services;
mod state;
mod web;
#[tokio::main]
@@ -52,14 +55,22 @@ async fn main() {
} else {
"production"
},
"starting banner system"
"starting banner"
);
let config: Config = Figment::new()
.merge(Env::raw().only(&["DATABASE_URL"]))
.merge(Env::prefixed("APP_"))
.extract()
.expect("Failed to load config");
// Create database connection pool
let db_pool = PgPoolOptions::new()
.max_connections(10)
.connect(&config.database_url)
.await
.expect("Failed to create database pool");
info!(
port = config.port,
shutdown_timeout = format!("{:.2?}", config.shutdown_timeout),
@@ -70,10 +81,6 @@ async fn main() {
// Create BannerApi and AppState
let banner_api =
BannerApi::new(config.banner_base_url.clone()).expect("Failed to create BannerApi");
banner_api
.setup()
.await
.expect("Failed to set up BannerApi session");
let banner_api_arc = Arc::new(banner_api);
let app_state = AppState::new(banner_api_arc.clone(), &config.redis_url)
@@ -81,7 +88,7 @@ async fn main() {
// Create BannerState for web service
let banner_state = BannerState {
api: banner_api_arc,
api: banner_api_arc.clone(),
};
// Configure the client with your Discord bot token in the environment
@@ -168,9 +175,11 @@ async fn main() {
// Register services with the manager
let bot_service = Box::new(BotService::new(client));
let web_service = Box::new(WebService::new(port, banner_state));
let scraper_service = Box::new(ScraperService::new(db_pool.clone(), banner_api_arc.clone()));
service_manager.register_service("bot", bot_service);
service_manager.register_service("web", web_service);
service_manager.register_service("scraper", scraper_service);
// Spawn all registered services
service_manager.spawn_all();

87
src/scraper/mod.rs Normal file
View File

@@ -0,0 +1,87 @@
pub mod scheduler;
pub mod worker;
use crate::banner::BannerApi;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::info;
use self::scheduler::Scheduler;
use self::worker::Worker;
use crate::services::Service;
/// The main service that will be managed by the application's `ServiceManager`.
///
/// It holds the shared resources (database pool, API client) and manages the
/// lifecycle of the Scheduler and Worker tasks.
pub struct ScraperService {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
scheduler_handle: Option<JoinHandle<()>>,
worker_handles: Vec<JoinHandle<()>>,
}
impl ScraperService {
/// Creates a new `ScraperService`.
pub fn new(db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
Self {
db_pool,
banner_api,
scheduler_handle: None,
worker_handles: Vec::new(),
}
}
/// Starts the scheduler and a pool of workers.
pub fn start(&mut self) {
info!("ScraperService starting...");
let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone());
let scheduler_handle = tokio::spawn(async move {
scheduler.run().await;
});
self.scheduler_handle = Some(scheduler_handle);
info!("Scheduler task spawned.");
let worker_count = 4; // This could be configurable
for i in 0..worker_count {
let worker = Worker::new(i, self.db_pool.clone(), self.banner_api.clone());
let worker_handle = tokio::spawn(async move {
worker.run().await;
});
self.worker_handles.push(worker_handle);
}
info!("Spawned {} worker tasks.", self.worker_handles.len());
}
/// Signals all child tasks to gracefully shut down.
pub async fn shutdown(&mut self) {
info!("Shutting down scraper service...");
if let Some(handle) = self.scheduler_handle.take() {
handle.abort();
}
for handle in self.worker_handles.drain(..) {
handle.abort();
}
info!("Scraper service shutdown.");
}
}
#[async_trait::async_trait]
impl Service for ScraperService {
fn name(&self) -> &'static str {
"scraper"
}
async fn run(&mut self) -> Result<(), anyhow::Error> {
self.start();
std::future::pending::<()>().await;
Ok(())
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
self.shutdown().await;
Ok(())
}
}

85
src/scraper/scheduler.rs Normal file
View File

@@ -0,0 +1,85 @@
use crate::banner::{BannerApi, Term};
use crate::data::models::{ScrapePriority, TargetType};
use crate::error::Result;
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info};
/// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
}
impl Scheduler {
pub fn new(db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
Self {
db_pool,
banner_api,
}
}
/// Runs the scheduler's main loop.
pub async fn run(&self) {
info!("Scheduler service started.");
let mut interval = time::interval(Duration::from_secs(60)); // Runs every minute
loop {
interval.tick().await;
info!("Scheduler waking up to analyze and schedule jobs...");
if let Err(e) = self.schedule_jobs().await {
error!(error = ?e, "Failed to schedule jobs");
}
}
}
/// The core logic for deciding what jobs to create.
async fn schedule_jobs(&self) -> Result<()> {
// For now, we will implement a simple baseline scheduling strategy:
// 1. Get a list of all subjects from the Banner API.
// 2. For each subject, check if an active (not locked, not completed) job already exists.
// 3. If no job exists, create a new, low-priority job to be executed in the near future.
let term = Term::get_current().inner().to_string();
info!(
term = term,
"[Scheduler] Enqueuing baseline subject scrape jobs..."
);
let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?;
for subject in subjects {
let payload = json!({ "subject": subject.code });
let existing_job: Option<(i32,)> = sqlx::query_as(
"SELECT id FROM scrape_jobs WHERE target_type = $1 AND target_payload = $2 AND locked_at IS NULL"
)
.bind(TargetType::Subject)
.bind(&payload)
.fetch_optional(&self.db_pool)
.await?;
if existing_job.is_some() {
continue;
}
sqlx::query(
"INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) VALUES ($1, $2, $3, $4)"
)
.bind(TargetType::Subject)
.bind(&payload)
.bind(ScrapePriority::Low)
.bind(chrono::Utc::now())
.execute(&self.db_pool)
.await?;
info!(subject = subject.code, "[Scheduler] Enqueued new job");
}
info!("[Scheduler] Job scheduling complete.");
Ok(())
}
}

205
src/scraper/worker.rs Normal file
View File

@@ -0,0 +1,205 @@
use crate::banner::{BannerApi, BannerApiError, Course, SearchQuery, Term};
use crate::data::models::ScrapeJob;
use crate::error::Result;
use serde_json::Value;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info, warn};
/// A single worker instance.
///
/// Each worker runs in its own asynchronous task and continuously polls the
/// database for scrape jobs to execute.
pub struct Worker {
id: usize, // For logging purposes
db_pool: PgPool,
banner_api: Arc<BannerApi>,
}
impl Worker {
pub fn new(id: usize, db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
Self {
id,
db_pool,
banner_api,
}
}
/// Runs the worker's main loop.
pub async fn run(&self) {
info!(worker_id = self.id, "Worker started.");
loop {
match self.fetch_and_lock_job().await {
Ok(Some(job)) => {
let job_id = job.id;
info!(worker_id = self.id, job_id = job.id, "Processing job");
if let Err(e) = self.process_job(job).await {
// Check if the error is due to an invalid session
if let Some(BannerApiError::InvalidSession(_)) =
e.downcast_ref::<BannerApiError>()
{
warn!(
worker_id = self.id,
job_id, "Invalid session detected. Forcing session refresh."
);
} else {
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
}
// Unlock the job so it can be retried
if let Err(unlock_err) = self.unlock_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?unlock_err,
"Failed to unlock job"
);
}
} else {
info!(worker_id = self.id, job_id, "Job processed successfully");
// If successful, delete the job.
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?delete_err,
"Failed to delete job"
);
}
}
}
Ok(None) => {
// No job found, wait for a bit before polling again.
time::sleep(Duration::from_secs(5)).await;
}
Err(e) => {
warn!(worker_id = self.id, error = ?e, "Failed to fetch job");
// Wait before retrying to avoid spamming errors.
time::sleep(Duration::from_secs(10)).await;
}
}
}
}
/// Atomically fetches a job from the queue, locking it for processing.
///
/// This uses a `FOR UPDATE SKIP LOCKED` query to ensure that multiple
/// workers can poll the queue concurrently without conflicts.
async fn fetch_and_lock_job(&self) -> Result<Option<ScrapeJob>> {
let mut tx = self.db_pool.begin().await?;
let job = sqlx::query_as::<_, ScrapeJob>(
"SELECT * FROM scrape_jobs WHERE locked_at IS NULL AND execute_at <= NOW() ORDER BY priority DESC, execute_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED"
)
.fetch_optional(&mut *tx)
.await?;
if let Some(ref job) = job {
sqlx::query("UPDATE scrape_jobs SET locked_at = NOW() WHERE id = $1")
.bind(job.id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(job)
}
async fn process_job(&self, job: ScrapeJob) -> Result<()> {
match job.target_type {
crate::data::models::TargetType::Subject => {
self.process_subject_job(&job.target_payload).await
}
_ => {
warn!(worker_id = self.id, job_id = job.id, "unhandled job type");
Ok(())
}
}
}
async fn process_subject_job(&self, payload: &Value) -> Result<()> {
let subject_code = payload["subject"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Invalid subject payload"))?;
info!(
worker_id = self.id,
subject = subject_code,
"Processing subject job"
);
let term = Term::get_current().inner().to_string();
let query = SearchQuery::new().subject(subject_code).max_results(500);
let search_result = self
.banner_api
.search(&term, &query, "subjectDescription", false)
.await?;
if let Some(courses_from_api) = search_result.data {
info!(
worker_id = self.id,
subject = subject_code,
count = courses_from_api.len(),
"Found courses to upsert"
);
for course in courses_from_api {
self.upsert_course(&course).await?;
}
}
Ok(())
}
async fn upsert_course(&self, course: &Course) -> Result<()> {
sqlx::query(
r#"
INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (crn, term_code) DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&course.course_reference_number)
.bind(&course.subject)
.bind(&course.course_number)
.bind(&course.course_title)
.bind(&course.term)
.bind(course.enrollment)
.bind(course.maximum_enrollment)
.bind(course.wait_count)
.bind(course.wait_capacity)
.bind(chrono::Utc::now())
.execute(&self.db_pool)
.await?;
Ok(())
}
async fn delete_job(&self, job_id: i32) -> Result<()> {
sqlx::query("DELETE FROM scrape_jobs WHERE id = $1")
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job deleted");
Ok(())
}
async fn unlock_job(&self, job_id: i32) -> Result<()> {
sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE id = $1")
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job unlocked after failure");
Ok(())
}
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};
use crate::services::{Service, ServiceResult, run_service};
@@ -13,6 +13,12 @@ pub struct ServiceManager {
shutdown_tx: broadcast::Sender<()>,
}
impl Default for ServiceManager {
fn default() -> Self {
Self::new()
}
}
impl ServiceManager {
pub fn new() -> Self {
let (shutdown_tx, _) = broadcast::channel(1);
@@ -36,6 +42,7 @@ impl ServiceManager {
for (name, service) in self.registered_services.drain() {
let shutdown_rx = self.shutdown_tx.subscribe();
let handle = tokio::spawn(run_service(service, shutdown_rx));
trace!(service = name, id = ?handle.id(), "service spawned",);
self.running_services.insert(name, handle);
}

View File

@@ -7,7 +7,7 @@ use redis::AsyncCommands;
use redis::Client;
use std::sync::Arc;
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct AppState {
pub banner_api: Arc<BannerApi>,
pub redis: Arc<Client>,