Compare commits

..

1 Commits

Author SHA1 Message Date
9da48b9985 feat: translate over to sqlx, remove diesel 2025-08-29 13:26:53 -05:00
29 changed files with 620 additions and 1487 deletions

3
.gitignore vendored
View File

@@ -1,4 +1,3 @@
.env
/target
/go/
.cargo/config.toml
/go/

175
Cargo.lock generated
View File

@@ -32,6 +32,12 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -173,31 +179,25 @@ dependencies = [
"anyhow",
"async-trait",
"axum",
"bitflags 2.9.4",
"bitflags 2.9.3",
"chrono",
"chrono-tz",
"compile-time",
"cookie",
"dashmap 6.1.0",
"dotenvy",
"figment",
"fundu",
"futures",
"governor",
"http 1.3.1",
"once_cell",
"poise",
"rand 0.9.2",
"redis",
"regex",
"reqwest 0.12.23",
"reqwest-middleware",
"serde",
"serde_json",
"serenity",
"sqlx",
"thiserror 2.0.16",
"time",
"tl",
"tokio",
"tracing",
"tracing-subscriber",
@@ -230,9 +230,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.9.4"
version = "2.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
checksum = "34efbcccd345379ca2868b2b2c9d3782e9cc58ba87bc7d79d5b53d9c9ae6f25d"
dependencies = [
"serde",
]
@@ -324,16 +324,27 @@ checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9"
[[package]]
name = "chrono"
version = "0.4.42"
version = "0.4.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-link 0.2.0",
"windows-link",
]
[[package]]
name = "chrono-tz"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3"
dependencies = [
"chrono",
"phf",
]
[[package]]
@@ -806,7 +817,6 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@@ -1432,7 +1442,7 @@ version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"cfg-if",
"libc",
]
@@ -1502,7 +1512,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "391290121bad3d37fbddad76d8f5d1c1c314cfc646d143d7e07a3086ddff0ce3"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"libc",
"redox_syscall",
]
@@ -1553,11 +1563,11 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "matchers"
version = "0.2.0"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
"regex-automata 0.1.10",
]
[[package]]
@@ -1658,11 +1668,12 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nu-ansi-term"
version = "0.50.1"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"windows-sys 0.52.0",
"overload",
"winapi",
]
[[package]]
@@ -1749,7 +1760,7 @@ version = "0.10.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"cfg-if",
"foreign-types",
"libc",
@@ -1787,6 +1798,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking"
version = "2.2.1"
@@ -1854,6 +1871,24 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "phf"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
@@ -1996,7 +2031,7 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"memchr",
"unicase",
]
@@ -2103,11 +2138,11 @@ dependencies = [
[[package]]
name = "raw-cpuid"
version = "11.6.0"
version = "11.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
]
[[package]]
@@ -2139,7 +2174,7 @@ version = "0.5.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
]
[[package]]
@@ -2150,8 +2185,17 @@ checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.10",
"regex-syntax 0.8.6",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
@@ -2162,9 +2206,15 @@ checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax 0.8.6",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.6"
@@ -2257,21 +2307,6 @@ dependencies = [
"web-sys",
]
[[package]]
name = "reqwest-middleware"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57f17d28a6e6acfe1733fe24bcd30774d13bffa4b8a22535b4c8c98423088d4e"
dependencies = [
"anyhow",
"async-trait",
"http 1.3.1",
"reqwest 0.12.23",
"serde",
"thiserror 1.0.69",
"tower-service",
]
[[package]]
name = "ring"
version = "0.17.14"
@@ -2327,7 +2362,7 @@ version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"errno",
"libc",
"linux-raw-sys",
@@ -2495,7 +2530,7 @@ version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"core-foundation",
"core-foundation-sys",
"libc",
@@ -2602,7 +2637,7 @@ dependencies = [
"arrayvec",
"async-trait",
"base64 0.22.1",
"bitflags 2.9.4",
"bitflags 2.9.3",
"bytes",
"chrono",
"command_attr",
@@ -2692,6 +2727,12 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "siphasher"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "skeptic"
version = "0.13.7"
@@ -2866,7 +2907,7 @@ checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags 2.9.4",
"bitflags 2.9.3",
"byteorder",
"bytes",
"chrono",
@@ -2909,7 +2950,7 @@ checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags 2.9.4",
"bitflags 2.9.3",
"byteorder",
"chrono",
"crc",
@@ -3064,7 +3105,7 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"core-foundation",
"system-configuration-sys 0.6.0",
]
@@ -3213,12 +3254,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tl"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b130bd8a58c163224b44e217b4239ca7b927d82bf6cc2fea1fc561d15056e3f7"
[[package]]
name = "tokio"
version = "1.47.1"
@@ -3394,7 +3429,7 @@ version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [
"bitflags 2.9.4",
"bitflags 2.9.3",
"bytes",
"futures-util",
"http 1.3.1",
@@ -3474,14 +3509,14 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.20"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"regex",
"serde",
"serde_json",
"sharded-slab",
@@ -3880,7 +3915,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link 0.1.3",
"windows-link",
"windows-result",
"windows-strings",
]
@@ -3913,19 +3948,13 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
[[package]]
name = "windows-link"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65"
[[package]]
name = "windows-registry"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e"
dependencies = [
"windows-link 0.1.3",
"windows-link",
"windows-result",
"windows-strings",
]
@@ -3936,7 +3965,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link 0.1.3",
"windows-link",
]
[[package]]
@@ -3945,7 +3974,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link 0.1.3",
"windows-link",
]
[[package]]
@@ -4021,7 +4050,7 @@ version = "0.53.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91"
dependencies = [
"windows-link 0.1.3",
"windows-link",
"windows_aarch64_gnullvm 0.53.0",
"windows_aarch64_msvc 0.53.0",
"windows_i686_gnu 0.53.0",

View File

@@ -2,46 +2,33 @@
name = "banner"
version = "0.1.0"
edition = "2024"
default-run = "banner"
[dependencies]
anyhow = "1.0.99"
async-trait = "0.1"
axum = "0.8.4"
bitflags = { version = "2.9.4", features = ["serde"] }
chrono = { version = "0.4.42", features = ["serde"] }
bitflags = { version = "2.9.3", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.10.4"
compile-time = "0.2.0"
cookie = "0.18.1"
dashmap = "6.1.0"
dotenvy = "0.15.7"
figment = { version = "0.10.19", features = ["toml", "env"] }
fundu = "2.0.1"
futures = "0.3"
http = "1.3.1"
governor = "0.10.1"
poise = "0.6.1"
rand = "0.9.2"
redis = { version = "0.32.5", features = ["tokio-comp", "r2d2"] }
regex = "1.10"
reqwest = { version = "0.12.23", features = ["json", "cookies"] }
reqwest-middleware = { version = "0.4.2", features = ["json"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.143"
serenity = { version = "0.12.4", features = ["rustls_backend"] }
sqlx = { version = "0.8.6", features = [
"runtime-tokio-rustls",
"postgres",
"chrono",
"json",
"macros",
] }
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "chrono", "json", "macros"] }
thiserror = "2.0.16"
time = "0.3.41"
tokio = { version = "1.47.1", features = ["full"] }
tl = "0.7.8"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
url = "2.5"
governor = "0.10.1"
once_cell = "1.21.3"
[dev-dependencies]

View File

@@ -1,77 +0,0 @@
# Build Stage
ARG RUST_VERSION=1.86.0
FROM rust:${RUST_VERSION}-bookworm AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /usr/src
RUN USER=root cargo new --bin banner
WORKDIR /usr/src/banner
# Copy dependency files for better layer caching
COPY ./Cargo.toml ./Cargo.lock* ./
# Build empty app with downloaded dependencies to produce a stable image layer for next build
RUN cargo build --release
# Build web app with own code
RUN rm src/*.rs
COPY ./src ./src
RUN rm ./target/release/deps/banner*
RUN cargo build --release
# Strip the binary to reduce size
RUN strip target/release/banner
# Runtime Stage - Debian slim for glibc compatibility
FROM debian:12-slim
ARG APP=/usr/src/app
ARG APP_USER=appuser
ARG UID=1000
ARG GID=1000
# Install runtime dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
tzdata \
wget \
&& rm -rf /var/lib/apt/lists/*
ARG TZ=Etc/UTC
ENV TZ=${TZ}
# Create user with specific UID/GID
RUN addgroup --gid $GID $APP_USER \
&& adduser --uid $UID --disabled-password --gecos "" --ingroup $APP_USER $APP_USER \
&& mkdir -p ${APP}
# Copy application files
COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/target/release/banner ${APP}/banner
COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/src/fonts ${APP}/fonts
# Set proper permissions
RUN chmod +x ${APP}/banner
USER $APP_USER
WORKDIR ${APP}
# Build-time arg for PORT, default to 8000
ARG PORT=8000
# Runtime environment var for PORT, default to build-time arg
ENV PORT=${PORT}
EXPOSE ${PORT}
# Add health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD wget --no-verbose --tries=1 --spider http://localhost:${PORT}/health || exit 1
# Can be explicitly overriden with different hosts & ports
ENV HOSTS=0.0.0.0,[::]
# Implicitly uses PORT environment variable
CMD ["sh", "-c", "exec ./banner --server ${HOSTS}"]

View File

@@ -7,7 +7,7 @@ use redis::AsyncCommands;
use redis::Client;
use std::sync::Arc;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct AppState {
pub banner_api: Arc<BannerApi>,
pub redis: Arc<Client>,

View File

@@ -1,101 +1,72 @@
//! Main Banner API client implementation.
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, Mutex},
time::Instant,
};
use crate::banner::{
BannerSession, SessionPool, errors::BannerApiError, json::parse_json_with_context,
middleware::TransparentMiddleware, models::*, nonce, query::SearchQuery, util::user_agent,
};
use anyhow::{Context, Result, anyhow};
use cookie::Cookie;
use dashmap::DashMap;
use http::HeaderValue;
use reqwest::{Client, Request, Response};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use crate::banner::{models::*, query::SearchQuery, session::SessionManager, util::user_agent};
use anyhow::{Context, Result};
use axum::http::HeaderValue;
use reqwest::Client;
use serde_json;
use tl;
use tracing::{Level, Metadata, Span, debug, error, field::ValueSet, info, span, trace, warn};
use tracing::{error, info};
/// Main Banner API client.
#[derive(Debug)]
pub struct BannerApi {
pub sessions: SessionPool,
http: ClientWithMiddleware,
sessions: SessionManager,
http: Client,
base_url: String,
}
impl BannerApi {
/// Creates a new Banner API client.
pub fn new(base_url: String) -> Result<Self> {
let http = ClientBuilder::new(
Client::builder()
.cookie_store(false)
.user_agent(user_agent())
.tcp_keepalive(Some(std::time::Duration::from_secs(60 * 5)))
.read_timeout(std::time::Duration::from_secs(10))
.connect_timeout(std::time::Duration::from_secs(10))
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to create HTTP client")?,
)
.with(TransparentMiddleware)
.build();
let http = Client::builder()
.cookie_store(true)
.user_agent(user_agent())
.tcp_keepalive(Some(std::time::Duration::from_secs(60 * 5)))
.read_timeout(std::time::Duration::from_secs(10))
.connect_timeout(std::time::Duration::from_secs(10))
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to create HTTP client")?;
let session_manager = SessionManager::new(base_url.clone(), http.clone());
Ok(Self {
sessions: SessionPool::new(http.clone(), base_url.clone()),
sessions: session_manager,
http,
base_url,
})
}
/// Validates offset parameter for search methods.
fn validate_offset(offset: i32) -> Result<()> {
if offset <= 0 {
Err(anyhow::anyhow!("Offset must be greater than 0"))
} else {
Ok(())
/// Sets up the API client by initializing session cookies.
pub async fn setup(&self) -> Result<()> {
info!(base_url = self.base_url, "setting up banner api client");
let result = self.sessions.setup().await;
match &result {
Ok(()) => info!("banner api client setup completed successfully"),
Err(e) => error!(error = ?e, "banner api client setup failed"),
}
result
}
/// Builds common search parameters for list endpoints.
fn build_list_params(
/// Retrieves a list of terms from the Banner API.
pub async fn get_terms(
&self,
search: &str,
term: &str,
offset: i32,
page: i32,
max_results: i32,
session_id: &str,
) -> Vec<(&str, String)> {
vec![
("searchTerm", search.to_string()),
("term", term.to_string()),
("offset", offset.to_string()),
("max", max_results.to_string()),
("uniqueSessionId", session_id.to_string()),
("_", nonce()),
]
}
) -> Result<Vec<BannerTerm>> {
if page <= 0 {
return Err(anyhow::anyhow!("Page must be greater than 0"));
}
/// Makes a GET request to a list endpoint and parses JSON response.
async fn get_list_endpoint<T>(
&self,
endpoint: &str,
search: &str,
term: &str,
offset: i32,
max_results: i32,
) -> Result<Vec<T>>
where
T: for<'de> serde::Deserialize<'de>,
{
Self::validate_offset(offset)?;
let session = self.sessions.acquire(term.parse()?).await?;
let url = format!("{}/classSearch/{}", self.base_url, endpoint);
let params = self.build_list_params(search, term, offset, max_results, &session.id());
let url = format!("{}/classSearch/getTerms", self.base_url);
let params = [
("searchTerm", search),
("offset", &page.to_string()),
("max", &max_results.to_string()),
("_", &SessionManager::nonce()),
];
let response = self
.http
@@ -103,109 +74,14 @@ impl BannerApi {
.query(&params)
.send()
.await
.with_context(|| format!("Failed to get {}", endpoint))?;
.context("Failed to get terms")?;
let data: Vec<T> = response
let terms: Vec<BannerTerm> = response
.json()
.await
.with_context(|| format!("Failed to parse {} response", endpoint))?;
.context("Failed to parse terms response")?;
Ok(data)
}
/// Builds search parameters for course search methods.
fn build_search_params(
&self,
query: &SearchQuery,
term: &str,
session_id: &str,
sort: &str,
sort_descending: bool,
) -> HashMap<String, String> {
let mut params = query.to_params();
params.insert("txt_term".to_string(), term.to_string());
params.insert("uniqueSessionId".to_string(), session_id.to_string());
params.insert("sortColumn".to_string(), sort.to_string());
params.insert(
"sortDirection".to_string(),
if sort_descending { "desc" } else { "asc" }.to_string(),
);
params.insert("startDatepicker".to_string(), String::new());
params.insert("endDatepicker".to_string(), String::new());
params
}
/// Performs a course search and handles common response processing.
async fn perform_search(
&self,
term: &str,
query: &SearchQuery,
sort: &str,
sort_descending: bool,
) -> Result<SearchResult, BannerApiError> {
let mut session = self.sessions.acquire(term.parse()?).await?;
if session.been_used() {
self.http
.post(format!("{}/classSearch/resetDataForm", self.base_url))
.header("Cookie", session.cookie())
.send()
.await
.map_err(|e| BannerApiError::RequestFailed(e.into()))?;
}
session.touch();
let params = self.build_search_params(query, term, &session.id(), sort, sort_descending);
debug!(
term = term,
query = ?query,
sort = sort,
sort_descending = sort_descending,
"Searching for courses with params: {:?}", params
);
let response = self
.http
.get(format!("{}/searchResults/searchResults", self.base_url))
.header("Cookie", session.cookie())
.query(&params)
.send()
.await
.context("Failed to search courses")?;
let status = response.status();
let url = response.url().clone();
let body = response
.text()
.await
.with_context(|| format!("Failed to read body (status={status})"))?;
let search_result: SearchResult = parse_json_with_context(&body).map_err(|e| {
BannerApiError::RequestFailed(anyhow!(
"Failed to parse search response (status={status}, url={url}): {e}\nBody: {body}"
))
})?;
// Check for signs of an invalid session
if search_result.path_mode.is_none() {
return Err(BannerApiError::InvalidSession(
"Search result path mode is none".to_string(),
));
} else if search_result.data.is_none() {
return Err(BannerApiError::InvalidSession(
"Search result data is none".to_string(),
));
}
if !search_result.success {
return Err(BannerApiError::RequestFailed(anyhow!(
"Search marked as unsuccessful by Banner API"
)));
}
Ok(search_result)
Ok(terms)
}
/// Retrieves a list of subjects from the Banner API.
@@ -216,8 +92,35 @@ impl BannerApi {
offset: i32,
max_results: i32,
) -> Result<Vec<Pair>> {
self.get_list_endpoint("get_subject", search, term, offset, max_results)
if offset <= 0 {
return Err(anyhow::anyhow!("Offset must be greater than 0"));
}
let session_id = self.sessions.ensure_session()?;
let url = format!("{}/classSearch/get_subject", self.base_url);
let params = [
("searchTerm", search),
("term", term),
("offset", &offset.to_string()),
("max", &max_results.to_string()),
("uniqueSessionId", &session_id),
("_", &SessionManager::nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.context("Failed to get subjects")?;
let subjects: Vec<Pair> = response
.json()
.await
.context("Failed to parse subjects response")?;
Ok(subjects)
}
/// Retrieves a list of instructors from the Banner API.
@@ -228,8 +131,35 @@ impl BannerApi {
offset: i32,
max_results: i32,
) -> Result<Vec<Instructor>> {
self.get_list_endpoint("get_instructor", search, term, offset, max_results)
if offset <= 0 {
return Err(anyhow::anyhow!("Offset must be greater than 0"));
}
let session_id = self.sessions.ensure_session()?;
let url = format!("{}/classSearch/get_instructor", self.base_url);
let params = [
("searchTerm", search),
("term", term),
("offset", &offset.to_string()),
("max", &max_results.to_string()),
("uniqueSessionId", &session_id),
("_", &SessionManager::nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.context("Failed to get instructors")?;
let instructors: Vec<Instructor> = response
.json()
.await
.context("Failed to parse instructors response")?;
Ok(instructors)
}
/// Retrieves a list of campuses from the Banner API.
@@ -240,8 +170,35 @@ impl BannerApi {
offset: i32,
max_results: i32,
) -> Result<Vec<Pair>> {
self.get_list_endpoint("get_campus", search, term, offset, max_results)
if offset <= 0 {
return Err(anyhow::anyhow!("Offset must be greater than 0"));
}
let session_id = self.sessions.ensure_session()?;
let url = format!("{}/classSearch/get_campus", self.base_url);
let params = [
("searchTerm", search),
("term", term),
("offset", &offset.to_string()),
("max", &max_results.to_string()),
("uniqueSessionId", &session_id),
("_", &SessionManager::nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.context("Failed to get campuses")?;
let campuses: Vec<Pair> = response
.json()
.await
.context("Failed to parse campuses response")?;
Ok(campuses)
}
/// Retrieves meeting time information for a course.
@@ -302,31 +259,95 @@ impl BannerApi {
query: &SearchQuery,
sort: &str,
sort_descending: bool,
) -> Result<SearchResult, BannerApiError> {
self.perform_search(term, query, sort, sort_descending)
) -> Result<SearchResult> {
self.sessions.reset_data_form().await?;
let session_id = self.sessions.ensure_session()?;
let mut params = query.to_params();
// Add additional parameters
params.insert("txt_term".to_string(), term.to_string());
params.insert("uniqueSessionId".to_string(), session_id);
params.insert("sortColumn".to_string(), sort.to_string());
params.insert(
"sortDirection".to_string(),
if sort_descending { "desc" } else { "asc" }.to_string(),
);
params.insert("startDatepicker".to_string(), String::new());
params.insert("endDatepicker".to_string(), String::new());
let url = format!("{}/searchResults/searchResults", self.base_url);
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.context("Failed to search courses")?;
let search_result: SearchResult = response
.json()
.await
.context("Failed to parse search response")?;
if !search_result.success {
return Err(anyhow::anyhow!(
"Search marked as unsuccessful by Banner API"
));
}
Ok(search_result)
}
/// Selects a term for the current session.
pub async fn select_term(&self, term: &str) -> Result<()> {
self.sessions.select_term(term).await
}
/// Retrieves a single course by CRN by issuing a minimal search
pub async fn get_course_by_crn(
&self,
term: &str,
crn: &str,
) -> Result<Option<Course>, BannerApiError> {
pub async fn get_course_by_crn(&self, term: &str, crn: &str) -> Result<Option<Course>> {
self.sessions.reset_data_form().await?;
// Ensure session is configured for this term
self.select_term(term).await?;
let session_id = self.sessions.ensure_session()?;
let query = SearchQuery::new()
.course_reference_number(crn)
.max_results(1);
let search_result = self
.perform_search(term, &query, "subjectDescription", false)
.await?;
let mut params = query.to_params();
params.insert("txt_term".to_string(), term.to_string());
params.insert("uniqueSessionId".to_string(), session_id);
params.insert("sortColumn".to_string(), "subjectDescription".to_string());
params.insert("sortDirection".to_string(), "asc".to_string());
params.insert("startDatepicker".to_string(), String::new());
params.insert("endDatepicker".to_string(), String::new());
// Additional validation for CRN search
if search_result.path_mode == Some("registration".to_string())
&& search_result.data.is_none()
{
return Err(BannerApiError::InvalidSession(
"Search result path mode is registration and data is none".to_string(),
let url = format!("{}/searchResults/searchResults", self.base_url);
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.context("Failed to search course by CRN")?;
let status = response.status();
let body = response
.text()
.await
.with_context(|| format!("Failed to read body (status={status})"))?;
let search_result: SearchResult = parse_json_with_context(&body).map_err(|e| {
anyhow::anyhow!(
"Failed to parse search response for CRN (status={status}, url={url}): {e}",
)
})?;
if !search_result.success {
return Err(anyhow::anyhow!(
"Search marked as unsuccessful by Banner API"
));
}
@@ -360,3 +381,36 @@ impl BannerApi {
Ok(details)
}
}
/// Attempt to parse JSON and, on failure, include a contextual snippet around the error location
fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
match serde_json::from_str::<T>(body) {
Ok(value) => Ok(value),
Err(err) => {
let (line, column) = (err.line(), err.column());
let snippet = build_error_snippet(body, line, column, 120);
Err(anyhow::anyhow!(
"{err} at line {line}, column {column}\nSnippet:\n{snippet}",
))
}
}
}
fn build_error_snippet(body: &str, line: usize, column: usize, max_len: usize) -> String {
let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
if target_line.is_empty() {
return String::new();
}
let start = column.saturating_sub(max_len.min(column));
let end = (column + max_len).min(target_line.len());
let slice = &target_line[start..end];
let mut indicator = String::new();
if column > start {
indicator.push_str(&" ".repeat(column - start - 1));
indicator.push('^');
}
format!("{slice}\n{indicator}")
}

View File

@@ -1,11 +0,0 @@
//! Error types for the Banner API client.
use thiserror::Error;
#[derive(Debug, thiserror::Error)]
pub enum BannerApiError {
#[error("Banner session is invalid or expired: {0}")]
InvalidSession(String),
#[error(transparent)]
RequestFailed(#[from] anyhow::Error),
}

View File

@@ -1,39 +0,0 @@
//! JSON parsing utilities for the Banner API client.
use anyhow::Result;
/// Attempt to parse JSON and, on failure, include a contextual snippet of the
/// line where the error occurred. This prevents dumping huge JSON bodies to logs.
pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
match serde_json::from_str::<T>(body) {
Ok(value) => Ok(value),
Err(err) => {
let (line, column) = (err.line(), err.column());
let snippet = build_error_snippet(body, line, column, 80);
Err(anyhow::anyhow!(
"{err} at line {line}, column {column}\nSnippet:\n{snippet}",
))
}
}
}
fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String {
let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
if target_line.is_empty() {
return "(empty line)".to_string();
}
// column is 1-based, convert to 0-based for slicing
let error_idx = column.saturating_sub(1);
let half_len = context_len / 2;
let start = error_idx.saturating_sub(half_len);
let end = (error_idx + half_len).min(target_line.len());
let slice = &target_line[start..end];
let indicator_pos = error_idx - start;
let indicator = " ".repeat(indicator_pos) + "^";
format!("...{slice}...\n {indicator}")
}

View File

@@ -1,49 +0,0 @@
//! HTTP middleware for the Banner API client.
use http::Extensions;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
use tracing::{trace, warn};
pub struct TransparentMiddleware;
#[async_trait::async_trait]
impl Middleware for TransparentMiddleware {
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> std::result::Result<Response, reqwest_middleware::Error> {
trace!(
domain = req.url().domain(),
headers = ?req.headers(),
"{method} {path}",
method = req.method().to_string(),
path = req.url().path(),
);
let response_result = next.run(req, extensions).await;
match response_result {
Ok(response) => {
if response.status().is_success() {
trace!(
"{code} {reason} {path}",
code = response.status().as_u16(),
reason = response.status().canonical_reason().unwrap_or("??"),
path = response.url().path(),
);
Ok(response)
} else {
let e = response.error_for_status_ref().unwrap_err();
warn!(error = ?e, "Request failed (server)");
Ok(response)
}
}
Err(error) => {
warn!(?error, "Request failed (middleware)");
Err(error)
}
}
}
}

View File

@@ -9,16 +9,12 @@
//! - Generate ICS files and calendar links
pub mod api;
pub mod errors;
pub mod json;
pub mod middleware;
pub mod models;
pub mod query;
pub mod session;
pub mod util;
pub use api::*;
pub use errors::*;
pub use models::*;
pub use query::*;
pub use session::*;

View File

@@ -42,11 +42,11 @@ pub struct FacultyItem {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MeetingTime {
pub start_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub end_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub begin_time: Option<String>, // HHMM, e.g 1000
pub end_time: Option<String>, // HHMM, e.g 1100
pub category: String, // unknown meaning, e.g. 01, 02, etc
pub start_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub end_date: String, // MM/DD/YYYY, e.g 08/26/2025
pub begin_time: String, // HHMM, e.g 1000
pub end_time: String, // HHMM, e.g 1100
pub category: String, // unknown meaning, e.g. 01, 02, etc
pub class: String, // internal class name, e.g. net.hedtech.banner.general.overallMeetingTimeDecorator
pub monday: bool, // true if the meeting time occurs on Monday
pub tuesday: bool, // true if the meeting time occurs on Tuesday
@@ -55,13 +55,13 @@ pub struct MeetingTime {
pub friday: bool, // true if the meeting time occurs on Friday
pub saturday: bool, // true if the meeting time occurs on Saturday
pub sunday: bool, // true if the meeting time occurs on Sunday
pub room: Option<String>, // e.g. 1.238
pub room: String, // e.g. 1238
#[serde(deserialize_with = "deserialize_string_to_term")]
pub term: Term, // e.g 202510
pub building: Option<String>, // e.g NPB
pub building_description: Option<String>, // e.g North Paseo Building
pub campus: Option<String>, // campus code, e.g 11
pub campus_description: Option<String>, // name of campus, e.g Main Campus
pub building: String, // e.g NPB
pub building_description: String, // e.g North Paseo Building
pub campus: String, // campus code, e.g 11
pub campus_description: String, // name of campus, e.g Main Campus
pub course_reference_number: String, // CRN, e.g 27294
pub credit_hour_session: f64, // e.g. 30
pub hours_week: f64, // e.g. 30
@@ -347,58 +347,42 @@ impl MeetingType {
/// Meeting location information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MeetingLocation {
Online,
InPerson {
campus: String,
campus_description: String,
building: String,
building_description: String,
room: String,
},
pub struct MeetingLocation {
pub campus: String,
pub building: String,
pub building_description: String,
pub room: String,
pub is_online: bool,
}
impl MeetingLocation {
/// Create from raw MeetingTime data
pub fn from_meeting_time(meeting_time: &MeetingTime) -> Self {
if meeting_time.campus.is_none()
|| meeting_time.building.is_none()
|| meeting_time.building_description.is_none()
|| meeting_time.room.is_none()
|| meeting_time.campus_description.is_none()
|| meeting_time
.campus_description
.eq(&Some("Internet".to_string()))
{
return MeetingLocation::Online;
}
let is_online = meeting_time.room.is_empty();
MeetingLocation::InPerson {
campus: meeting_time.campus.as_ref().unwrap().clone(),
campus_description: meeting_time.campus_description.as_ref().unwrap().clone(),
building: meeting_time.building.as_ref().unwrap().clone(),
building_description: meeting_time.building_description.as_ref().unwrap().clone(),
room: meeting_time.room.as_ref().unwrap().clone(),
MeetingLocation {
campus: meeting_time.campus_description.clone(),
building: meeting_time.building.clone(),
building_description: meeting_time.building_description.clone(),
room: meeting_time.room.clone(),
is_online,
}
}
}
impl Display for MeetingLocation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MeetingLocation::Online => write!(f, "Online"),
MeetingLocation::InPerson {
campus,
building,
building_description,
room,
..
} => write!(
if self.is_online {
write!(f, "Online")
} else {
write!(
f,
"{campus} | {building_name} | {building_code} {room}",
building_name = building_description,
building_code = building,
),
campus = self.campus,
building_name = self.building_description,
building_code = self.building,
room = self.room
)
}
}
}
@@ -418,11 +402,7 @@ impl MeetingScheduleInfo {
/// Create from raw MeetingTime data
pub fn from_meeting_time(meeting_time: &MeetingTime) -> Self {
let days = MeetingDays::from_meeting_time(meeting_time);
let time_range = match (&meeting_time.begin_time, &meeting_time.end_time) {
(Some(begin), Some(end)) => TimeRange::from_hhmm(begin, end),
_ => None,
};
let time_range = TimeRange::from_hhmm(&meeting_time.begin_time, &meeting_time.end_time);
let date_range =
DateRange::from_mm_dd_yyyy(&meeting_time.start_date, &meeting_time.end_date)
.unwrap_or_else(|| {
@@ -490,18 +470,16 @@ impl MeetingScheduleInfo {
/// Returns a formatted string representing the location of the meeting
pub fn place_string(&self) -> String {
match &self.location {
MeetingLocation::Online => "Online".to_string(),
MeetingLocation::InPerson {
campus,
building,
building_description,
room,
..
} => format!(
if self.location.room.is_empty() {
"Online".to_string()
} else {
format!(
"{} | {} | {} {}",
campus, building_description, building, room
),
self.location.campus,
self.location.building_description,
self.location.building,
self.location.room
)
}
}

View File

@@ -10,8 +10,8 @@ pub struct SearchResult {
pub total_count: i32,
pub page_offset: i32,
pub page_max_size: i32,
pub path_mode: Option<String>,
pub search_results_config: Option<Vec<SearchResultConfig>>,
pub path_mode: String,
pub search_results_config: Vec<SearchResultConfig>,
pub data: Option<Vec<Course>>,
}

View File

@@ -13,7 +13,7 @@ const CURRENT_YEAR: u32 = compile_time::date!().year() as u32;
const VALID_YEARS: RangeInclusive<u32> = 2007..=(CURRENT_YEAR + 10);
/// Represents a term in the Banner system
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Term {
pub year: u32, // 2024, 2025, etc
pub season: Season,
@@ -29,7 +29,7 @@ pub enum TermPoint {
}
/// Represents a season within a term
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Season {
Fall,
Spring,
@@ -193,7 +193,7 @@ impl std::fmt::Display for Term {
impl Season {
/// Returns the season code as a string
fn to_str(self) -> &'static str {
fn to_str(&self) -> &'static str {
match self {
Season::Fall => "10",
Season::Spring => "20",

View File

@@ -1,424 +1,133 @@
//! Session management for Banner API.
use crate::banner::BannerTerm;
use crate::banner::models::Term;
use anyhow::{Context, Result};
use cookie::Cookie;
use dashmap::DashMap;
use governor::state::InMemoryState;
use governor::{Quota, RateLimiter};
use once_cell::sync::Lazy;
use rand::distr::{Alphanumeric, SampleString};
use reqwest_middleware::ClientWithMiddleware;
use std::collections::{HashMap, VecDeque};
use std::num::NonZeroU32;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use crate::banner::util::user_agent;
use anyhow::Result;
use rand::distributions::{Alphanumeric, DistString};
use reqwest::Client;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Notify};
use tracing::{debug, info};
use url::Url;
const SESSION_EXPIRY: Duration = Duration::from_secs(25 * 60); // 25 minutes
// A global rate limiter to ensure we only try to create one new session every 10 seconds,
// preventing us from overwhelming the server with session creation requests.
static SESSION_CREATION_RATE_LIMITER: Lazy<
RateLimiter<governor::state::direct::NotKeyed, InMemoryState, governor::clock::DefaultClock>,
> = Lazy::new(|| RateLimiter::direct(Quota::with_period(Duration::from_secs(10)).unwrap()));
/// Represents an active anonymous session within the Banner API.
/// Identified by multiple persistent cookies, as well as a client-generated "unique session ID".
#[derive(Debug, Clone)]
pub struct BannerSession {
// Randomly generated
pub unique_session_id: String,
// Timestamp of creation
created_at: Instant,
// Timestamp of last activity
last_activity: Option<Instant>,
// Cookie values from initial registration page
jsessionid: String,
ssb_cookie: String,
}
/// Generates a new session ID mimicking Banner's format
fn generate_session_id() -> String {
let random_part = Alphanumeric.sample_string(&mut rand::rng(), 5);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
format!("{}{}", random_part, timestamp)
}
/// Generates a timestamp-based nonce
pub fn nonce() -> String {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string()
}
impl BannerSession {
/// Creates a new session
pub async fn new(unique_session_id: &str, jsessionid: &str, ssb_cookie: &str) -> Result<Self> {
let now = Instant::now();
Ok(Self {
created_at: now,
last_activity: None,
unique_session_id: unique_session_id.to_string(),
jsessionid: jsessionid.to_string(),
ssb_cookie: ssb_cookie.to_string(),
})
}
/// Returns the unique session ID
pub fn id(&self) -> String {
self.unique_session_id.clone()
}
/// Updates the last activity timestamp
pub fn touch(&mut self) {
debug!(id = self.unique_session_id, "Session was used");
self.last_activity = Some(Instant::now());
}
/// Returns true if the session is expired
pub fn is_expired(&self) -> bool {
self.last_activity.unwrap_or(self.created_at).elapsed() > SESSION_EXPIRY
}
/// Returns a string used to for the "Cookie" header
pub fn cookie(&self) -> String {
format!(
"JSESSIONID={}; SSB_COOKIE={}",
self.jsessionid, self.ssb_cookie
)
}
pub fn been_used(&self) -> bool {
self.last_activity.is_some()
}
}
/// A smart pointer that returns a BannerSession to the pool when dropped.
pub struct PooledSession {
session: Option<BannerSession>,
// This Arc points directly to the term-specific pool.
pool: Arc<TermPool>,
}
impl PooledSession {
pub fn been_used(&self) -> bool {
self.session.as_ref().unwrap().been_used()
}
}
impl Deref for PooledSession {
type Target = BannerSession;
fn deref(&self) -> &Self::Target {
// The option is only ever None after drop is called, so this is safe.
self.session.as_ref().unwrap()
}
}
impl DerefMut for PooledSession {
fn deref_mut(&mut self) -> &mut Self::Target {
self.session.as_mut().unwrap()
}
}
/// The magic happens here: when the guard goes out of scope, this is called.
impl Drop for PooledSession {
fn drop(&mut self) {
if let Some(session) = self.session.take() {
let pool = self.pool.clone();
// Since drop() cannot be async, we spawn a task to return the session.
tokio::spawn(async move {
pool.release(session).await;
});
}
}
}
pub struct TermPool {
sessions: Mutex<VecDeque<BannerSession>>,
notifier: Notify,
is_creating: Mutex<bool>,
}
impl TermPool {
fn new() -> Self {
Self {
sessions: Mutex::new(VecDeque::new()),
notifier: Notify::new(),
is_creating: Mutex::new(false),
}
}
async fn release(&self, session: BannerSession) {
let id = session.unique_session_id.clone();
if session.is_expired() {
debug!(id = id, "Session is now expired, dropping.");
// Wake up a waiter, as it might need to create a new session
// if this was the last one.
self.notifier.notify_one();
return;
}
let mut queue = self.sessions.lock().await;
queue.push_back(session);
let queue_size = queue.len();
drop(queue); // Release lock before notifying
debug!(
id = id,
"Session returned to pool. Queue size is now {queue_size}."
);
self.notifier.notify_one();
}
}
pub struct SessionPool {
sessions: DashMap<Term, Arc<TermPool>>,
http: ClientWithMiddleware,
/// Session manager for Banner API interactions
#[derive(Debug)]
pub struct SessionManager {
current_session: Mutex<Option<SessionData>>,
base_url: String,
client: Client,
}
impl SessionPool {
pub fn new(http: ClientWithMiddleware, base_url: String) -> Self {
#[derive(Debug, Clone)]
struct SessionData {
session_id: String,
created_at: Instant,
}
impl SessionManager {
const SESSION_EXPIRY: Duration = Duration::from_secs(25 * 60); // 25 minutes
/// Creates a new session manager
pub fn new(base_url: String, client: Client) -> Self {
Self {
sessions: DashMap::new(),
http,
current_session: Mutex::new(None),
base_url,
client,
}
}
/// Acquires a session from the pool.
/// If no sessions are available, a new one is created on demand,
/// respecting the global rate limit.
pub async fn acquire(&self, term: Term) -> Result<PooledSession> {
let term_pool = self
.sessions
.entry(term)
.or_insert_with(|| Arc::new(TermPool::new()))
.clone();
/// Ensures a valid session is available, creating one if necessary
pub fn ensure_session(&self) -> Result<String> {
let start_time = std::time::Instant::now();
let mut session_guard = self.current_session.lock().unwrap();
loop {
// Fast path: Try to get an existing, non-expired session.
{
let mut queue = term_pool.sessions.lock().await;
if let Some(session) = queue.pop_front() {
if !session.is_expired() {
debug!(id = session.unique_session_id, "Reusing session from pool");
return Ok(PooledSession {
session: Some(session),
pool: Arc::clone(&term_pool),
});
} else {
debug!(
id = session.unique_session_id,
"Popped an expired session, discarding."
);
}
}
} // MutexGuard is dropped, lock is released.
// Slow path: No sessions available. We must either wait or become the creator.
let mut is_creating_guard = term_pool.is_creating.lock().await;
if *is_creating_guard {
// Another task is already creating a session. Release the lock and wait.
drop(is_creating_guard);
debug!("Another task is creating a session, waiting for notification...");
term_pool.notifier.notified().await;
// Loop back to the top to try the fast path again.
continue;
}
// This task is now the designated creator.
*is_creating_guard = true;
drop(is_creating_guard);
// Race: wait for a session to be returned OR for the rate limiter to allow a new one.
debug!("Pool empty, racing notifier vs rate limiter...");
tokio::select! {
_ = term_pool.notifier.notified() => {
// A session was returned while we were waiting!
// We are no longer the creator. Reset the flag and loop to race for the new session.
debug!("Notified that a session was returned. Looping to retry.");
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
continue;
}
_ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
// The rate limit has elapsed. It's our job to create the session.
debug!("Rate limiter ready. Proceeding to create a new session.");
let new_session_result = self.create_session(&term).await;
// After creation, we are no longer the creator. Reset the flag
// and notify all other waiting tasks.
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
term_pool.notifier.notify_waiters();
match new_session_result {
Ok(new_session) => {
debug!(id = new_session.unique_session_id, "Successfully created new session");
return Ok(PooledSession {
session: Some(new_session),
pool: term_pool,
});
}
Err(e) => {
// Propagate the error if session creation failed.
return Err(e.context("Failed to create new session in pool"));
}
}
}
}
if let Some(ref session) = *session_guard
&& session.created_at.elapsed() < Self::SESSION_EXPIRY
{
let elapsed = start_time.elapsed();
debug!(
session_id = session.session_id,
elapsed = format!("{:.2?}", elapsed),
"reusing existing banner session"
);
return Ok(session.session_id.clone());
}
// Generate new session
let session_id = self.generate_session_id();
*session_guard = Some(SessionData {
session_id: session_id.clone(),
created_at: Instant::now(),
});
let elapsed = start_time.elapsed();
debug!(
session_id = session_id,
elapsed = format!("{:.2?}", elapsed),
"generated new banner session"
);
Ok(session_id)
}
/// Generates a new session ID mimicking Banner's format
fn generate_session_id(&self) -> String {
let random_part = Alphanumeric.sample_string(&mut rand::thread_rng(), 5);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
format!("{}{}", random_part, timestamp)
}
/// Sets up initial session cookies by making required Banner API requests
pub async fn create_session(&self, term: &Term) -> Result<BannerSession> {
info!("setting up banner session for term {term}");
pub async fn setup(&self) -> Result<()> {
info!("setting up banner session...");
// The 'register' or 'search' registration page
let initial_registration = self
.http
.get(format!("{}/registration", self.base_url))
.send()
.await?;
// TODO: Validate success
let request_paths = ["/registration/registration", "/selfServiceMenu/data"];
let cookies = initial_registration
.headers()
.get_all("Set-Cookie")
.iter()
.filter_map(|header_value| {
if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) {
Some((cookie.name().to_string(), cookie.value().to_string()))
} else {
None
}
})
.collect::<HashMap<String, String>>();
for path in &request_paths {
let url = format!("{}{}", self.base_url, path);
let response = self
.client
.get(&url)
.query(&[("_", Self::nonce())])
.header("User-Agent", user_agent())
.send()
.await?;
if !cookies.contains_key("JSESSIONID") || !cookies.contains_key("SSB_COOKIE") {
return Err(anyhow::anyhow!("Failed to get cookies"));
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Failed to setup session, request to {} returned {}",
path,
response.status()
));
}
}
let jsessionid = cookies.get("JSESSIONID").unwrap();
let ssb_cookie = cookies.get("SSB_COOKIE").unwrap();
let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);
debug!(
jsessionid = jsessionid,
ssb_cookie = ssb_cookie,
"New session cookies acquired"
);
self.http
.get(format!("{}/selfServiceMenu/data", self.base_url))
.header("Cookie", &cookie_header)
.send()
.await?
.error_for_status()
.context("Failed to get data page")?;
self.http
.get(format!("{}/term/termSelection", self.base_url))
.header("Cookie", &cookie_header)
.query(&[("mode", "search")])
.send()
.await?
.error_for_status()
.context("Failed to get term selection page")?;
// TOOD: Validate success
let terms = self.get_terms("", 1, 10).await?;
if !terms.iter().any(|t| t.code == term.to_string()) {
return Err(anyhow::anyhow!("Failed to get term search response"));
}
let specific_term_search_response = self.get_terms(&term.to_string(), 1, 10).await?;
if !specific_term_search_response
.iter()
.any(|t| t.code == term.to_string())
{
return Err(anyhow::anyhow!("Failed to get term search response"));
}
let unique_session_id = generate_session_id();
self.select_term(&term.to_string(), &unique_session_id, &cookie_header)
.await?;
BannerSession::new(&unique_session_id, jsessionid, ssb_cookie).await
}
/// Retrieves a list of terms from the Banner API.
pub async fn get_terms(
&self,
search: &str,
page: i32,
max_results: i32,
) -> Result<Vec<BannerTerm>> {
if page <= 0 {
return Err(anyhow::anyhow!("Page must be greater than 0"));
}
let url = format!("{}/classSearch/getTerms", self.base_url);
let params = [
("searchTerm", search),
("offset", &page.to_string()),
("max", &max_results.to_string()),
("_", &nonce()),
];
let response = self
.http
.get(&url)
.query(&params)
.send()
.await
.with_context(|| "Failed to get terms".to_string())?;
let terms: Vec<BannerTerm> = response
.json()
.await
.context("Failed to parse terms response")?;
Ok(terms)
// Note: Cookie validation would require additional setup in a real implementation
debug!("session setup complete");
Ok(())
}
/// Selects a term for the current session
pub async fn select_term(
&self,
term: &str,
unique_session_id: &str,
cookie_header: &str,
) -> Result<()> {
pub async fn select_term(&self, term: &str) -> Result<()> {
let session_id = self.ensure_session()?;
let form_data = [
("term", term),
("studyPath", ""),
("studyPathText", ""),
("startDatepicker", ""),
("endDatepicker", ""),
("uniqueSessionId", unique_session_id),
("uniqueSessionId", &session_id),
];
let url = format!("{}/term/search", self.base_url);
let response = self
.http
.client
.post(&url)
.header("Cookie", cookie_header)
.query(&[("mode", "search")])
.form(&form_data)
.header("User-Agent", user_agent())
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await?;
@@ -432,21 +141,18 @@ impl SessionPool {
#[derive(serde::Deserialize)]
struct RedirectResponse {
#[serde(rename = "fwdURL")]
#[serde(rename = "fwdUrl")]
fwd_url: String,
}
let redirect: RedirectResponse = response.json().await?;
let base_url_path = self.base_url.parse::<Url>().unwrap().path().to_string();
let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap();
// Follow the redirect
let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect);
let redirect_url = format!("{}{}", self.base_url, redirect.fwd_url);
let redirect_response = self
.http
.client
.get(&redirect_url)
.header("Cookie", cookie_header)
.header("User-Agent", user_agent())
.send()
.await?;
@@ -457,7 +163,36 @@ impl SessionPool {
));
}
debug!(term = term, "successfully selected term");
debug!("successfully selected term: {}", term);
Ok(())
}
/// Resets the data form (required before new searches)
pub async fn reset_data_form(&self) -> Result<()> {
let url = format!("{}/classSearch/resetDataForm", self.base_url);
let response = self
.client
.post(&url)
.header("User-Agent", user_agent())
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Failed to reset data form: {}",
response.status()
));
}
Ok(())
}
/// Generates a timestamp-based nonce
pub fn nonce() -> String {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string()
}
}

View File

@@ -1,131 +0,0 @@
use banner::banner::{BannerApi, SearchQuery, Term};
use banner::config::Config;
use banner::error::Result;
use figment::{Figment, providers::Env};
use futures::future;
use tracing::{error, info};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
#[tokio::main]
async fn main() -> Result<()> {
// Configure logging
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,banner=trace,reqwest=debug,hyper=info"));
let subscriber = FmtSubscriber::builder()
.with_env_filter(filter)
.with_target(true)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
info!("Starting Banner search test");
dotenvy::dotenv().ok();
// Load configuration
let config: Config = Figment::new()
.merge(Env::raw().only(&["DATABASE_URL"]))
.merge(Env::prefixed("APP_"))
.extract()
.expect("Failed to load config");
info!(
banner_base_url = config.banner_base_url,
"Configuration loaded"
);
// Create Banner API client
let banner_api = BannerApi::new(config.banner_base_url).expect("Failed to create BannerApi");
// Get current term
let term = Term::get_current().inner().to_string();
info!(term = term, "Using current term");
// Define multiple search queries
let queries = vec![
(
"CS Courses",
SearchQuery::new().subject("CS").max_results(10),
),
(
"Math Courses",
SearchQuery::new().subject("MAT").max_results(10),
),
(
"3000-level CS",
SearchQuery::new()
.subject("CS")
.course_numbers(3000, 3999)
.max_results(8),
),
(
"High Credit Courses",
SearchQuery::new().credits(4, 6).max_results(8),
),
(
"Programming Courses",
SearchQuery::new().keyword("programming").max_results(6),
),
];
info!("Executing {} concurrent searches", queries.len());
// Execute all searches concurrently
let search_futures = queries.into_iter().map(|(label, query)| {
info!("Starting search: {}", label);
let banner_api = &banner_api;
let term = &term;
async move {
let result = banner_api
.search(term, &query, "subjectDescription", false)
.await;
(label, result)
}
});
// Wait for all searches to complete
let search_results = future::join_all(search_futures)
.await
.into_iter()
.filter_map(|(label, result)| match result {
Ok(search_result) => {
info!(
label = label,
success = search_result.success,
total_count = search_result.total_count,
"Search completed successfully"
);
Some((label, search_result))
}
Err(e) => {
error!(label = label, error = ?e, "Search failed");
None
}
})
.collect::<Vec<_>>();
// Process and display results
for (label, search_result) in search_results {
println!("\n=== {} ===", label);
if let Some(courses) = &search_result.data {
if courses.is_empty() {
println!(" No courses found");
} else {
println!(" Found {} courses:", courses.len());
for course in courses {
println!(
" {} {} - {} (CRN: {})",
course.subject,
course.course_number,
course.course_title,
course.course_reference_number
);
}
}
} else {
println!(" No courses found");
}
}
info!("Search test completed");
Ok(())
}

View File

@@ -1,13 +1,13 @@
//! Bot commands module.
pub mod gcal;
pub mod ics;
pub mod search;
pub mod terms;
pub mod time;
pub mod ics;
pub mod gcal;
pub use gcal::gcal;
pub use ics::ics;
pub use search::search;
pub use terms::terms;
pub use time::time;
pub use ics::ics;
pub use gcal::gcal;

View File

@@ -92,7 +92,9 @@ fn parse_course_code(input: &str) -> Result<(i32, i32), Error> {
};
if low > high {
return Err(anyhow!("Invalid range: low value greater than high value"));
return Err(anyhow!(
"Invalid range: low value greater than high value"
));
}
if low < 1000 || high > 9999 {

View File

@@ -21,7 +21,6 @@ pub async fn terms(
.data()
.app_state
.banner_api
.sessions
.get_terms(&search_term, page_number, max_results)
.await?;
@@ -47,11 +46,7 @@ fn format_term(term: &BannerTerm, current_term_code: &str) -> String {
} else {
""
};
let is_archived = if term.is_archived() {
" (archived)"
} else {
""
};
let is_archived = if term.is_archived() { " (archived)" } else { "" };
format!(
"- `{}`: {}{}{}",
term.code, term.description, is_current, is_archived

View File

@@ -1,6 +1,6 @@
//! Time command implementation for course meeting times.
use crate::bot::{Context, Error, utils};
use crate::bot::{utils, Context, Error};
use tracing::info;
/// Get meeting times for a specific course

View File

@@ -1,9 +1,10 @@
use crate::app_state::AppState;
use crate::error::Error;
use crate::state::AppState;
pub mod commands;
pub mod utils;
#[derive(Debug)]
pub struct Data {
pub app_state: AppState,
} // User data, which is stored and accessible in all command invocations

View File

@@ -11,15 +11,6 @@ use std::time::Duration;
/// Application configuration loaded from environment variables
#[derive(Deserialize)]
pub struct Config {
/// Log level for the application
///
/// This value is used to set the log level for this application's target specifically.
/// e.g. "debug" would be similar to "warn,banner=debug,..."
///
/// Valid values are: "trace", "debug", "info", "warn", "error"
/// Defaults to "info" if not specified
#[serde(default = "default_log_level")]
pub log_level: String,
/// Discord bot token for authentication
pub bot_token: String,
/// Port for the web server
@@ -33,6 +24,8 @@ pub struct Config {
pub banner_base_url: String,
/// Target Discord guild ID where the bot operates
pub bot_target_guild: u64,
/// Discord application ID
pub bot_app_id: u64,
/// Graceful shutdown timeout duration
///
/// Accepts both numeric values (seconds) and duration strings
@@ -44,11 +37,6 @@ pub struct Config {
pub shutdown_timeout: Duration,
}
/// Default log level of "info"
fn default_log_level() -> String {
"info".to_string()
}
/// Default port of 3000
fn default_port() -> u16 {
3000

View File

@@ -1,3 +1,4 @@
//! Database models and schema.
pub mod models;
pub mod schema;

69
src/data/schema.rs Normal file
View File

@@ -0,0 +1,69 @@
// pub mod sql_types {
// #[derive(diesel::sql_types::SqlType)]
// #[diesel(postgres_type(name = "scrape_priority"))]
// pub struct ScrapePriority;
// #[derive(diesel::sql_types::SqlType)]
// #[diesel(postgres_type(name = "target_type"))]
// pub struct TargetType;
// }
// use super::models::{ScrapePriorityMapping, TargetTypeMapping};
// diesel::table! {
// use diesel::sql_types::*;
// use super::{ScrapePriorityMapping, TargetTypeMapping};
// scrape_jobs (id) {
// id -> Int4,
// target_type -> TargetTypeMapping,
// target_payload -> Jsonb,
// priority -> ScrapePriorityMapping,
// execute_at -> Timestamptz,
// created_at -> Timestamptz,
// locked_at -> Nullable<Timestamptz>,
// }
// }
// diesel::table! {
// courses (id) {
// id -> Int4,
// crn -> Varchar,
// subject -> Varchar,
// course_number -> Varchar,
// title -> Varchar,
// term_code -> Varchar,
// enrollment -> Int4,
// max_enrollment -> Int4,
// wait_count -> Int4,
// wait_capacity -> Int4,
// last_scraped_at -> Timestamptz,
// }
// }
// diesel::table! {
// course_metrics (id) {
// id -> Int4,
// course_id -> Int4,
// timestamp -> Timestamptz,
// enrollment -> Int4,
// wait_count -> Int4,
// seats_available -> Int4,
// }
// }
// diesel::table! {
// course_audits (id) {
// id -> Int4,
// course_id -> Int4,
// timestamp -> Timestamptz,
// field_changed -> Varchar,
// old_value -> Text,
// new_value -> Text,
// }
// }
// diesel::joinable!(course_metrics -> courses (course_id));
// diesel::joinable!(course_audits -> courses (course_id));
// diesel::allow_tables_to_appear_in_same_query!(courses, course_metrics, course_audits, scrape_jobs,);

View File

@@ -1,9 +1,8 @@
pub mod app_state;
pub mod banner;
pub mod bot;
pub mod config;
pub mod data;
pub mod error;
pub mod scraper;
pub mod services;
pub mod state;
pub mod web;

View File

@@ -1,28 +1,25 @@
use serenity::all::{ClientBuilder, GatewayIntents};
use serenity::all::{CacheHttp, ClientBuilder, GatewayIntents};
use tokio::signal;
use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use crate::app_state::AppState;
use crate::banner::BannerApi;
use crate::bot::{Data, get_commands};
use crate::config::Config;
use crate::scraper::ScraperService;
use crate::services::manager::ServiceManager;
use crate::services::{ServiceResult, bot::BotService, web::WebService};
use crate::state::AppState;
use crate::web::routes::BannerState;
use figment::{Figment, providers::Env};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
mod app_state;
mod banner;
mod bot;
mod config;
mod data;
mod error;
mod scraper;
mod services;
mod state;
mod web;
#[tokio::main]
@@ -55,22 +52,14 @@ async fn main() {
} else {
"production"
},
"starting banner"
"starting banner system"
);
let config: Config = Figment::new()
.merge(Env::raw().only(&["DATABASE_URL"]))
.merge(Env::prefixed("APP_"))
.extract()
.expect("Failed to load config");
// Create database connection pool
let db_pool = PgPoolOptions::new()
.max_connections(10)
.connect(&config.database_url)
.await
.expect("Failed to create database pool");
info!(
port = config.port,
shutdown_timeout = format!("{:.2?}", config.shutdown_timeout),
@@ -81,6 +70,10 @@ async fn main() {
// Create BannerApi and AppState
let banner_api =
BannerApi::new(config.banner_base_url.clone()).expect("Failed to create BannerApi");
banner_api
.setup()
.await
.expect("Failed to set up BannerApi session");
let banner_api_arc = Arc::new(banner_api);
let app_state = AppState::new(banner_api_arc.clone(), &config.redis_url)
@@ -88,7 +81,7 @@ async fn main() {
// Create BannerState for web service
let banner_state = BannerState {
api: banner_api_arc.clone(),
api: banner_api_arc,
};
// Configure the client with your Discord bot token in the environment
@@ -175,11 +168,9 @@ async fn main() {
// Register services with the manager
let bot_service = Box::new(BotService::new(client));
let web_service = Box::new(WebService::new(port, banner_state));
let scraper_service = Box::new(ScraperService::new(db_pool.clone(), banner_api_arc.clone()));
service_manager.register_service("bot", bot_service);
service_manager.register_service("web", web_service);
service_manager.register_service("scraper", scraper_service);
// Spawn all registered services
service_manager.spawn_all();

View File

@@ -1,87 +0,0 @@
pub mod scheduler;
pub mod worker;
use crate::banner::BannerApi;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::info;
use self::scheduler::Scheduler;
use self::worker::Worker;
use crate::services::Service;
/// The main service that will be managed by the application's `ServiceManager`.
///
/// It holds the shared resources (database pool, API client) and manages the
/// lifecycle of the Scheduler and Worker tasks.
pub struct ScraperService {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
scheduler_handle: Option<JoinHandle<()>>,
worker_handles: Vec<JoinHandle<()>>,
}
impl ScraperService {
/// Creates a new `ScraperService`.
pub fn new(db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
Self {
db_pool,
banner_api,
scheduler_handle: None,
worker_handles: Vec::new(),
}
}
/// Starts the scheduler and a pool of workers.
pub fn start(&mut self) {
info!("ScraperService starting...");
let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone());
let scheduler_handle = tokio::spawn(async move {
scheduler.run().await;
});
self.scheduler_handle = Some(scheduler_handle);
info!("Scheduler task spawned.");
let worker_count = 4; // This could be configurable
for i in 0..worker_count {
let worker = Worker::new(i, self.db_pool.clone(), self.banner_api.clone());
let worker_handle = tokio::spawn(async move {
worker.run().await;
});
self.worker_handles.push(worker_handle);
}
info!("Spawned {} worker tasks.", self.worker_handles.len());
}
/// Signals all child tasks to gracefully shut down.
pub async fn shutdown(&mut self) {
info!("Shutting down scraper service...");
if let Some(handle) = self.scheduler_handle.take() {
handle.abort();
}
for handle in self.worker_handles.drain(..) {
handle.abort();
}
info!("Scraper service shutdown.");
}
}
#[async_trait::async_trait]
impl Service for ScraperService {
fn name(&self) -> &'static str {
"scraper"
}
async fn run(&mut self) -> Result<(), anyhow::Error> {
self.start();
std::future::pending::<()>().await;
Ok(())
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
self.shutdown().await;
Ok(())
}
}

View File

@@ -1,85 +0,0 @@
use crate::banner::{BannerApi, Term};
use crate::data::models::{ScrapePriority, TargetType};
use crate::error::Result;
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info};
/// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler {
db_pool: PgPool,
banner_api: Arc<BannerApi>,
}
impl Scheduler {
pub fn new(db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
Self {
db_pool,
banner_api,
}
}
/// Runs the scheduler's main loop.
pub async fn run(&self) {
info!("Scheduler service started.");
let mut interval = time::interval(Duration::from_secs(60)); // Runs every minute
loop {
interval.tick().await;
info!("Scheduler waking up to analyze and schedule jobs...");
if let Err(e) = self.schedule_jobs().await {
error!(error = ?e, "Failed to schedule jobs");
}
}
}
/// The core logic for deciding what jobs to create.
async fn schedule_jobs(&self) -> Result<()> {
// For now, we will implement a simple baseline scheduling strategy:
// 1. Get a list of all subjects from the Banner API.
// 2. For each subject, check if an active (not locked, not completed) job already exists.
// 3. If no job exists, create a new, low-priority job to be executed in the near future.
let term = Term::get_current().inner().to_string();
info!(
term = term,
"[Scheduler] Enqueuing baseline subject scrape jobs..."
);
let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?;
for subject in subjects {
let payload = json!({ "subject": subject.code });
let existing_job: Option<(i32,)> = sqlx::query_as(
"SELECT id FROM scrape_jobs WHERE target_type = $1 AND target_payload = $2 AND locked_at IS NULL"
)
.bind(TargetType::Subject)
.bind(&payload)
.fetch_optional(&self.db_pool)
.await?;
if existing_job.is_some() {
continue;
}
sqlx::query(
"INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) VALUES ($1, $2, $3, $4)"
)
.bind(TargetType::Subject)
.bind(&payload)
.bind(ScrapePriority::Low)
.bind(chrono::Utc::now())
.execute(&self.db_pool)
.await?;
info!(subject = subject.code, "[Scheduler] Enqueued new job");
}
info!("[Scheduler] Job scheduling complete.");
Ok(())
}
}

View File

@@ -1,205 +0,0 @@
use crate::banner::{BannerApi, BannerApiError, Course, SearchQuery, Term};
use crate::data::models::ScrapeJob;
use crate::error::Result;
use serde_json::Value;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info, warn};
/// A single worker instance.
///
/// Each worker runs in its own asynchronous task and continuously polls the
/// database for scrape jobs to execute.
pub struct Worker {
id: usize, // For logging purposes
db_pool: PgPool,
banner_api: Arc<BannerApi>,
}
impl Worker {
pub fn new(id: usize, db_pool: PgPool, banner_api: Arc<BannerApi>) -> Self {
Self {
id,
db_pool,
banner_api,
}
}
/// Runs the worker's main loop.
pub async fn run(&self) {
info!(worker_id = self.id, "Worker started.");
loop {
match self.fetch_and_lock_job().await {
Ok(Some(job)) => {
let job_id = job.id;
info!(worker_id = self.id, job_id = job.id, "Processing job");
if let Err(e) = self.process_job(job).await {
// Check if the error is due to an invalid session
if let Some(BannerApiError::InvalidSession(_)) =
e.downcast_ref::<BannerApiError>()
{
warn!(
worker_id = self.id,
job_id, "Invalid session detected. Forcing session refresh."
);
} else {
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
}
// Unlock the job so it can be retried
if let Err(unlock_err) = self.unlock_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?unlock_err,
"Failed to unlock job"
);
}
} else {
info!(worker_id = self.id, job_id, "Job processed successfully");
// If successful, delete the job.
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?delete_err,
"Failed to delete job"
);
}
}
}
Ok(None) => {
// No job found, wait for a bit before polling again.
time::sleep(Duration::from_secs(5)).await;
}
Err(e) => {
warn!(worker_id = self.id, error = ?e, "Failed to fetch job");
// Wait before retrying to avoid spamming errors.
time::sleep(Duration::from_secs(10)).await;
}
}
}
}
/// Atomically fetches a job from the queue, locking it for processing.
///
/// This uses a `FOR UPDATE SKIP LOCKED` query to ensure that multiple
/// workers can poll the queue concurrently without conflicts.
async fn fetch_and_lock_job(&self) -> Result<Option<ScrapeJob>> {
let mut tx = self.db_pool.begin().await?;
let job = sqlx::query_as::<_, ScrapeJob>(
"SELECT * FROM scrape_jobs WHERE locked_at IS NULL AND execute_at <= NOW() ORDER BY priority DESC, execute_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED"
)
.fetch_optional(&mut *tx)
.await?;
if let Some(ref job) = job {
sqlx::query("UPDATE scrape_jobs SET locked_at = NOW() WHERE id = $1")
.bind(job.id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(job)
}
async fn process_job(&self, job: ScrapeJob) -> Result<()> {
match job.target_type {
crate::data::models::TargetType::Subject => {
self.process_subject_job(&job.target_payload).await
}
_ => {
warn!(worker_id = self.id, job_id = job.id, "unhandled job type");
Ok(())
}
}
}
async fn process_subject_job(&self, payload: &Value) -> Result<()> {
let subject_code = payload["subject"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Invalid subject payload"))?;
info!(
worker_id = self.id,
subject = subject_code,
"Processing subject job"
);
let term = Term::get_current().inner().to_string();
let query = SearchQuery::new().subject(subject_code).max_results(500);
let search_result = self
.banner_api
.search(&term, &query, "subjectDescription", false)
.await?;
if let Some(courses_from_api) = search_result.data {
info!(
worker_id = self.id,
subject = subject_code,
count = courses_from_api.len(),
"Found courses to upsert"
);
for course in courses_from_api {
self.upsert_course(&course).await?;
}
}
Ok(())
}
async fn upsert_course(&self, course: &Course) -> Result<()> {
sqlx::query(
r#"
INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (crn, term_code) DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&course.course_reference_number)
.bind(&course.subject)
.bind(&course.course_number)
.bind(&course.course_title)
.bind(&course.term)
.bind(course.enrollment)
.bind(course.maximum_enrollment)
.bind(course.wait_count)
.bind(course.wait_capacity)
.bind(chrono::Utc::now())
.execute(&self.db_pool)
.await?;
Ok(())
}
async fn delete_job(&self, job_id: i32) -> Result<()> {
sqlx::query("DELETE FROM scrape_jobs WHERE id = $1")
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job deleted");
Ok(())
}
async fn unlock_job(&self, job_id: i32) -> Result<()> {
sqlx::query("UPDATE scrape_jobs SET locked_at = NULL WHERE id = $1")
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job unlocked after failure");
Ok(())
}
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, warn};
use crate::services::{Service, ServiceResult, run_service};
@@ -13,12 +13,6 @@ pub struct ServiceManager {
shutdown_tx: broadcast::Sender<()>,
}
impl Default for ServiceManager {
fn default() -> Self {
Self::new()
}
}
impl ServiceManager {
pub fn new() -> Self {
let (shutdown_tx, _) = broadcast::channel(1);
@@ -42,7 +36,6 @@ impl ServiceManager {
for (name, service) in self.registered_services.drain() {
let shutdown_rx = self.shutdown_tx.subscribe();
let handle = tokio::spawn(run_service(service, shutdown_rx));
trace!(service = name, id = ?handle.id(), "service spawned",);
self.running_services.insert(name, handle);
}