Mirror of https://github.com/Xevion/banner.git (synced 2025-12-06 05:14:26 -06:00)

Compare commits: 8 commits, 020a00254f...master

Commits (SHA1):
966732a6d2
3292d35521
71ac0782d0
1c6d2d4b6e
51f8256e61
b1ed2434f8
47c23459f1
8af9b0a1a2
.gitignore

@@ -13,6 +13,16 @@ go/
# Development configuration
bacon.toml
.env
.env.*
!.env.example

# CI/CD
.github/
.git/

# Development tools
Justfile
rust-toolchain.toml

# Frontend build artifacts and cache
web/node_modules/
@@ -20,4 +30,22 @@ web/dist/
web/.vite/
web/.tanstack/
web/.vscode/

# IDE and editor files
.vscode/
.idea/
*.swp
*.swo
*~

# OS files
.DS_Store
Thumbs.db

# Test coverage
coverage/
*.profdata
*.profraw

# SQLx offline mode (include this in builds)
!.sqlx/
65  .github/workflows/ci.yml  (vendored, new file)

@@ -0,0 +1,65 @@
name: CI

on:
  push:
    branches: [master]
  pull_request:
    branches: [master]

env:
  CARGO_TERM_COLOR: always
  RUST_BACKTRACE: 1

jobs:
  check:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Install Rust toolchain
        uses: dtolnay/rust-toolchain@stable
        with:
          components: rustfmt, clippy

      - name: Setup Bun
        uses: oven-sh/setup-bun@v1
        with:
          bun-version: latest

      - name: Cache Rust dependencies
        uses: Swatinem/rust-cache@v2
        with:
          cache-on-failure: true

      - name: Install frontend dependencies
        working-directory: web
        run: bun install --frozen-lockfile

      - name: Check Rust formatting
        run: cargo fmt --all -- --check

      - name: Check TypeScript formatting
        working-directory: web
        run: bun run format:check

      - name: TypeScript type check
        working-directory: web
        run: bun run typecheck

      - name: ESLint
        working-directory: web
        run: bun run lint

      - name: Clippy
        run: cargo clippy --all-features -- --deny warnings

      - name: Run tests
        run: cargo test --all-features

      - name: Build frontend
        working-directory: web
        run: bun run build

      - name: Build backend
        run: cargo build --release --bin banner
1  Cargo.lock  (generated)

@@ -254,6 +254,7 @@ dependencies = [
 "time",
 "tl",
 "tokio",
 "tokio-util",
 "tower-http",
 "tracing",
 "tracing-subscriber",

Cargo.toml

@@ -36,6 +36,7 @@ sqlx = { version = "0.8.6", features = [
thiserror = "2.0.16"
time = "0.3.43"
tokio = { version = "1.47.1", features = ["full"] }
tokio-util = "0.7"
tl = "0.7.8"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] }
74  Dockerfile

@@ -2,11 +2,8 @@
ARG RUST_VERSION=1.89.0
ARG RAILWAY_GIT_COMMIT_SHA

-# Frontend Build Stage
-FROM node:22-bookworm-slim AS frontend-builder
-
-# Install pnpm
-RUN npm install -g pnpm
# --- Frontend Build Stage ---
FROM oven/bun:1 AS frontend-builder

WORKDIR /app

@@ -14,63 +11,62 @@ WORKDIR /app
COPY ./Cargo.toml ./

# Copy frontend package files
-COPY ./web/package.json ./web/pnpm-lock.yaml ./
COPY ./web/package.json ./web/bun.lock* ./

# Install dependencies
-RUN pnpm install --frozen-lockfile
RUN bun install --frozen-lockfile

# Copy frontend source code
COPY ./web ./

# Build frontend
-RUN pnpm run build
RUN bun run build

-# Rust Build Stage
-FROM rust:${RUST_VERSION}-bookworm AS builder
# --- Chef Base Stage ---
FROM lukemathwalker/cargo-chef:latest-rust-${RUST_VERSION} AS chef
WORKDIR /app

# --- Planner Stage ---
FROM chef AS planner
COPY Cargo.toml Cargo.lock ./
COPY build.rs ./
COPY src ./src
# Migrations & .sqlx specifically left out to avoid invalidating cache
RUN cargo chef prepare --recipe-path recipe.json --bin banner

# --- Rust Build Stage ---
FROM chef AS builder

# Set build-time environment variable for Railway Git commit SHA
ARG RAILWAY_GIT_COMMIT_SHA
ENV RAILWAY_GIT_COMMIT_SHA=${RAILWAY_GIT_COMMIT_SHA}

-# Install build dependencies
# Copy recipe from planner and build dependencies only
COPY --from=planner /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json --bin banner

# Install build dependencies for final compilation
RUN apt-get update && apt-get install -y \
    pkg-config \
    libssl-dev \
    git \
    && rm -rf /var/lib/apt/lists/*

-WORKDIR /usr/src
-RUN USER=root cargo new --bin banner
-WORKDIR /usr/src/banner
-
-# Copy dependency files for better layer caching
-COPY ./Cargo.toml ./Cargo.lock* ./
-
-# Copy .git directory for build.rs to access Git information (if available)
-# This will copy .git (and .gitignore) if it exists, but won't fail if it doesn't
-# While normally a COPY requires at least one file, .gitignore should still be available, so this wildcard should always work
-COPY ./.git* ./
-
-# Copy build.rs early so it can run during the first build
-COPY ./build.rs ./
-
-# Build empty app with downloaded dependencies to produce a stable image layer for next build
-RUN cargo build --release
-
-# Copy source code
-RUN rm src/*.rs
-COPY ./src ./src/
-
-# Copy built frontend assets
# Copy source code and built frontend assets
COPY Cargo.toml Cargo.lock ./
COPY build.rs ./
COPY .git* ./
COPY src ./src
COPY migrations ./migrations
COPY --from=frontend-builder /app/dist ./web/dist

# Build web app with embedded assets
-RUN rm ./target/release/deps/banner*
-RUN cargo build --release
RUN cargo build --release --bin banner

# Strip the binary to reduce size
RUN strip target/release/banner

-# Runtime Stage - Debian slim for glibc compatibility
# --- Runtime Stage ---
FROM debian:12-slim

ARG APP=/usr/src/app

@@ -94,7 +90,7 @@ RUN addgroup --gid $GID $APP_USER \
    && mkdir -p ${APP}

# Copy application binary
-COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/target/release/banner ${APP}/banner
COPY --from=builder --chown=$APP_USER:$APP_USER /app/target/release/banner ${APP}/banner

# Set proper permissions
RUN chmod +x ${APP}/banner

@@ -117,4 +113,4 @@ ENV HOSTS=0.0.0.0,[::]

# Implicitly uses PORT environment variable
-# temporary: running without 'scraper' service
CMD ["sh", "-c", "exec ./banner --services web,bot"]
65  Justfile

@@ -1,12 +1,65 @@
default_services := "bot,web,scraper"

default:
    just --list

# Run all checks (format, clippy, tests, lint)
check:
    cargo fmt --all -- --check
    cargo clippy --all-features -- --deny warnings
    cargo nextest run
    bun run --cwd web typecheck
    bun run --cwd web lint

# Format all Rust and TypeScript code
format:
    cargo fmt --all
    bun run --cwd web format

# Check formatting without modifying (CI-friendly)
format-check:
    cargo fmt --all -- --check
    bun run --cwd web format:check

# Start PostgreSQL in Docker and update .env with connection string
db:
    #!/usr/bin/env bash
    set -euo pipefail

    # Find available port
    PORT=$(shuf -i 49152-65535 -n 1)
    while ss -tlnp 2>/dev/null | grep -q ":$PORT "; do
        PORT=$(shuf -i 49152-65535 -n 1)
    done

    # Start PostgreSQL container
    docker run -d \
        --name banner-postgres \
        -e POSTGRES_PASSWORD=banner \
        -e POSTGRES_USER=banner \
        -e POSTGRES_DB=banner \
        -p "$PORT:5432" \
        postgres:17-alpine

    # Update .env file
    DB_URL="postgresql://banner:banner@localhost:$PORT/banner"
    if [ -f .env ]; then
        sed -i.bak "s|^DATABASE_URL=.*|DATABASE_URL=$DB_URL|" .env
    else
        echo "DATABASE_URL=$DB_URL" > .env
    fi

    echo "PostgreSQL started on port $PORT"
    echo "DATABASE_URL=$DB_URL"
    echo "Run: sqlx migrate run"

# Auto-reloading frontend server
frontend:
-    pnpm run -C web dev
    bun run --cwd web dev

# Production build of frontend
build-frontend:
-    pnpm run -C web build
    bun run --cwd web build

# Auto-reloading backend server
backend *ARGS:
@@ -14,15 +67,13 @@ backend *ARGS:

# Production build
build:
-    pnpm run -C web build
    bun run --cwd web build
    cargo build --release --bin banner

-# Run auto-reloading development build with release characteristics (frontend is embedded, non-auto-reloading)
-# This is useful for testing backend release-mode details.
# Run auto-reloading development build with release characteristics
dev-build *ARGS='--services web --tracing pretty': build-frontend
    bacon --headless run -- --profile dev-release -- {{ARGS}}

# Auto-reloading development build for both frontend and backend
# Will not notice if either the frontend/backend crashes, but will generally be resistant to stopping on their own.
[parallel]
dev *ARGS='--services web,bot': frontend (backend ARGS)
README.md

@@ -26,7 +26,7 @@ The application consists of three modular services that can be run independently

## Quick Start

```bash
-pnpm install -C web  # Install frontend dependencies
bun install --cwd web  # Install frontend dependencies
cargo build           # Build the backend

just dev              # Runs auto-reloading dev build
```
3  migrations/20251103093649_add_retry_tracking.sql  (new file)

@@ -0,0 +1,3 @@
-- Add retry tracking columns to scrape_jobs table
ALTER TABLE scrape_jobs ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0);
ALTER TABLE scrape_jobs ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 5 CHECK (max_retries >= 0);
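With these columns in place, a failed job can be requeued and capped in a single statement. The worker's actual query is not part of this diff, so the following SQL is only an illustrative sketch of how `retry_count` and `max_retries` might be used (the `id` column and the backoff interval are assumptions):

```sql
-- Hypothetical requeue: bump the retry counter, unlock the job, and push
-- execute_at out with exponential backoff; no-op once retries are exhausted.
UPDATE scrape_jobs
SET retry_count = retry_count + 1,
    locked_at   = NULL,
    execute_at  = NOW() + (INTERVAL '30 seconds' * POWER(2, retry_count))
WHERE id = $1
  AND retry_count < max_retries;
```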
45  migrations/20251103104300_add_performance_indexes.sql  (new file)

@@ -0,0 +1,45 @@
-- Performance optimization indexes

-- Index for term-based queries (most common access pattern)
CREATE INDEX IF NOT EXISTS idx_courses_term_code ON courses(term_code);

-- Index for subject-based filtering
CREATE INDEX IF NOT EXISTS idx_courses_subject ON courses(subject);

-- Composite index for subject + term queries
CREATE INDEX IF NOT EXISTS idx_courses_subject_term ON courses(subject, term_code);

-- Index for course number lookups
CREATE INDEX IF NOT EXISTS idx_courses_course_number ON courses(course_number);

-- Index for last scraped timestamp (useful for finding stale data)
CREATE INDEX IF NOT EXISTS idx_courses_last_scraped ON courses(last_scraped_at);

-- Index for course metrics time-series queries
-- BRIN index is optimal for time-series data
CREATE INDEX IF NOT EXISTS idx_course_metrics_timestamp ON course_metrics USING BRIN(timestamp);

-- B-tree index for specific course metric lookups
CREATE INDEX IF NOT EXISTS idx_course_metrics_course_timestamp
    ON course_metrics(course_id, timestamp DESC);

-- Partial index for pending scrape jobs (only unlocked jobs)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_pending
    ON scrape_jobs(execute_at ASC)
    WHERE locked_at IS NULL;

-- Index for high-priority job processing
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_priority_pending
    ON scrape_jobs(priority DESC, execute_at ASC)
    WHERE locked_at IS NULL;

-- Index for retry tracking
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_retry_count
    ON scrape_jobs(retry_count)
    WHERE retry_count > 0 AND locked_at IS NULL;

-- Analyze tables to update statistics
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;
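The two partial indexes on scrape_jobs are shaped for a queue-poll query that claims the next runnable job. The worker's real query is not shown in this diff; a hedged sketch of the kind of poll idx_scrape_jobs_priority_pending would serve:

```sql
-- Illustrative claim query: the WHERE clause mirrors the partial-index
-- predicate and the ORDER BY matches the index column order, so the planner
-- can walk the index directly instead of sorting.
SELECT id, target_type, target_payload
FROM scrape_jobs
WHERE locked_at IS NULL
  AND execute_at <= NOW()
ORDER BY priority DESC, execute_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
```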
53  migrations/20251103104400_optimize_indexes.sql  (new file)

@@ -0,0 +1,53 @@
-- Index Optimization Follow-up Migration

-- Reason: Redundant with composite index idx_courses_subject_term
DROP INDEX IF EXISTS idx_courses_subject;

-- Remove: idx_scrape_jobs_retry_count
DROP INDEX IF EXISTS idx_scrape_jobs_retry_count;

-- Purpose: Optimize the scheduler's frequent query (runs every 60 seconds)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_scheduler_lookup
    ON scrape_jobs(target_type, target_payload)
    WHERE locked_at IS NULL;

-- Note: We use (target_type, target_payload) instead of including locked_at
-- in the index columns because:
-- 1. The WHERE clause filters locked_at IS NULL (partial index optimization)
-- 2. target_payload is JSONB and already large; keeping it as an indexed column
--    allows PostgreSQL to use index-only scans for the SELECT target_payload query
-- 3. This design minimizes index size while maximizing query performance

-- Purpose: Enable efficient audit trail queries by course
CREATE INDEX IF NOT EXISTS idx_course_audits_course_timestamp
    ON course_audits(course_id, timestamp DESC);

-- Purpose: Enable queries like "Show all changes in the last 24 hours"
CREATE INDEX IF NOT EXISTS idx_course_audits_timestamp
    ON course_audits(timestamp DESC);

-- The BRIN index on course_metrics(timestamp) assumes data is inserted in
-- chronological order. BRIN indexes are only effective when data is physically
-- ordered on disk. If you perform:
--   - Backfills of historical data
--   - Out-of-order inserts
--   - Frequent UPDATEs that move rows
--
-- Then the BRIN index effectiveness will degrade. Monitor with:
--   SELECT * FROM brin_page_items(get_raw_page('idx_course_metrics_timestamp', 1));
--
-- If you see poor selectivity, consider:
--   1. REINDEX to rebuild after bulk loads
--   2. Switch to B-tree if inserts are not time-ordered
--   3. Use CLUSTER to physically reorder the table (requires downtime)

COMMENT ON INDEX idx_course_metrics_timestamp IS
    'BRIN index - requires chronologically ordered inserts for efficiency. Monitor selectivity.';

-- Update statistics for query planner
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;
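A quick way to confirm the planner actually uses these indexes is EXPLAIN on a representative query; a small, hedged example against the tables defined above:

```sql
-- For a healthy BRIN index this should show a Bitmap Index Scan on
-- idx_course_metrics_timestamp; a Seq Scan instead suggests the block ranges
-- have lost correlation with physical row order.
EXPLAIN (ANALYZE, BUFFERS)
SELECT course_id, timestamp
FROM course_metrics
WHERE timestamp >= NOW() - INTERVAL '1 day';
```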
49  src/app.rs

@@ -42,11 +42,7 @@ impl App {

        // Check if the database URL is via private networking
        let is_private = config.database_url.contains("railway.internal");
-       let slow_threshold = if is_private {
-           Duration::from_millis(200)
-       } else {
-           Duration::from_millis(500)
-       };
        let slow_threshold = Duration::from_millis(if is_private { 200 } else { 500 });

        // Create database connection pool
        let db_pool = PgPoolOptions::new()
@@ -66,6 +62,14 @@ impl App {
            "database pool established"
        );

        // Run database migrations
        info!("Running database migrations...");
        sqlx::migrate!("./migrations")
            .run(&db_pool)
            .await
            .expect("Failed to run database migrations");
        info!("Database migrations completed successfully");

        // Create BannerApi and AppState
        let banner_api = BannerApi::new_with_config(
            config.banner_base_url.clone(),
@@ -108,10 +112,6 @@ impl App {
                .register_service(ServiceName::Scraper.as_str(), scraper_service);
        }

-       if services.contains(&ServiceName::Bot) {
-           // Bot service will be set up separately in run() method since it's async
-       }

        // Check if any services are enabled
        if !self.service_manager.has_services() && !services.contains(&ServiceName::Bot) {
            error!("No services enabled. Cannot start application.");
@@ -123,10 +123,28 @@ impl App {

    /// Setup bot service if enabled
    pub async fn setup_bot_service(&mut self) -> Result<(), anyhow::Error> {
-       let client = BotService::create_client(&self.config, self.app_state.clone())
-           .await
-           .expect("Failed to create Discord client");
-       let bot_service = Box::new(BotService::new(client));
        use std::sync::Arc;
        use tokio::sync::{Mutex, broadcast};

        // Create shutdown channel for status update task
        let (status_shutdown_tx, status_shutdown_rx) = broadcast::channel(1);
        let status_task_handle = Arc::new(Mutex::new(None));

        let client = BotService::create_client(
            &self.config,
            self.app_state.clone(),
            status_task_handle.clone(),
            status_shutdown_rx,
        )
        .await
        .expect("Failed to create Discord client");

        let bot_service = Box::new(BotService::new(
            client,
            status_task_handle,
            status_shutdown_tx,
        ));

        self.service_manager
            .register_service(ServiceName::Bot.as_str(), bot_service);
        Ok(())
@@ -147,9 +165,4 @@ impl App {
    pub fn config(&self) -> &Config {
        &self.config
    }
-
-   /// Get a reference to the app state
-   pub fn app_state(&self) -> &AppState {
-       &self.app_state
-   }
}
@@ -152,6 +152,13 @@ impl BannerApi {
    }

    /// Performs a course search and handles common response processing.
    #[tracing::instrument(
        skip(self, query),
        fields(
            term = %term,
            subject = %query.get_subject().unwrap_or(&"all".to_string())
        )
    )]
    async fn perform_search(
        &self,
        term: &str,
@@ -318,12 +325,6 @@ impl BannerApi {
        sort: &str,
        sort_descending: bool,
    ) -> Result<SearchResult, BannerApiError> {
-       debug!(
-           term = term,
-           subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"),
-           max_results = query.get_max_results(),
-           "Starting course search"
-       );
        self.perform_search(term, query, sort, sort_descending)
            .await
    }
@@ -1,10 +1,14 @@
//! JSON parsing utilities for the Banner API client.

use anyhow::Result;
-use serde_json;
use serde_json::{self, Value};

/// Attempt to parse JSON and, on failure, include a contextual snippet of the
-/// line where the error occurred. This prevents dumping huge JSON bodies to logs.
/// line where the error occurred.
///
/// In debug builds, this provides detailed context including the full JSON object
/// containing the error and type mismatch information. In release builds, it shows
/// a minimal snippet to prevent dumping huge JSON bodies to production logs.
pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
    let jd = &mut serde_json::Deserializer::from_str(body);
    match serde_path_to_error::deserialize(jd) {
@@ -12,27 +16,247 @@ pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
        Err(err) => {
            let inner_err = err.inner();
            let (line, column) = (inner_err.line(), inner_err.column());
-           let snippet = build_error_snippet(body, line, column, 20);
            let path = err.path().to_string();

            let msg = inner_err.to_string();
            let loc = format!(" at line {line} column {column}");
            let msg_without_loc = msg.strip_suffix(&loc).unwrap_or(&msg).to_string();

-           let mut final_err = String::new();
-           if !path.is_empty() && path != "." {
-               final_err.push_str(&format!("for path '{}' ", path));
-           }
-           final_err.push_str(&format!(
-               "({msg_without_loc}) at line {line} column {column}"
-           ));
-           final_err.push_str(&format!("\n{snippet}"));
            // Build error message differently for debug vs release builds
            let final_err = if cfg!(debug_assertions) {
                // Debug mode: provide detailed context
                let type_info = parse_type_mismatch(&msg_without_loc);
                let context = extract_json_object_at_path(body, err.path(), line, column);

                let mut err_msg = String::new();
                if !path.is_empty() && path != "." {
                    err_msg.push_str(&format!("for path '{}'\n", path));
                }
                err_msg.push_str(&format!(
                    "({}) at line {} column {}\n\n",
                    type_info, line, column
                ));
                err_msg.push_str(&context);

                err_msg
            } else {
                // Release mode: minimal snippet to keep logs concise
                let snippet = build_error_snippet(body, line, column, 20);

                let mut err_msg = String::new();
                if !path.is_empty() && path != "." {
                    err_msg.push_str(&format!("for path '{}' ", path));
                }
                err_msg.push_str(&format!(
                    "({}) at line {} column {}",
                    msg_without_loc, line, column
                ));
                err_msg.push_str(&format!("\n{}", snippet));

                err_msg
            };

            Err(anyhow::anyhow!(final_err))
        }
    }
}

/// Extract type mismatch information from a serde error message.
///
/// Parses error messages like "invalid type: null, expected a string" to extract
/// the expected and actual types for clearer error reporting.
///
/// Returns a formatted string like "(expected a string, got null)" or the original
/// message if parsing fails.
fn parse_type_mismatch(error_msg: &str) -> String {
    // Try to parse "invalid type: X, expected Y" format
    if let Some(invalid_start) = error_msg.find("invalid type: ") {
        let after_prefix = &error_msg[invalid_start + "invalid type: ".len()..];

        if let Some(comma_pos) = after_prefix.find(", expected ") {
            let actual_type = &after_prefix[..comma_pos];
            let expected_part = &after_prefix[comma_pos + ", expected ".len()..];

            // Clean up expected part (remove " at line X column Y" if present)
            let expected_type = expected_part
                .split(" at line ")
                .next()
                .unwrap_or(expected_part)
                .trim();

            return format!("expected {}, got {}", expected_type, actual_type);
        }
    }

    // Try to parse "expected X at line Y" format
    if error_msg.starts_with("expected ")
        && let Some(expected_part) = error_msg.split(" at line ").next()
    {
        return expected_part.to_string();
    }

    // Fallback: return original message without location info
    error_msg.to_string()
}

/// Extract and pretty-print the JSON object/array containing the parse error.
///
/// This function navigates to the error location using the serde path and extracts
/// the parent object or array to provide better context for debugging.
///
/// # Arguments
/// * `body` - The raw JSON string
/// * `path` - The serde path to the error (e.g., "data[0].faculty[0].displayName")
/// * `line` - Line number of the error (for fallback)
/// * `column` - Column number of the error (for fallback)
///
/// # Returns
/// A formatted string containing the JSON object with the error, or a fallback snippet
fn extract_json_object_at_path(
    body: &str,
    path: &serde_path_to_error::Path,
    line: usize,
    column: usize,
) -> String {
    // Try to parse the entire JSON structure
    let root_value: Value = match serde_json::from_str(body) {
        Ok(v) => v,
        Err(_) => {
            // If we can't parse the JSON at all, fall back to line snippet
            return build_error_snippet(body, line, column, 20);
        }
    };

    // Navigate to the error location using the path
    let path_str = path.to_string();
    let segments = parse_path_segments(&path_str);

    let (context_value, context_name) = navigate_to_context(&root_value, &segments);

    // Pretty-print the context value with limited depth to avoid huge output
    match serde_json::to_string_pretty(&context_value) {
        Ok(pretty) => {
            // Limit output to ~50 lines to prevent log spam
            let lines: Vec<&str> = pretty.lines().collect();
            let truncated = if lines.len() > 50 {
                let mut result = lines[..47].join("\n");
                result.push_str("\n  ... (truncated, ");
                result.push_str(&(lines.len() - 47).to_string());
                result.push_str(" more lines)");
                result
            } else {
                pretty
            };

            format!("{} at '{}':\n{}", context_name, path_str, truncated)
        }
        Err(_) => {
            // Fallback to simple snippet if pretty-print fails
            build_error_snippet(body, line, column, 20)
        }
    }
}

/// Parse a JSON path string into segments for navigation.
///
/// Converts paths like "data[0].faculty[1].displayName" into a sequence of
/// object keys and array indices.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut current = String::new();
    let mut in_bracket = false;

    for ch in path.chars() {
        match ch {
            '.' if !in_bracket => {
                if !current.is_empty() {
                    segments.push(PathSegment::Key(current.clone()));
                    current.clear();
                }
            }
            '[' => {
                if !current.is_empty() {
                    segments.push(PathSegment::Key(current.clone()));
                    current.clear();
                }
                in_bracket = true;
            }
            ']' => {
                if in_bracket && !current.is_empty() {
                    if let Ok(index) = current.parse::<usize>() {
                        segments.push(PathSegment::Index(index));
                    }
                    current.clear();
                }
                in_bracket = false;
            }
            _ => current.push(ch),
        }
    }

    if !current.is_empty() {
        segments.push(PathSegment::Key(current));
    }

    segments
}

/// Represents a segment in a JSON path (either an object key or array index).
#[derive(Debug)]
enum PathSegment {
    Key(String),
    Index(usize),
}

/// Navigate through a JSON value using path segments and return the appropriate context.
///
/// This function walks the JSON structure and returns the parent object/array that
/// contains the error, providing meaningful context for debugging.
///
/// # Returns
/// A tuple of (context_value, description) where context_value is the JSON to display
/// and description is a human-readable name for what we're showing.
fn navigate_to_context<'a>(
    mut current: &'a Value,
    segments: &[PathSegment],
) -> (&'a Value, &'static str) {
    // If path is empty or just root, return the whole value
    if segments.is_empty() {
        return (current, "Root object");
    }

    // Try to navigate to the parent of the error location
    // We want to show the containing object/array, not just the failing field
    let parent_depth = segments.len().saturating_sub(1);

    for (i, segment) in segments.iter().enumerate() {
        // Stop one level before the end to show the parent context
        if i >= parent_depth {
            break;
        }

        match segment {
            PathSegment::Key(key) => {
                if let Some(next) = current.get(key) {
                    current = next;
                } else {
                    // Can't navigate further, return what we have
                    return (current, "Partial context (navigation stopped)");
                }
            }
            PathSegment::Index(idx) => {
                if let Some(next) = current.get(idx) {
                    current = next;
                } else {
                    return (current, "Partial context (index out of bounds)");
                }
            }
        }
    }

    (current, "Object containing error")
}

fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String {
    let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
    if target_line.is_empty() {
@@ -53,3 +277,139 @@ fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String {
    format!("...{slice}...\n {indicator}")
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    #[test]
    fn test_parse_type_mismatch_invalid_type() {
        let msg = "invalid type: null, expected a string at line 45 column 29";
        let result = parse_type_mismatch(msg);
        assert_eq!(result, "expected a string, got null");
    }

    #[test]
    fn test_parse_type_mismatch_expected() {
        let msg = "expected value at line 1 column 1";
        let result = parse_type_mismatch(msg);
        assert_eq!(result, "expected value");
    }

    #[test]
    fn test_parse_path_segments_simple() {
        let segments = parse_path_segments("data.name");
        assert_eq!(segments.len(), 2);
        match &segments[0] {
            PathSegment::Key(k) => assert_eq!(k, "data"),
            _ => panic!("Expected Key segment"),
        }
    }

    #[test]
    fn test_parse_path_segments_with_array() {
        let segments = parse_path_segments("data[0].faculty[1].displayName");
        assert_eq!(segments.len(), 5);
        match &segments[0] {
            PathSegment::Key(k) => assert_eq!(k, "data"),
            _ => panic!("Expected Key segment"),
        }
        match &segments[1] {
            PathSegment::Index(i) => assert_eq!(*i, 0),
            _ => panic!("Expected Index segment"),
        }
    }

    #[test]
    fn test_parse_json_with_context_null_value() {
        #[derive(Debug, Deserialize)]
        struct TestStruct {
            name: String,
        }

        let json = r#"{"name": null}"#;
        let result: Result<TestStruct> = parse_json_with_context(json);

        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();

        // Should contain path info
        assert!(err_msg.contains("name"));

        // In debug mode, should contain detailed context
        if cfg!(debug_assertions) {
            assert!(err_msg.contains("expected"));
        }
    }

    #[test]
    fn test_navigate_to_context() {
        let json = r#"{"data": [{"faculty": [{"name": "John"}]}]}"#;
        let value: Value = serde_json::from_str(json).unwrap();

        let segments = parse_path_segments("data[0].faculty[0].name");
        let (context, _) = navigate_to_context(&value, &segments);

        // Should return the faculty[0] object (parent of 'name')
        assert!(context.is_object());
        assert!(context.get("name").is_some());
    }

    #[test]
    fn test_realistic_banner_error() {
        #[derive(Debug, Deserialize)]
        struct Course {
            #[allow(dead_code)]
            #[serde(rename = "courseTitle")]
            course_title: String,
            faculty: Vec<Faculty>,
        }

        #[derive(Debug, Deserialize)]
        struct Faculty {
            #[serde(rename = "displayName")]
            display_name: String,
            #[allow(dead_code)]
            email: String,
        }

        #[derive(Debug, Deserialize)]
        struct SearchResult {
            data: Vec<Course>,
        }

        // Simulate Banner API response with null faculty displayName
        // This mimics the actual error from SPN subject scrape
        let json = r#"{
            "data": [
                {
                    "courseTitle": "Spanish Conversation",
                    "faculty": [
                        {
                            "displayName": null,
                            "email": "instructor@utsa.edu"
                        }
                    ]
                }
            ]
        }"#;

        let result: Result<SearchResult> = parse_json_with_context(json);
        assert!(result.is_err());

        let err_msg = result.unwrap_err().to_string();
        println!("\n=== Error output in debug mode ===\n{}\n", err_msg);

        // Verify error contains key information
        assert!(err_msg.contains("data[0].faculty[0].displayName"));

        // In debug mode, should show detailed context
        if cfg!(debug_assertions) {
            // Should show type mismatch info
            assert!(err_msg.contains("expected") && err_msg.contains("got"));
            // Should show surrounding JSON context with the faculty object
            assert!(err_msg.contains("email"));
        }
    }
}
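At a call site the helper is a drop-in replacement for serde_json::from_str that upgrades the error message. A minimal usage sketch (the surrounding fetch code is illustrative, not from this diff):

```rust
// Illustrative: deserialize a Banner search response with path-aware errors
// instead of serde_json's bare "at line X column Y" message.
async fn fetch_and_parse(client: &reqwest::Client, url: &str) -> anyhow::Result<SearchResult> {
    let body = client.get(url).send().await?.text().await?;
    parse_json_with_context::<SearchResult>(&body)
}
```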
@@ -3,10 +3,13 @@
use http::Extensions;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
-use tracing::{trace, warn};
use tracing::{debug, trace, warn};

pub struct TransparentMiddleware;

/// Threshold for logging slow requests at DEBUG level (in milliseconds)
const SLOW_REQUEST_THRESHOLD_MS: u128 = 1000;

#[async_trait::async_trait]
impl Middleware for TransparentMiddleware {
    async fn handle(
@@ -15,33 +18,56 @@ impl Middleware for TransparentMiddleware {
        extensions: &mut Extensions,
        next: Next<'_>,
    ) -> std::result::Result<Response, reqwest_middleware::Error> {
-       trace!(
-           domain = req.url().domain(),
-           headers = ?req.headers(),
-           "{method} {path}",
-           method = req.method().to_string(),
-           path = req.url().path(),
-       );
        let method = req.method().to_string();
        let path = req.url().path().to_string();

        let start = std::time::Instant::now();
        let response_result = next.run(req, extensions).await;
        let duration = start.elapsed();

        match response_result {
            Ok(response) => {
                if response.status().is_success() {
-                   trace!(
-                       "{code} {reason} {path}",
-                       code = response.status().as_u16(),
-                       reason = response.status().canonical_reason().unwrap_or("??"),
-                       path = response.url().path(),
-                   );
                    let duration_ms = duration.as_millis();
                    if duration_ms >= SLOW_REQUEST_THRESHOLD_MS {
                        debug!(
                            method = method,
                            path = path,
                            status = response.status().as_u16(),
                            duration_ms = duration_ms,
                            "Request completed (slow)"
                        );
                    } else {
                        trace!(
                            method = method,
                            path = path,
                            status = response.status().as_u16(),
                            duration_ms = duration_ms,
                            "Request completed"
                        );
                    }
                    Ok(response)
                } else {
                    let e = response.error_for_status_ref().unwrap_err();
-                   warn!(error = ?e, "Request failed (server)");
                    warn!(
                        method = method,
                        path = path,
                        error = ?e,
                        status = response.status().as_u16(),
                        duration_ms = duration.as_millis(),
                        "Request failed"
                    );
                    Ok(response)
                }
            }
            Err(error) => {
-               warn!(error = ?error, "Request failed (middleware)");
                warn!(
                    method = method,
                    path = path,
                    error = ?error,
                    duration_ms = duration.as_millis(),
                    "Request failed"
                );
                Err(error)
            }
        }
@@ -4,7 +4,7 @@ use crate::banner::rate_limiter::{RequestType, SharedRateLimiter};
use http::Extensions;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
-use tracing::{debug, trace, warn};
use tracing::debug;
use url::Url;

/// Middleware that enforces rate limiting based on request URL patterns
@@ -18,6 +18,16 @@ impl RateLimitMiddleware {
        Self { rate_limiter }
    }

    /// Returns a human-readable description of the rate limit for a request type
    fn get_rate_limit_description(request_type: RequestType) -> &'static str {
        match request_type {
            RequestType::Session => "6 rpm (~10s interval)",
            RequestType::Search => "30 rpm (~2s interval)",
            RequestType::Metadata => "20 rpm (~3s interval)",
            RequestType::Reset => "10 rpm (~6s interval)",
        }
    }

    /// Determines the request type based on the URL path
    fn get_request_type(url: &Url) -> RequestType {
        let path = url.path();
@@ -53,49 +63,22 @@ impl Middleware for RateLimitMiddleware {
    ) -> std::result::Result<Response, reqwest_middleware::Error> {
        let request_type = Self::get_request_type(req.url());

-       trace!(
-           url = %req.url(),
-           request_type = ?request_type,
-           "Rate limiting request"
-       );
-
        // Wait for permission to make the request
        let start = std::time::Instant::now();
        self.rate_limiter.wait_for_permission(request_type).await;
        let wait_duration = start.elapsed();

-       trace!(
-           url = %req.url(),
-           request_type = ?request_type,
-           "Rate limit permission granted, making request"
-       );
        // Only log if rate limiting caused significant delay (>= 500ms)
        if wait_duration.as_millis() >= 500 {
            let limit_desc = Self::get_rate_limit_description(request_type);
            debug!(
                request_type = ?request_type,
                wait_ms = wait_duration.as_millis(),
                rate_limit = limit_desc,
                "Rate limit caused delay"
            );
        }

        // Make the actual request
-       let response_result = next.run(req, extensions).await;
-
-       match response_result {
-           Ok(response) => {
-               if response.status().is_success() {
-                   trace!(
-                       url = %response.url(),
-                       status = response.status().as_u16(),
-                       "Request completed successfully"
-                   );
-               } else {
-                   warn!(
-                       url = %response.url(),
-                       status = response.status().as_u16(),
-                       "Request completed with error status"
-                   );
-               }
-               Ok(response)
-           }
-           Err(error) => {
-               warn!(
-                   url = ?error.url(),
-                   error = ?error,
-                   "Request failed"
-               );
-               Err(error)
-           }
-       }
        next.run(req, extensions).await
    }
}
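These descriptions mirror per-request-type quotas enforced with the governor crate (visible in the rate limiter's imports). A minimal sketch of how one such quota can be built — the constructor calls here are illustrative, not copied from this codebase:

```rust
use governor::{Quota, RateLimiter};
use std::num::NonZeroU32;

// Illustrative: a limiter allowing 6 requests per minute, matching the
// "6 rpm (~10s interval)" description for RequestType::Session. In real
// code the limiter would be shared (e.g., behind an Arc), not rebuilt
// on every call.
async fn acquire_session_slot() {
    let quota = Quota::per_minute(NonZeroU32::new(6).unwrap());
    let limiter = RateLimiter::direct(quota);
    // Suspends the task until a cell is available, like wait_for_permission.
    limiter.until_ready().await;
}
```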
@@ -8,7 +8,6 @@ use governor::{
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
-use tracing::{debug, trace, warn};

/// Different types of Banner API requests with different rate limits
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -99,12 +98,8 @@ impl BannerRateLimiter {
            RequestType::Reset => &self.reset_limiter,
        };

-       trace!(request_type = ?request_type, "Waiting for rate limit permission");
-
-       // Wait until we can make the request
        // Wait until we can make the request (logging handled by middleware)
        limiter.until_ready().await;
-
-       trace!(request_type = ?request_type, "Rate limit permission granted");
    }
}
@@ -82,7 +82,6 @@ impl BannerSession {

    /// Updates the last activity timestamp
    pub fn touch(&mut self) {
-       trace!(id = self.unique_session_id, "Session was used");
        self.last_activity = Some(Instant::now());
    }

@@ -162,7 +161,7 @@ impl TermPool {
    async fn release(&self, session: BannerSession) {
        let id = session.unique_session_id.clone();
        if session.is_expired() {
-           trace!(id = id, "Session is now expired, dropping.");
            debug!(id = id, "Session expired, dropping");
            // Wake up a waiter, as it might need to create a new session
            // if this was the last one.
            self.notifier.notify_one();
@@ -171,10 +170,8 @@ impl TermPool {

        let mut queue = self.sessions.lock().await;
        queue.push_back(session);
-       let queue_size = queue.len();
        drop(queue); // Release lock before notifying

-       trace!(id = id, queue_size, "Session returned to pool");
        self.notifier.notify_one();
    }
}
@@ -204,22 +201,21 @@ impl SessionPool {
            .or_insert_with(|| Arc::new(TermPool::new()))
            .clone();

        let start = Instant::now();
        let mut waited_for_creation = false;

        loop {
            // Fast path: Try to get an existing, non-expired session.
            {
                let mut queue = term_pool.sessions.lock().await;
                if let Some(session) = queue.pop_front() {
                    if !session.is_expired() {
                        trace!(id = session.unique_session_id, "Reusing session from pool");
                        return Ok(PooledSession {
                            session: Some(session),
                            pool: Arc::clone(&term_pool),
                        });
                    } else {
-                       trace!(
-                           id = session.unique_session_id,
-                           "Popped an expired session, discarding."
-                       );
                        debug!(id = session.unique_session_id, "Discarded expired session");
                    }
                }
            } // MutexGuard is dropped, lock is released.
@@ -229,7 +225,10 @@ impl SessionPool {
            if *is_creating_guard {
                // Another task is already creating a session. Release the lock and wait.
                drop(is_creating_guard);
-               trace!("Another task is creating a session, waiting for notification...");
                if !waited_for_creation {
                    trace!("Waiting for another task to create session");
                    waited_for_creation = true;
                }
                term_pool.notifier.notified().await;
                // Loop back to the top to try the fast path again.
                continue;
@@ -240,12 +239,11 @@ impl SessionPool {
            drop(is_creating_guard);

            // Race: wait for a session to be returned OR for the rate limiter to allow a new one.
-           trace!("Pool empty, racing notifier vs rate limiter...");
            trace!("Pool empty, creating new session");
            tokio::select! {
                _ = term_pool.notifier.notified() => {
                    // A session was returned while we were waiting!
                    // We are no longer the creator. Reset the flag and loop to race for the new session.
-                   trace!("Notified that a session was returned. Looping to retry.");
                    let mut guard = term_pool.is_creating.lock().await;
                    *guard = false;
                    drop(guard);
@@ -253,7 +251,6 @@ impl SessionPool {
                }
                _ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
                    // The rate limit has elapsed. It's our job to create the session.
-                   trace!("Rate limiter ready. Proceeding to create a new session.");
                    let new_session_result = self.create_session(&term).await;

                    // After creation, we are no longer the creator. Reset the flag
@@ -265,7 +262,12 @@ impl SessionPool {

                    match new_session_result {
                        Ok(new_session) => {
-                           debug!(id = new_session.unique_session_id, "Successfully created new session");
                            let elapsed = start.elapsed();
                            debug!(
                                id = new_session.unique_session_id,
                                elapsed_ms = elapsed.as_millis(),
                                "Created new session"
                            );
                            return Ok(PooledSession {
                                session: Some(new_session),
                                pool: term_pool,
@@ -298,8 +300,12 @@ impl SessionPool {
            .get_all("Set-Cookie")
            .iter()
            .filter_map(|header_value| {
-               if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) {
-                   Some((cookie.name().to_string(), cookie.value().to_string()))
                if let Ok(cookie_str) = header_value.to_str() {
                    if let Ok(cookie) = Cookie::parse(cookie_str) {
                        Some((cookie.name().to_string(), cookie.value().to_string()))
                    } else {
                        None
                    }
                } else {
                    None
                }
@@ -310,16 +316,14 @@ impl SessionPool {
            return Err(anyhow::anyhow!("Failed to get cookies"));
        }

-       let jsessionid = cookies.get("JSESSIONID").unwrap();
-       let ssb_cookie = cookies.get("SSB_COOKIE").unwrap();
        let jsessionid = cookies
            .get("JSESSIONID")
            .ok_or_else(|| anyhow::anyhow!("JSESSIONID cookie missing after validation"))?;
        let ssb_cookie = cookies
            .get("SSB_COOKIE")
            .ok_or_else(|| anyhow::anyhow!("SSB_COOKIE cookie missing after validation"))?;
        let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);

-       trace!(
-           jsessionid = jsessionid,
-           ssb_cookie = ssb_cookie,
-           "New session cookies acquired"
-       );
-
        self.http
            .get(format!("{}/selfServiceMenu/data", self.base_url))
            .header("Cookie", &cookie_header)
@@ -435,8 +439,23 @@ impl SessionPool {

        let redirect: RedirectResponse = response.json().await?;

-       let base_url_path = self.base_url.parse::<Url>().unwrap().path().to_string();
-       let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap();
        let base_url_path = self
            .base_url
            .parse::<Url>()
            .context("Failed to parse base URL")?
            .path()
            .to_string();
        let non_overlap_redirect =
            redirect
                .fwd_url
                .strip_prefix(&base_url_path)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "Redirect URL '{}' does not start with expected prefix '{}'",
                        redirect.fwd_url,
                        base_url_path
                    )
                })?;

        // Follow the redirect
        let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect);
@@ -454,7 +473,6 @@ impl SessionPool {
            ));
        }

-       trace!(term = term, "successfully selected term");
        Ok(())
    }
}
135  src/data/batch.rs  (new file)

@@ -0,0 +1,135 @@
//! Batch database operations for improved performance.

use crate::banner::Course;
use crate::error::Result;
use sqlx::PgPool;
use std::time::Instant;
use tracing::info;

/// Batch upsert courses in a single database query.
///
/// This function performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses
/// in a single round-trip to the database, significantly reducing overhead compared
/// to individual inserts.
///
/// # Performance
/// - Reduces N database round-trips to 1
/// - Typical usage: 50-200 courses per batch
/// - PostgreSQL parameter limit: 65,535 (we use ~10 per course)
///
/// # Arguments
/// * `courses` - Slice of Course structs from the Banner API
/// * `db_pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(())` on success
/// * `Err(_)` if the database operation fails
///
/// # Example
/// ```no_run
/// use banner::data::batch::batch_upsert_courses;
/// use banner::banner::Course;
/// use sqlx::PgPool;
///
/// async fn example(courses: &[Course], pool: &PgPool) -> anyhow::Result<()> {
///     batch_upsert_courses(courses, pool).await?;
///     Ok(())
/// }
/// ```
pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
    // Early return for empty batches
    if courses.is_empty() {
        info!("No courses to upsert, skipping batch operation");
        return Ok(());
    }

    let start = Instant::now();
    let course_count = courses.len();

    // Extract course fields into vectors for UNNEST
    let crns: Vec<&str> = courses
        .iter()
        .map(|c| c.course_reference_number.as_str())
        .collect();
    let subjects: Vec<&str> = courses.iter().map(|c| c.subject.as_str()).collect();
    let course_numbers: Vec<&str> = courses.iter().map(|c| c.course_number.as_str()).collect();
    let titles: Vec<&str> = courses.iter().map(|c| c.course_title.as_str()).collect();
    let term_codes: Vec<&str> = courses.iter().map(|c| c.term.as_str()).collect();
    let enrollments: Vec<i32> = courses.iter().map(|c| c.enrollment).collect();
    let max_enrollments: Vec<i32> = courses.iter().map(|c| c.maximum_enrollment).collect();
    let wait_counts: Vec<i32> = courses.iter().map(|c| c.wait_count).collect();
    let wait_capacities: Vec<i32> = courses.iter().map(|c| c.wait_capacity).collect();

    // Perform batch upsert using UNNEST for efficient bulk insertion
    let result = sqlx::query(
        r#"
        INSERT INTO courses (
            crn, subject, course_number, title, term_code,
            enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
        )
        SELECT * FROM UNNEST(
            $1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
            $6::int4[], $7::int4[], $8::int4[], $9::int4[],
            array_fill(NOW()::timestamptz, ARRAY[$10])
        ) AS t(
            crn, subject, course_number, title, term_code,
            enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
        )
        ON CONFLICT (crn, term_code)
        DO UPDATE SET
            subject = EXCLUDED.subject,
            course_number = EXCLUDED.course_number,
            title = EXCLUDED.title,
            enrollment = EXCLUDED.enrollment,
            max_enrollment = EXCLUDED.max_enrollment,
            wait_count = EXCLUDED.wait_count,
            wait_capacity = EXCLUDED.wait_capacity,
            last_scraped_at = EXCLUDED.last_scraped_at
        "#,
    )
    .bind(&crns)
    .bind(&subjects)
    .bind(&course_numbers)
    .bind(&titles)
    .bind(&term_codes)
    .bind(&enrollments)
    .bind(&max_enrollments)
    .bind(&wait_counts)
    .bind(&wait_capacities)
    .bind(course_count as i32)
    .execute(db_pool)
    .await
    .map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?;

    let duration = start.elapsed();

    info!(
        courses_count = course_count,
        rows_affected = result.rows_affected(),
        duration_ms = duration.as_millis(),
        "Batch upserted courses"
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_empty_batch_returns_ok() {
        // This is a basic compile-time test
        // Runtime tests would require sqlx::test macro and a test database
        let courses: Vec<Course> = vec![];
        assert_eq!(courses.len(), 0);
    }
}
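The doc comment cites PostgreSQL's 65,535 bind-parameter budget; because the UNNEST form binds ten arrays rather than ten parameters per row, the practical concern for very large scrapes is batch size, and a caller can simply chunk. A hedged sketch (the chunk size and helper name are hypothetical, not part of this diff):

```rust
// Illustrative guard: split oversized scrapes into fixed-size chunks so each
// batch_upsert_courses call stays a modest, predictable statement.
async fn upsert_in_chunks(courses: &[Course], pool: &PgPool) -> Result<()> {
    for chunk in courses.chunks(6_000) {
        batch_upsert_courses(chunk, pool).await?;
    }
    Ok(())
}
```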
@@ -1,3 +1,4 @@
//! Database models and schema.

pub mod batch;
pub mod models;

@@ -72,4 +72,8 @@ pub struct ScrapeJob {
    pub execute_at: DateTime<Utc>,
    pub created_at: DateTime<Utc>,
    pub locked_at: Option<DateTime<Utc>>,
    /// Number of retry attempts for this job (non-negative, enforced by CHECK constraint)
    pub retry_count: i32,
    /// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint)
    pub max_retries: i32,
}

@@ -11,4 +11,5 @@ pub mod scraper;
pub mod services;
pub mod signals;
pub mod state;
pub mod utils;
pub mod web;
@@ -7,10 +7,13 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber};
/// Configure and initialize logging for the application
pub fn setup_logging(config: &Config, tracing_format: TracingFormat) {
    // Configure logging based on config
    // Note: Even when base_level is trace or debug, we suppress trace logs from noisy
    // infrastructure modules to keep output readable. These modules use debug for important
    // events and trace only for very detailed debugging.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
        let base_level = &config.log_level;
        EnvFilter::new(format!(
-           "warn,banner={},banner::rate_limiter=warn,banner::session=warn,banner::rate_limit_middleware=warn",
            "warn,banner={},banner::rate_limiter=warn,banner::session=debug,banner::rate_limit_middleware=warn,banner::middleware=debug",
            base_level
        ))
    });
@@ -1,10 +1,11 @@
use super::Job;
-use crate::banner::{BannerApi, Course, SearchQuery, Term};
use crate::banner::{BannerApi, SearchQuery, Term};
use crate::data::batch::batch_upsert_courses;
use crate::data::models::TargetType;
use crate::error::Result;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
-use tracing::{debug, info, trace};
use tracing::{debug, info};

/// Job implementation for scraping subject data
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -24,9 +25,9 @@ impl Job for SubjectJob {
        TargetType::Subject
    }

    #[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))]
    async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
        let subject_code = &self.subject;
-       debug!(subject = subject_code, "Processing subject job");

        // Get the current term
        let term = Term::get_current().inner().to_string();
@@ -42,9 +43,7 @@ impl Job for SubjectJob {
            count = courses_from_api.len(),
            "Found courses"
        );
-       for course in courses_from_api {
-           self.upsert_course(&course, db_pool).await?;
-       }
        batch_upsert_courses(&courses_from_api, db_pool).await?;
    }

    debug!(subject = subject_code, "Subject job completed");
@@ -55,39 +54,3 @@ impl Job for SubjectJob {
        format!("Scrape subject: {}", self.subject)
    }
}

-impl SubjectJob {
-    async fn upsert_course(&self, course: &Course, db_pool: &PgPool) -> Result<()> {
-        sqlx::query(
-            r#"
-            INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
-            ON CONFLICT (crn, term_code) DO UPDATE SET
-                subject = EXCLUDED.subject,
-                course_number = EXCLUDED.course_number,
-                title = EXCLUDED.title,
-                enrollment = EXCLUDED.enrollment,
-                max_enrollment = EXCLUDED.max_enrollment,
-                wait_count = EXCLUDED.wait_count,
-                wait_capacity = EXCLUDED.wait_capacity,
-                last_scraped_at = EXCLUDED.last_scraped_at
-            "#,
-        )
-        .bind(&course.course_reference_number)
-        .bind(&course.subject)
-        .bind(&course.course_number)
-        .bind(&course.course_title)
-        .bind(&course.term)
-        .bind(course.enrollment)
-        .bind(course.maximum_enrollment)
-        .bind(course.wait_count)
-        .bind(course.wait_capacity)
-        .bind(chrono::Utc::now())
-        .execute(db_pool)
-        .await
-        .map(|result| {
-            trace!(subject = course.subject, crn = course.course_reference_number, result = ?result, "Course upserted");
-        })
-        .map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
-    }
-}
@@ -3,14 +3,15 @@ pub mod scheduler;
pub mod worker;

use crate::banner::BannerApi;
use crate::services::Service;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::info;
use tracing::{info, warn};

use self::scheduler::Scheduler;
use self::worker::Worker;
use crate::services::Service;

/// The main service that will be managed by the application's `ServiceManager`.
///
@@ -21,6 +22,7 @@ pub struct ScraperService {
    banner_api: Arc<BannerApi>,
    scheduler_handle: Option<JoinHandle<()>>,
    worker_handles: Vec<JoinHandle<()>>,
    shutdown_tx: Option<broadcast::Sender<()>>,
}

impl ScraperService {
@@ -31,6 +33,7 @@ impl ScraperService {
            banner_api,
            scheduler_handle: None,
            worker_handles: Vec::new(),
            shutdown_tx: None,
        }
    }

@@ -38,9 +41,14 @@ impl ScraperService {
    pub fn start(&mut self) {
        info!("ScraperService starting");

        // Create shutdown channel
        let (shutdown_tx, _) = broadcast::channel(1);
        self.shutdown_tx = Some(shutdown_tx.clone());

        let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone());
        let shutdown_rx = shutdown_tx.subscribe();
        let scheduler_handle = tokio::spawn(async move {
            scheduler.run().await;
            scheduler.run(shutdown_rx).await;
        });
        self.scheduler_handle = Some(scheduler_handle);
        info!("Scheduler task spawned");
@@ -48,8 +56,9 @@ impl ScraperService {
        let worker_count = 4; // This could be configurable
        for i in 0..worker_count {
            let worker = Worker::new(i, self.db_pool.clone(), self.banner_api.clone());
            let shutdown_rx = shutdown_tx.subscribe();
            let worker_handle = tokio::spawn(async move {
                worker.run().await;
                worker.run(shutdown_rx).await;
            });
            self.worker_handles.push(worker_handle);
        }
@@ -58,18 +67,6 @@ impl ScraperService {
            "Spawned worker tasks"
        );
    }

    /// Signals all child tasks to gracefully shut down.
    pub async fn shutdown(&mut self) {
        info!("Shutting down scraper service");
        if let Some(handle) = self.scheduler_handle.take() {
            handle.abort();
        }
        for handle in self.worker_handles.drain(..) {
            handle.abort();
        }
        info!("Scraper service shutdown");
    }
}

#[async_trait::async_trait]
@@ -85,7 +82,35 @@ impl Service for ScraperService {
    }

    async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
        self.shutdown().await;
        info!("Shutting down scraper service");

        // Send shutdown signal to all tasks
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
        } else {
            warn!("No shutdown channel found for scraper service");
            return Err(anyhow::anyhow!("No shutdown channel available"));
        }

        // Collect all handles
        let mut all_handles = Vec::new();
        if let Some(handle) = self.scheduler_handle.take() {
            all_handles.push(handle);
        }
        all_handles.append(&mut self.worker_handles);

        // Wait for all tasks to complete (no internal timeout - let ServiceManager handle it)
        let results = futures::future::join_all(all_handles).await;
        let failed = results.iter().filter(|r| r.is_err()).count();
        if failed > 0 {
            warn!(
                failed_count = failed,
                "Some scraper tasks panicked during shutdown"
            );
            return Err(anyhow::anyhow!("{} task(s) panicked", failed));
        }

        info!("All scraper tasks shutdown gracefully");
        Ok(())
    }
}

@@ -6,8 +6,10 @@ use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time;
use tracing::{debug, error, info, trace};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

/// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler {
@@ -23,31 +25,92 @@ impl Scheduler {
    }
}

    /// Runs the scheduler's main loop.
    pub async fn run(&self) {
    /// Runs the scheduler's main loop with graceful shutdown support.
    ///
    /// The scheduler wakes up every 60 seconds to analyze data and enqueue jobs.
    /// When a shutdown signal is received:
    /// 1. Any in-progress scheduling work is gracefully cancelled via CancellationToken
    /// 2. The scheduler waits up to 5 seconds for work to complete
    /// 3. If timeout occurs, the task is abandoned (it will be aborted when dropped)
    ///
    /// This ensures that shutdown is responsive even if scheduling work is blocked.
    pub async fn run(&self, mut shutdown_rx: broadcast::Receiver<()>) {
        info!("Scheduler service started");
        let mut interval = time::interval(Duration::from_secs(60)); // Runs every minute

        let work_interval = Duration::from_secs(60);
        let mut next_run = time::Instant::now();
        let mut current_work: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = None;

        loop {
            interval.tick().await;
            // Scheduler analyzing data...
            if let Err(e) = self.schedule_jobs().await {
                error!(error = ?e, "Failed to schedule jobs");
            tokio::select! {
                _ = time::sleep_until(next_run) => {
                    let cancel_token = CancellationToken::new();

                    // Spawn work in separate task to allow graceful cancellation during shutdown.
                    // Without this, shutdown would have to wait for the full scheduling cycle.
                    let work_handle = tokio::spawn({
                        let db_pool = self.db_pool.clone();
                        let banner_api = self.banner_api.clone();
                        let cancel_token = cancel_token.clone();

                        async move {
                            tokio::select! {
                                result = Self::schedule_jobs_impl(&db_pool, &banner_api) => {
                                    if let Err(e) = result {
                                        error!(error = ?e, "Failed to schedule jobs");
                                    }
                                }
                                _ = cancel_token.cancelled() => {
                                    debug!("Scheduling work cancelled gracefully");
                                }
                            }
                        }
                    });

                    current_work = Some((work_handle, cancel_token));
                    next_run = time::Instant::now() + work_interval;
                }
                _ = shutdown_rx.recv() => {
                    info!("Scheduler received shutdown signal");

                    if let Some((handle, cancel_token)) = current_work.take() {
                        cancel_token.cancel();

                        // Wait briefly for graceful completion
                        if tokio::time::timeout(Duration::from_secs(5), handle).await.is_err() {
                            warn!("Scheduling work did not complete within 5s, abandoning");
                        } else {
                            debug!("Scheduling work completed gracefully");
                        }
                    }

                    info!("Scheduler exiting gracefully");
                    break;
                }
            }
        }
    }

    /// The core logic for deciding what jobs to create.
    async fn schedule_jobs(&self) -> Result<()> {
    /// Core scheduling logic that analyzes data and creates scrape jobs.
    ///
    /// Strategy:
    /// 1. Fetch all subjects for the current term from Banner API
    /// 2. Query existing jobs in a single batch query
    /// 3. Create jobs only for subjects that don't have pending jobs
    ///
    /// This is a static method (not &self) to allow it to be called from spawned tasks.
    #[tracing::instrument(skip_all, fields(term))]
    async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
        // For now, we will implement a simple baseline scheduling strategy:
        // 1. Get a list of all subjects from the Banner API.
        // 2. Query existing jobs for all subjects in a single query.
        // 3. Create new jobs only for subjects that don't have existing jobs.
        let term = Term::get_current().inner().to_string();

        tracing::Span::current().record("term", term.as_str());
        debug!(term = term, "Enqueuing subject jobs");

        let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?;
        let subjects = banner_api.get_subjects("", &term, 1, 500).await?;
        debug!(
            subject_count = subjects.len(),
            "Retrieved subjects from API"
@@ -61,12 +124,12 @@ impl Scheduler {

        // Query existing jobs for all subjects in a single query
        let existing_jobs: Vec<(serde_json::Value,)> = sqlx::query_as(
            "SELECT target_payload FROM scrape_jobs
            "SELECT target_payload FROM scrape_jobs
             WHERE target_type = $1 AND target_payload = ANY($2) AND locked_at IS NULL",
        )
        .bind(TargetType::Subject)
        .bind(&subject_payloads)
        .fetch_all(&self.db_pool)
        .fetch_all(db_pool)
        .await?;

        // Convert to a HashSet for efficient lookup
@@ -76,6 +139,7 @@ impl Scheduler {
            .collect();

        // Filter out subjects that already have jobs and prepare new jobs
        let mut skipped_count = 0;
        let new_jobs: Vec<_> = subjects
            .into_iter()
            .filter_map(|subject| {
@@ -84,7 +148,7 @@ impl Scheduler {
                let payload_str = payload.to_string();

                if existing_payloads.contains(&payload_str) {
                    trace!(subject = subject.code, "Job already exists, skipping");
                    skipped_count += 1;
                    None
                } else {
                    Some((payload, subject.code))
@@ -92,10 +156,14 @@ impl Scheduler {
            })
            .collect();

        if skipped_count > 0 {
            debug!(count = skipped_count, "Skipped subjects with existing jobs");
        }

        // Insert all new jobs in a single batch
        if !new_jobs.is_empty() {
            let now = chrono::Utc::now();
            let mut tx = self.db_pool.begin().await?;
            let mut tx = db_pool.begin().await?;

            for (payload, subject_code) in new_jobs {
                sqlx::query(

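The cancel-and-wait shape in the scheduler hunk above is reusable on its own. A distilled, self-contained sketch of the same pattern (invented names, tokio plus the tokio-util dependency added in Cargo.toml): the periodic pass runs in its own task so a shutdown signal never has to wait out a full cycle, and cancellation gets a bounded grace period.

use std::time::Duration;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

async fn do_work() {
    // Placeholder for a real scheduling pass.
    tokio::time::sleep(Duration::from_millis(100)).await;
}

async fn run_periodic(mut shutdown_rx: broadcast::Receiver<()>) {
    let period = Duration::from_secs(60);
    let mut next_run = tokio::time::Instant::now();
    let mut current: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = None;

    loop {
        tokio::select! {
            _ = tokio::time::sleep_until(next_run) => {
                let token = CancellationToken::new();
                let child = token.clone();
                // The pass runs in its own task, so shutdown never waits a full cycle.
                current = Some((tokio::spawn(async move {
                    tokio::select! {
                        _ = do_work() => {}
                        _ = child.cancelled() => {}
                    }
                }), token));
                next_run = tokio::time::Instant::now() + period;
            }
            _ = shutdown_rx.recv() => {
                if let Some((handle, token)) = current.take() {
                    token.cancel();
                    // Bounded grace period; abandon the task if it ignores cancellation.
                    let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
                }
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(1);
    let task = tokio::spawn(run_periodic(rx));
    let _ = tx.send(());
    let _ = task.await;
}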
@@ -5,8 +5,9 @@ use crate::scraper::jobs::{JobError, JobType};
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time;
use tracing::{debug, error, info, trace, warn};
use tracing::{Instrument, debug, error, info, trace, warn};

/// A single worker instance.
///
@@ -28,79 +29,52 @@ impl Worker {
    }

    /// Runs the worker's main loop.
    pub async fn run(&self) {
        info!(worker_id = self.id, "Worker started.");
        loop {
            match self.fetch_and_lock_job().await {
                Ok(Some(job)) => {
                    let job_id = job.id;
                    debug!(worker_id = self.id, job_id = job.id, "Processing job");
                    match self.process_job(job).await {
                        Ok(()) => {
                            debug!(worker_id = self.id, job_id, "Job completed");
                            // If successful, delete the job.
                            if let Err(delete_err) = self.delete_job(job_id).await {
                                error!(
                                    worker_id = self.id,
                                    job_id,
                                    ?delete_err,
                                    "Failed to delete job"
                                );
                            }
                        }
                        Err(JobError::Recoverable(e)) => {
                            // Check if the error is due to an invalid session
                            if let Some(BannerApiError::InvalidSession(_)) =
                                e.downcast_ref::<BannerApiError>()
                            {
                                warn!(
                                    worker_id = self.id,
                                    job_id, "Invalid session detected. Forcing session refresh."
                                );
                            } else {
                                error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
                            }
    pub async fn run(&self, mut shutdown_rx: broadcast::Receiver<()>) {
        info!(worker_id = self.id, "Worker started");

                            // Unlock the job so it can be retried
                            if let Err(unlock_err) = self.unlock_job(job_id).await {
                                error!(
                                    worker_id = self.id,
                                    job_id,
                                    ?unlock_err,
                                    "Failed to unlock job"
                                );
                            }
        loop {
            // Fetch and lock a job, racing against shutdown signal
            let job = tokio::select! {
                _ = shutdown_rx.recv() => {
                    info!(worker_id = self.id, "Worker received shutdown signal, exiting gracefully");
                    break;
                }
                result = self.fetch_and_lock_job() => {
                    match result {
                        Ok(Some(job)) => job,
                        Ok(None) => {
                            trace!(worker_id = self.id, "No jobs available, waiting");
                            time::sleep(Duration::from_secs(5)).await;
                            continue;
                        }
                        Err(JobError::Unrecoverable(e)) => {
                            error!(
                                worker_id = self.id,
                                job_id,
                                error = ?e,
                                "Job corrupted, deleting"
                            );
                            // Parse errors are unrecoverable - delete the job
                            if let Err(delete_err) = self.delete_job(job_id).await {
                                error!(
                                    worker_id = self.id,
                                    job_id,
                                    ?delete_err,
                                    "Failed to delete corrupted job"
                                );
                            }
                        Err(e) => {
                            warn!(worker_id = self.id, error = ?e, "Failed to fetch job, waiting");
                            time::sleep(Duration::from_secs(10)).await;
                            continue;
                        }
                    }
                }
                Ok(None) => {
                    // No job found, wait for a bit before polling again.
                    trace!(worker_id = self.id, "No jobs available, waiting");
                    time::sleep(Duration::from_secs(5)).await;
            };

            let job_id = job.id;
            let retry_count = job.retry_count;
            let max_retries = job.max_retries;
            let start = std::time::Instant::now();

            // Process the job, racing against shutdown signal
            let process_result = tokio::select! {
                _ = shutdown_rx.recv() => {
                    self.handle_shutdown_during_processing(job_id).await;
                    break;
                }
                Err(e) => {
                    warn!(worker_id = self.id, error = ?e, "Failed to fetch job");
                    // Wait before retrying to avoid spamming errors.
                    time::sleep(Duration::from_secs(10)).await;
                }
            }
                result = self.process_job(job) => result
            };

            let duration = start.elapsed();

            // Handle the job processing result
            self.handle_job_result(job_id, retry_count, max_retries, process_result, duration)
                .await;
        }
    }

@@ -137,20 +111,31 @@ impl Worker {
        // Get the job implementation
        let job_impl = job_type.boxed();

        debug!(
            worker_id = self.id,
        // Create span with job context
        let span = tracing::debug_span!(
            "process_job",
            job_id = job.id,
            description = job_impl.description(),
            "Processing job"
            job_type = job_impl.description()
        );

        // Process the job - API errors are recoverable
        job_impl
            .process(&self.banner_api, &self.db_pool)
            .await
            .map_err(JobError::Recoverable)?;
        async move {
            debug!(
                worker_id = self.id,
                job_id = job.id,
                description = job_impl.description(),
                "Processing job"
            );

        Ok(())
            // Process the job - API errors are recoverable
            job_impl
                .process(&self.banner_api, &self.db_pool)
                .await
                .map_err(JobError::Recoverable)?;

            Ok(())
        }
        .instrument(span)
        .await
    }

    async fn delete_job(&self, job_id: i32) -> Result<()> {
@@ -166,7 +151,150 @@ impl Worker {
        .bind(job_id)
        .execute(&self.db_pool)
        .await?;
        info!(worker_id = self.id, job_id, "Job unlocked for retry");
        Ok(())
    }

    async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result<bool> {
        let result = sqlx::query_scalar::<_, Option<i32>>(
            "UPDATE scrape_jobs
             SET locked_at = NULL, retry_count = retry_count + 1
             WHERE id = $1
             RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END",
        )
        .bind(job_id)
        .bind(max_retries)
        .fetch_one(&self.db_pool)
        .await?;

        Ok(result.is_some())
    }

    /// Handle shutdown signal received during job processing
    async fn handle_shutdown_during_processing(&self, job_id: i32) {
        info!(
            worker_id = self.id,
            job_id, "Shutdown received during job processing"
        );

        if let Err(e) = self.unlock_job(job_id).await {
            warn!(
                worker_id = self.id,
                job_id,
                error = ?e,
                "Failed to unlock job during shutdown"
            );
        } else {
            debug!(worker_id = self.id, job_id, "Job unlocked during shutdown");
        }

        info!(worker_id = self.id, "Worker exiting gracefully");
    }

    /// Handle the result of job processing
    async fn handle_job_result(
        &self,
        job_id: i32,
        retry_count: i32,
        max_retries: i32,
        result: Result<(), JobError>,
        duration: std::time::Duration,
    ) {
        match result {
            Ok(()) => {
                debug!(
                    worker_id = self.id,
                    job_id,
                    duration_ms = duration.as_millis(),
                    "Job completed successfully"
                );
                if let Err(e) = self.delete_job(job_id).await {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
                }
            }
            Err(JobError::Recoverable(e)) => {
                self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration)
                    .await;
            }
            Err(JobError::Unrecoverable(e)) => {
                error!(
                    worker_id = self.id,
                    job_id,
                    duration_ms = duration.as_millis(),
                    error = ?e,
                    "Job corrupted, deleting"
                );
                if let Err(e) = self.delete_job(job_id).await {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
                }
            }
        }
    }

    /// Handle recoverable errors by logging appropriately and unlocking the job
    async fn handle_recoverable_error(
        &self,
        job_id: i32,
        retry_count: i32,
        max_retries: i32,
        e: anyhow::Error,
        duration: std::time::Duration,
    ) {
        let next_attempt = retry_count.saturating_add(1);
        let remaining_retries = max_retries.saturating_sub(next_attempt);

        // Log the error appropriately based on type
        if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() {
            warn!(
                worker_id = self.id,
                job_id,
                duration_ms = duration.as_millis(),
                retry_attempt = next_attempt,
                max_retries = max_retries,
                remaining_retries = remaining_retries,
                "Invalid session detected, will retry"
            );
        } else {
            error!(
                worker_id = self.id,
                job_id,
                duration_ms = duration.as_millis(),
                retry_attempt = next_attempt,
                max_retries = max_retries,
                remaining_retries = remaining_retries,
                error = ?e,
                "Failed to process job, will retry"
            );
        }

        // Atomically unlock and increment retry count, checking if retry is allowed
        match self.unlock_and_increment_retry(job_id, max_retries).await {
            Ok(can_retry) if can_retry => {
                info!(
                    worker_id = self.id,
                    job_id,
                    retry_attempt = next_attempt,
                    remaining_retries = remaining_retries,
                    "Job unlocked for retry"
                );
            }
            Ok(_) => {
                // Max retries exceeded (detected atomically)
                error!(
                    worker_id = self.id,
                    job_id,
                    duration_ms = duration.as_millis(),
                    retry_count = next_attempt,
                    max_retries = max_retries,
                    error = ?e,
                    "Job failed permanently (max retries exceeded), deleting"
                );
                if let Err(e) = self.delete_job(job_id).await {
                    error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
                }
            }
            Err(e) => {
                error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
            }
        }
    }
}

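The `RETURNING CASE` expression in `unlock_and_increment_retry` folds the retry decision into the same UPDATE that unlocks the job: the counter is incremented and the caller learns, in a single round-trip, whether another attempt is allowed, so two workers can never both conclude a job has one retry left. A minimal model of the predicate, for illustration only:

// Mirrors: RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END
// (a NULL row maps to `can_retry == false` through Option<i32>::is_some()).
fn can_retry(retry_count_before_update: i32, max_retries: i32) -> bool {
    retry_count_before_update + 1 < max_retries
}

fn main() {
    assert!(can_retry(0, 3));  // first failure:  one attempt used, retry allowed
    assert!(can_retry(1, 3));  // second failure: retry allowed
    assert!(!can_retry(2, 3)); // third failure:  limit reached, the job is deleted
}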
@@ -7,12 +7,16 @@ use serenity::Client;
use serenity::all::{ActivityData, ClientBuilder, GatewayIntents};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, warn};
use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

/// Discord bot service implementation
pub struct BotService {
    client: Client,
    shard_manager: Arc<serenity::gateway::ShardManager>,
    status_task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    status_shutdown_tx: Option<broadcast::Sender<()>>,
}

impl BotService {
@@ -20,6 +24,8 @@ impl BotService {
    pub async fn create_client(
        config: &Config,
        app_state: AppState,
        status_task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
        status_shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<Client, anyhow::Error> {
        let intents = GatewayIntents::non_privileged();
        let bot_target_guild = config.bot_target_guild;
@@ -74,6 +80,7 @@ impl BotService {
        })
        .setup(move |ctx, _ready, framework| {
            let app_state = app_state.clone();
            let status_task_handle = status_task_handle.clone();
            Box::pin(async move {
                poise::builtins::register_in_guild(
                    ctx,
@@ -83,8 +90,13 @@ impl BotService {
                .await?;
                poise::builtins::register_globally(ctx, &framework.options().commands).await?;

                // Start status update task
                Self::start_status_update_task(ctx.clone(), app_state.clone()).await;
                // Start status update task with shutdown support
                let handle = Self::start_status_update_task(
                    ctx.clone(),
                    app_state.clone(),
                    status_shutdown_rx,
                );
                *status_task_handle.lock().await = Some(handle);

                Ok(Data { app_state })
            })
@@ -96,8 +108,12 @@ impl BotService {
        .await?)
    }

    /// Start the status update task for the Discord bot
    async fn start_status_update_task(ctx: serenity::client::Context, app_state: AppState) {
    /// Start the status update task for the Discord bot with graceful shutdown support
    fn start_status_update_task(
        ctx: serenity::client::Context,
        app_state: AppState,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let max_interval = Duration::from_secs(300); // 5 minutes
            let base_interval = Duration::from_secs(30);
@@ -106,59 +122,72 @@ impl BotService {

            // This runs once immediately on startup, then with adaptive intervals
            loop {
                interval.tick().await;

                // Get the course count, update the activity if it has changed/hasn't been set this session
                let course_count = app_state.get_course_count().await.unwrap();
                if previous_course_count.is_none() || previous_course_count != Some(course_count) {
                    ctx.set_activity(Some(ActivityData::playing(format!(
                        "Querying {:} classes",
                        course_count.to_formatted_string(&Locale::en)
                    ))));
                }

                // Increase or reset the interval
                interval = tokio::time::interval(
                    // Avoid logging the first 'change'
                    if course_count != previous_course_count.unwrap_or(0) {
                        if previous_course_count.is_some() {
                            debug!(
                                new_course_count = course_count,
                                last_interval = interval.period().as_secs(),
                                "Course count changed, resetting interval"
                            );
                tokio::select! {
                    _ = interval.tick() => {
                        // Get the course count, update the activity if it has changed/hasn't been set this session
                        let course_count = app_state.get_course_count().await.unwrap();
                        if previous_course_count.is_none() || previous_course_count != Some(course_count) {
                            ctx.set_activity(Some(ActivityData::playing(format!(
                                "Querying {:} classes",
                                course_count.to_formatted_string(&Locale::en)
                            ))));
                        }

                        // Record the new course count
                        previous_course_count = Some(course_count);
                        // Increase or reset the interval
                        interval = tokio::time::interval(
                            // Avoid logging the first 'change'
                            if course_count != previous_course_count.unwrap_or(0) {
                                if previous_course_count.is_some() {
                                    debug!(
                                        new_course_count = course_count,
                                        last_interval = interval.period().as_secs(),
                                        "Course count changed, resetting interval"
                                    );
                                }

                        // Reset to base interval
                        base_interval
                    } else {
                        // Increase interval by 10% (up to maximum)
                        let new_interval = interval.period().mul_f32(1.1).min(max_interval);
                        debug!(
                            current_course_count = course_count,
                            last_interval = interval.period().as_secs(),
                            new_interval = new_interval.as_secs(),
                            "Course count unchanged, increasing interval"
                                // Record the new course count
                                previous_course_count = Some(course_count);

                                // Reset to base interval
                                base_interval
                            } else {
                                // Increase interval by 10% (up to maximum)
                                let new_interval = interval.period().mul_f32(1.1).min(max_interval);
                                debug!(
                                    current_course_count = course_count,
                                    last_interval = interval.period().as_secs(),
                                    new_interval = new_interval.as_secs(),
                                    "Course count unchanged, increasing interval"
                                );

                                new_interval
                            },
                        );

                        new_interval
                    },
                );

                // Reset the interval, otherwise it will tick again immediately
                interval.reset();
                        // Reset the interval, otherwise it will tick again immediately
                        interval.reset();
                    }
                    _ = shutdown_rx.recv() => {
                        info!("Status update task received shutdown signal");
                        break;
                    }
                }
            }
        });
        })
    }

    pub fn new(client: Client) -> Self {
    pub fn new(
        client: Client,
        status_task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
        status_shutdown_tx: broadcast::Sender<()>,
    ) -> Self {
        let shard_manager = client.shard_manager.clone();

        Self {
            client,
            shard_manager,
            status_task_handle,
            status_shutdown_tx: Some(status_shutdown_tx),
        }
    }
}
@@ -183,6 +212,28 @@ impl Service for BotService {
    }

    async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
        // Signal status update task to stop
        if let Some(status_shutdown_tx) = self.status_shutdown_tx.take() {
            let _ = status_shutdown_tx.send(());
        }

        // Wait for status update task to complete (with timeout)
        let handle = self.status_task_handle.lock().await.take();
        if let Some(handle) = handle {
            match tokio::time::timeout(Duration::from_secs(2), handle).await {
                Ok(Ok(())) => {
                    debug!("Status update task completed gracefully");
                }
                Ok(Err(e)) => {
                    warn!(error = ?e, "Status update task panicked");
                }
                Err(_) => {
                    warn!("Status update task did not complete within 2s timeout");
                }
            }
        }

        // Shutdown Discord shards
        self.shard_manager.shutdown_all().await;
        Ok(())
    }

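The status task's backoff multiplies the polling interval by 1.1 on every tick where the course count is unchanged, caps it at 5 minutes, and snaps back to 30 seconds on any change. A quick back-of-the-envelope check, not taken from the source: starting at 30s, the 300s ceiling is reached after roughly 25 unchanged ticks, since 30 x 1.1^25 is about 325 and gets clamped to 300.

fn main() {
    // Replays the interval policy above: +10% per unchanged tick, capped at 5 minutes.
    let (base, max) = (30.0_f32, 300.0_f32);
    let mut interval = base;
    let mut ticks = 0;
    while interval < max {
        interval = (interval * 1.1).min(max);
        ticks += 1;
    }
    println!("cap reached after {ticks} unchanged ticks"); // prints 25
}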
@@ -1,15 +1,16 @@
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, info, trace, warn};

use crate::services::{Service, ServiceResult, run_service};

/// Manages multiple services and their lifecycle
pub struct ServiceManager {
    registered_services: HashMap<String, Box<dyn Service>>,
    running_services: HashMap<String, JoinHandle<ServiceResult>>,
    service_handles: HashMap<String, tokio::task::AbortHandle>,
    completion_rx: Option<mpsc::UnboundedReceiver<(String, ServiceResult)>>,
    completion_tx: mpsc::UnboundedSender<(String, ServiceResult)>,
    shutdown_tx: broadcast::Sender<()>,
}

@@ -22,9 +23,13 @@ impl Default for ServiceManager {
impl ServiceManager {
    pub fn new() -> Self {
        let (shutdown_tx, _) = broadcast::channel(1);
        let (completion_tx, completion_rx) = mpsc::unbounded_channel();

        Self {
            registered_services: HashMap::new(),
            running_services: HashMap::new(),
            service_handles: HashMap::new(),
            completion_rx: Some(completion_rx),
            completion_tx,
            shutdown_tx,
        }
    }
@@ -46,9 +51,20 @@ impl ServiceManager {

        for (name, service) in self.registered_services.drain() {
            let shutdown_rx = self.shutdown_tx.subscribe();
            let handle = tokio::spawn(run_service(service, shutdown_rx));
            let completion_tx = self.completion_tx.clone();
            let name_clone = name.clone();

            // Spawn service task
            let handle = tokio::spawn(async move {
                let result = run_service(service, shutdown_rx).await;
                // Send completion notification
                let _ = completion_tx.send((name_clone, result));
            });

            // Store abort handle for shutdown control
            self.service_handles
                .insert(name.clone(), handle.abort_handle());
            debug!(service = name, id = ?handle.id(), "service spawned");
            self.running_services.insert(name, handle);
        }

        info!(
@@ -62,7 +78,7 @@ impl ServiceManager {
    /// Run all services until one completes or fails
    /// Returns the first service that completes and its result
    pub async fn run(&mut self) -> (String, ServiceResult) {
        if self.running_services.is_empty() {
        if self.service_handles.is_empty() {
            return (
                "none".to_string(),
                ServiceResult::Error(anyhow::anyhow!("No services to run")),
@@ -71,99 +87,134 @@ impl ServiceManager {

        info!(
            "servicemanager running {} services",
            self.running_services.len()
            self.service_handles.len()
        );

        // Wait for any service to complete
        loop {
            let mut completed_services = Vec::new();
        // Wait for any service to complete via the channel
        let completion_rx = self
            .completion_rx
            .as_mut()
            .expect("completion_rx should be available");

            for (name, handle) in &mut self.running_services {
                if handle.is_finished() {
                    completed_services.push(name.clone());
                }
            }

            if let Some(completed_name) = completed_services.first() {
                let handle = self.running_services.remove(completed_name).unwrap();
                match handle.await {
                    Ok(result) => {
                        return (completed_name.clone(), result);
                    }
                    Err(e) => {
                        error!(service = completed_name, "service task panicked: {e}");
                        return (
                            completed_name.clone(),
                            ServiceResult::Error(anyhow::anyhow!("Task panic: {e}")),
                        );
                    }
                }
            }

            // Small delay to prevent busy-waiting
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        completion_rx
            .recv()
            .await
            .map(|(name, result)| {
                self.service_handles.remove(&name);
                (name, result)
            })
            .unwrap_or_else(|| {
                (
                    "channel_closed".to_string(),
                    ServiceResult::Error(anyhow::anyhow!("Completion channel closed")),
                )
            })
    }

    /// Shutdown all services gracefully with a timeout.
    ///
    /// If any service fails to shutdown, it will return an error containing the names of the services that failed to shutdown.
    /// If all services shutdown successfully, the function will return the duration elapsed.
    /// All services receive the shutdown signal simultaneously and shut down in parallel.
    /// Each service gets the full timeout duration (they don't share/consume from a budget).
    /// If any service fails to shutdown within the timeout, it will be aborted.
    ///
    /// Returns the elapsed time if all succeed, or a list of failed service names.
    pub async fn shutdown(&mut self, timeout: Duration) -> Result<Duration, Vec<String>> {
        let service_count = self.running_services.len();
        let service_names: Vec<_> = self.running_services.keys().cloned().collect();
        let service_count = self.service_handles.len();
        let service_names: Vec<_> = self.service_handles.keys().cloned().collect();

        info!(
            service_count,
            services = ?service_names,
            timeout = format!("{:.2?}", timeout),
            "shutting down {} services with {:?} timeout",
            "shutting down {} services in parallel with {:?} timeout each",
            service_count,
            timeout
        );

        // Send shutdown signal to all services
        if service_count == 0 {
            return Ok(Duration::ZERO);
        }

        // Send shutdown signal to all services simultaneously
        let _ = self.shutdown_tx.send(());

        // Wait for all services to complete
        let start_time = std::time::Instant::now();
        let mut pending_services = Vec::new();

        for (name, handle) in self.running_services.drain() {
            match tokio::time::timeout(timeout, handle).await {
                Ok(Ok(_)) => {
                    trace!(service = name, "service shutdown completed");
        // Collect results from all services with timeout
        let completion_rx = self
            .completion_rx
            .as_mut()
            .expect("completion_rx should be available");

        // Collect all completion results with a single timeout
        let collect_future = async {
            let mut collected: Vec<Option<(String, ServiceResult)>> = Vec::new();
            for _ in 0..service_count {
                if let Some(result) = completion_rx.recv().await {
                    collected.push(Some(result));
                } else {
                    collected.push(None);
                }
                Ok(Err(e)) => {
                    warn!(service = name, error = ?e, "service shutdown failed");
                    pending_services.push(name);
                }
                Err(_) => {
                    warn!(service = name, "service shutdown timed out");
                    pending_services.push(name);
                }
            }
            collected
        };

        let results = match tokio::time::timeout(timeout, collect_future).await {
            Ok(results) => results,
            Err(_) => {
                // Timeout exceeded - abort all remaining services
                warn!(
                    timeout = format!("{:.2?}", timeout),
                    "shutdown timeout exceeded - aborting all remaining services"
                );

                let failed: Vec<String> = self.service_handles.keys().cloned().collect();
                for handle in self.service_handles.values() {
                    handle.abort();
                }
                self.service_handles.clear();

                return Err(failed);
            }
        };

        // Process results and identify failures
        let mut failed_services = Vec::new();
        for (name, service_result) in results.into_iter().flatten() {
            self.service_handles.remove(&name);

            if matches!(service_result, ServiceResult::GracefulShutdown) {
                trace!(service = name, "service shutdown completed");
            } else {
                warn!(
                    service = name,
                    result = ?service_result,
                    "service shutdown with non-graceful result"
                );
                failed_services.push(name);
            }
        }

        let elapsed = start_time.elapsed();
        if pending_services.is_empty() {

        if failed_services.is_empty() {
            info!(
                service_count,
                elapsed = format!("{:.2?}", elapsed),
                "services shutdown completed: {}",
                "all services shutdown successfully: {}",
                service_names.join(", ")
            );
            Ok(elapsed)
        } else {
            warn!(
                pending_count = pending_services.len(),
                pending_services = ?pending_services,
                failed_count = failed_services.len(),
                failed_services = ?failed_services,
                elapsed = format!("{:.2?}", elapsed),
                "services shutdown completed with {} pending: {}",
                pending_services.len(),
                pending_services.join(", ")
                "{} service(s) failed to shutdown gracefully: {}",
                failed_services.len(),
                failed_services.join(", ")
            );
            Err(pending_services)
            Err(failed_services)
        }
    }
}

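The refactor above replaces the 10 ms `is_finished()` polling loop with an mpsc completion channel. A condensed, self-contained illustration of the same pattern, with names invented here rather than taken from the repository: each task owns a sender clone and reports its own result, so the manager simply awaits `recv()`.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<(String, &'static str)>();

    for name in ["web", "bot", "scraper"] {
        let tx = tx.clone();
        tokio::spawn(async move {
            // ... service body runs here ...
            let _ = tx.send((name.to_string(), "graceful"));
        });
    }
    drop(tx); // keep only the clones held by tasks

    // The first completion wakes us directly; no 10ms polling loop required.
    if let Some((name, result)) = rx.recv().await {
        println!("{name} finished first: {result}");
    }
}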
@@ -23,7 +23,11 @@ pub trait Service: Send + Sync {

    /// Gracefully shutdown the service
    ///
    /// An 'Ok' result does not mean the service has completed shutdown, it merely means that the service shutdown was initiated.
    /// Implementations should initiate shutdown and MAY wait for completion.
    /// Services are expected to respond to this call and begin cleanup promptly.
    /// When managed by ServiceManager, the configured timeout (default 8s) applies to
    /// ALL services combined, not per-service. Services should complete shutdown as
    /// quickly as possible to avoid timeout.
    async fn shutdown(&mut self) -> Result<(), anyhow::Error>;
}

@@ -3,7 +3,7 @@ use crate::web::{BannerState, create_router};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{info, warn, trace};
use tracing::{info, trace, warn};

/// Web server service implementation
pub struct WebService {
@@ -33,16 +33,12 @@ impl Service for WebService {
        let app = create_router(self.banner_state.clone());

        let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
        info!(
            service = "web",
            link = format!("http://localhost:{}", addr.port()),
            "starting web server",
        );

        let listener = TcpListener::bind(addr).await?;
        info!(
            service = "web",
            address = %addr,
            link = format!("http://localhost:{}", addr.port()),
            "web server listening"
        );

@@ -61,13 +57,16 @@ impl Service for WebService {
        })
        .await?;

        trace!(service = "web", "graceful shutdown completed");
        info!(service = "web", "web server stopped");

        Ok(())
    }

    async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
            trace!(service = "web", "sent shutdown signal to axum");
        } else {
            warn!(
                service = "web",

1
src/utils/mod.rs
Normal file
@@ -0,0 +1 @@
pub mod shutdown;

32
src/utils/shutdown.rs
Normal file
@@ -0,0 +1,32 @@
use tokio::task::JoinHandle;
use tracing::warn;

/// Helper for joining multiple task handles with proper error handling.
///
/// This function waits for all tasks to complete and reports any that panicked.
/// Returns an error if any task panicked, otherwise returns Ok.
pub async fn join_tasks(handles: Vec<JoinHandle<()>>) -> Result<(), anyhow::Error> {
    let results = futures::future::join_all(handles).await;

    let failed = results.iter().filter(|r| r.is_err()).count();
    if failed > 0 {
        warn!(failed_count = failed, "Some tasks panicked during shutdown");
        Err(anyhow::anyhow!("{} task(s) panicked", failed))
    } else {
        Ok(())
    }
}

/// Helper for joining multiple task handles with a timeout.
///
/// Waits for all tasks to complete within the specified timeout.
/// If timeout occurs, remaining tasks are aborted.
pub async fn join_tasks_with_timeout(
    handles: Vec<JoinHandle<()>>,
    timeout: std::time::Duration,
) -> Result<(), anyhow::Error> {
    match tokio::time::timeout(timeout, join_tasks(handles)).await {
        Ok(result) => result,
        Err(_) => Err(anyhow::anyhow!("Task join timed out after {:?}", timeout)),
    }
}
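A usage sketch for these helpers (illustrative; the crate root is `banner`, as the test file below confirms): spawn cleanup tasks, then bound how long shutdown may wait on them.

use std::time::Duration;
use banner::utils::shutdown::join_tasks_with_timeout;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let handles = vec![
        tokio::spawn(async { /* flush buffers, close sockets, ... */ }),
        tokio::spawn(async { tokio::time::sleep(Duration::from_millis(50)).await }),
    ];

    // Err if any task panicked, or if the whole join exceeds one second.
    join_tasks_with_timeout(handles, Duration::from_secs(1)).await
}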
39
tests/basic_test.rs
Normal file
@@ -0,0 +1,39 @@
use banner::utils::shutdown::join_tasks;
use tokio::task::JoinHandle;

#[tokio::test]
async fn test_join_tasks_success() {
    // Create some tasks that complete successfully
    let handles: Vec<JoinHandle<()>> = vec![
        tokio::spawn(async { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await }),
        tokio::spawn(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await }),
        tokio::spawn(async { /* immediate completion */ }),
    ];

    // All tasks should complete successfully
    let result = join_tasks(handles).await;
    assert!(
        result.is_ok(),
        "Expected all tasks to complete successfully"
    );
}

#[tokio::test]
async fn test_join_tasks_with_panic() {
    // Create some tasks, including one that panics
    let handles: Vec<JoinHandle<()>> = vec![
        tokio::spawn(async { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await }),
        tokio::spawn(async { panic!("intentional test panic") }),
        tokio::spawn(async { /* immediate completion */ }),
    ];

    // Should return an error because one task panicked
    let result = join_tasks(handles).await;
    assert!(result.is_err(), "Expected an error when a task panics");

    let error_msg = result.unwrap_err().to_string();
    assert!(
        error_msg.contains("1 task(s) panicked"),
        "Error message should mention panicked tasks"
    );
}
30
web/biome.json
Normal file
@@ -0,0 +1,30 @@
{
  "$schema": "https://biomejs.dev/schemas/1.9.4/schema.json",
  "vcs": {
    "enabled": true,
    "clientKind": "git",
    "useIgnoreFile": true
  },
  "files": {
    "ignoreUnknown": false,
    "ignore": ["dist/", "node_modules/", ".tanstack/"]
  },
  "formatter": {
    "enabled": true,
    "indentStyle": "space",
    "indentWidth": 2,
    "lineWidth": 100,
    "lineEnding": "lf"
  },
  "javascript": {
    "formatter": {
      "quoteStyle": "double",
      "trailingCommas": "es5",
      "semicolons": "always",
      "arrowParentheses": "always"
    }
  },
  "linter": {
    "enabled": false
  }
}

1297
web/bun.lock
Normal file
File diff suppressed because it is too large

60
web/eslint.config.js
Normal file
@@ -0,0 +1,60 @@
import js from "@eslint/js";
import tseslint from "typescript-eslint";
import react from "eslint-plugin-react";
import reactHooks from "eslint-plugin-react-hooks";
import reactRefresh from "eslint-plugin-react-refresh";

export default tseslint.config(
  // Ignore generated files and build outputs
  {
    ignores: ["dist", "node_modules", "src/routeTree.gen.ts", "*.config.js"],
  },
  // Base configs
  js.configs.recommended,
  ...tseslint.configs.recommendedTypeChecked,
  // React plugin configuration
  {
    files: ["**/*.{ts,tsx}"],
    plugins: {
      react,
      "react-hooks": reactHooks,
      "react-refresh": reactRefresh,
    },
    languageOptions: {
      parserOptions: {
        project: true,
        tsconfigRootDir: import.meta.dirname,
        ecmaFeatures: {
          jsx: true,
        },
      },
    },
    settings: {
      react: {
        version: "19.0",
      },
    },
    rules: {
      // React rules
      ...react.configs.recommended.rules,
      ...react.configs["jsx-runtime"].rules,
      ...reactHooks.configs.recommended.rules,

      // React Refresh
      "react-refresh/only-export-components": ["warn", { allowConstantExport: true }],

      // TypeScript overrides
      "@typescript-eslint/no-unused-vars": [
        "error",
        {
          argsIgnorePattern: "^_",
          varsIgnorePattern: "^_",
        },
      ],
      "@typescript-eslint/no-explicit-any": "warn",

      // Disable prop-types since we're using TypeScript
      "react/prop-types": "off",
    },
  }
);
@@ -7,7 +7,11 @@
    "start": "vite --port 3000",
    "build": "vite build && tsc",
    "serve": "vite preview",
    "test": "vitest run"
    "test": "vitest run",
    "lint": "tsc && eslint . --ext .ts,.tsx",
    "typecheck": "tsc --noEmit",
    "format": "biome format --write .",
    "format:check": "biome format ."
  },
  "dependencies": {
    "@radix-ui/themes": "^3.2.1",
@@ -23,14 +27,21 @@
    "recharts": "^3.2.0"
  },
  "devDependencies": {
    "@biomejs/biome": "^1.9.4",
    "@eslint/js": "^9.39.0",
    "@testing-library/dom": "^10.4.0",
    "@testing-library/react": "^16.2.0",
    "@types/node": "^24.3.3",
    "@types/react": "^19.0.8",
    "@types/react-dom": "^19.0.3",
    "@vitejs/plugin-react": "^4.3.4",
    "eslint": "^9.39.0",
    "eslint-plugin-react": "^7.37.5",
    "eslint-plugin-react-hooks": "^7.0.1",
    "eslint-plugin-react-refresh": "^0.4.24",
    "jsdom": "^26.0.0",
    "typescript": "^5.7.2",
    "typescript-eslint": "^8.46.2",
    "vite": "^6.3.5",
    "vitest": "^3.0.5",
    "web-vitals": "^4.2.4"

4620
web/pnpm-lock.yaml
generated
File diff suppressed because it is too large

@@ -1,7 +1,6 @@
.App {
  min-height: 100vh;
  font-family:
    -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
  font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
    "Cantarell", "Fira Sans", "Droid Sans", "Helvetica Neue", sans-serif;
  background-color: var(--color-background);
  color: var(--color-text);

@@ -38,12 +38,10 @@ export class BannerApiClient {
    const response = await fetch(`${this.baseUrl}${endpoint}`);

    if (!response.ok) {
      throw new Error(
        `API request failed: ${response.status} ${response.statusText}`
      );
      throw new Error(`API request failed: ${response.status} ${response.statusText}`);
    }

    return response.json();
    return (await response.json()) as T;
  }

  async getHealth(): Promise<HealthResponse> {

@@ -1,13 +1,13 @@
const reportWebVitals = (onPerfEntry?: () => void) => {
  if (onPerfEntry && onPerfEntry instanceof Function) {
    import('web-vitals').then(({ onCLS, onINP, onFCP, onLCP, onTTFB }) => {
      onCLS(onPerfEntry)
      onINP(onPerfEntry)
      onFCP(onPerfEntry)
      onLCP(onPerfEntry)
      onTTFB(onPerfEntry)
    })
    void import("web-vitals").then(({ onCLS, onINP, onFCP, onLCP, onTTFB }) => {
      onCLS(onPerfEntry);
      onINP(onPerfEntry);
      onFCP(onPerfEntry);
      onLCP(onPerfEntry);
      onTTFB(onPerfEntry);
    });
  }
}
};

export default reportWebVitals
export default reportWebVitals;

@@ -8,52 +8,52 @@
// You should NOT make any changes in this file as it will be overwritten.
// Additionally, you should also exclude this file from your linter and/or formatter to prevent it from being checked or modified.

import { Route as rootRouteImport } from './routes/__root'
import { Route as IndexRouteImport } from './routes/index'
import { Route as rootRouteImport } from "./routes/__root";
import { Route as IndexRouteImport } from "./routes/index";

const IndexRoute = IndexRouteImport.update({
  id: '/',
  path: '/',
  id: "/",
  path: "/",
  getParentRoute: () => rootRouteImport,
} as any)
} as any);

export interface FileRoutesByFullPath {
  '/': typeof IndexRoute
  "/": typeof IndexRoute;
}
export interface FileRoutesByTo {
  '/': typeof IndexRoute
  "/": typeof IndexRoute;
}
export interface FileRoutesById {
  __root__: typeof rootRouteImport
  '/': typeof IndexRoute
  __root__: typeof rootRouteImport;
  "/": typeof IndexRoute;
}
export interface FileRouteTypes {
  fileRoutesByFullPath: FileRoutesByFullPath
  fullPaths: '/'
  fileRoutesByTo: FileRoutesByTo
  to: '/'
  id: '__root__' | '/'
  fileRoutesById: FileRoutesById
  fileRoutesByFullPath: FileRoutesByFullPath;
  fullPaths: "/";
  fileRoutesByTo: FileRoutesByTo;
  to: "/";
  id: "__root__" | "/";
  fileRoutesById: FileRoutesById;
}
export interface RootRouteChildren {
  IndexRoute: typeof IndexRoute
  IndexRoute: typeof IndexRoute;
}

declare module '@tanstack/react-router' {
declare module "@tanstack/react-router" {
  interface FileRoutesByPath {
    '/': {
      id: '/'
      path: '/'
      fullPath: '/'
      preLoaderRoute: typeof IndexRouteImport
      parentRoute: typeof rootRouteImport
    }
    "/": {
      id: "/";
      path: "/";
      fullPath: "/";
      preLoaderRoute: typeof IndexRouteImport;
      parentRoute: typeof rootRouteImport;
    };
  }
}

const rootRouteChildren: RootRouteChildren = {
  IndexRoute: IndexRoute,
}
};
export const routeTree = rootRouteImport
  ._addFileChildren(rootRouteChildren)
  ._addFileTypes<FileRouteTypes>()
  ._addFileTypes<FileRouteTypes>();

@@ -101,13 +101,11 @@ const getOverallHealth = (state: StatusState): Status | "Unreachable" => {
const getServices = (state: StatusState): Service[] => {
  if (state.mode !== "response") return [];

  return Object.entries(state.status.services).map(
    ([serviceId, serviceInfo]) => ({
      name: serviceInfo.name,
      status: serviceInfo.status,
      icon: SERVICE_ICONS[serviceId] || SERVICE_ICONS.default,
    })
  );
  return Object.entries(state.status.services).map(([serviceId, serviceInfo]) => ({
    name: serviceInfo.name,
    status: serviceInfo.status,
    icon: SERVICE_ICONS[serviceId] || SERVICE_ICONS.default,
  }));
};

const StatusDisplay = ({ status }: { status: Status | "Unreachable" }) => {
@@ -197,17 +195,11 @@ function App() {

      // Create a timeout promise
      const timeoutPromise = new Promise<never>((_, reject) => {
        setTimeout(
          () => reject(new Error("Request timeout")),
          REQUEST_TIMEOUT
        );
        setTimeout(() => reject(new Error("Request timeout")), REQUEST_TIMEOUT);
      });

      // Race between the API call and timeout
      const statusData = await Promise.race([
        client.getStatus(),
        timeoutPromise,
      ]);
      const statusData = await Promise.race([client.getStatus(), timeoutPromise]);

      const endTime = Date.now();
      const responseTime = endTime - startTime;
@@ -219,8 +211,7 @@ function App() {
        lastFetch: new Date(),
      });
    } catch (err) {
      const errorMessage =
        err instanceof Error ? err.message : "Failed to fetch data";
      const errorMessage = err instanceof Error ? err.message : "Failed to fetch data";

      // Check if it's a timeout error
      if (errorMessage === "Request timeout") {
@@ -237,11 +228,11 @@ function App() {
      }

      // Schedule the next request after the current one completes
      timeoutId = setTimeout(fetchData, REFRESH_INTERVAL);
      timeoutId = setTimeout(() => void fetchData(), REFRESH_INTERVAL);
    };

    // Start the first request immediately
    fetchData();
    void fetchData();

    return () => {
      if (timeoutId) {
@@ -302,12 +293,8 @@ function App() {
        <Flex direction="column" gap="3" style={{ marginTop: "16px" }}>
          {shouldShowSkeleton
            ? // Show skeleton for 3 services during initial loading only
              Array.from({ length: 3 }).map((_, index) => (
                <SkeletonService key={index} />
              ))
            : services.map((service) => (
                <ServiceStatus key={service.name} service={service} />
              ))}
              Array.from({ length: 3 }).map((_, index) => <SkeletonService key={index} />)
            : services.map((service) => <ServiceStatus key={service.name} service={service} />)}
        </Flex>

        <Flex direction="column" gap="2" style={BORDER_STYLES}>
@@ -326,17 +313,11 @@ function App() {
          {shouldShowLastFetch ? (
            <TimingRow icon={Clock} name="Last Updated">
              {isLoading ? (
                <Text
                  size="2"
                  style={{ paddingBottom: "2px" }}
                  color="gray"
                >
                <Text size="2" style={{ paddingBottom: "2px" }} color="gray">
                  Loading...
                </Text>
              ) : (
                <Tooltip
                  content={`as of ${state.lastFetch.toLocaleTimeString()}`}
                >
                <Tooltip content={`as of ${state.lastFetch.toLocaleTimeString()}`}>
                  <abbr
                    style={{
                      cursor: "pointer",
@@ -363,12 +344,7 @@ function App() {
              </Flex>
            </Flex>
          </Card>
          <Flex
            justify="center"
            style={{ marginTop: "12px" }}
            gap="2"
            align="center"
          >
          <Flex justify="center" style={{ marginTop: "12px" }} gap="2" align="center">
            {__APP_VERSION__ && (
              <Text
                size="1"

@@ -2,14 +2,12 @@

body {
  margin: 0;
  font-family:
    -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
  font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
    "Cantarell", "Fira Sans", "Droid Sans", "Helvetica Neue", sans-serif;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
}

code {
  font-family:
    source-code-pro, Menlo, Monaco, Consolas, "Courier New", monospace;
  font-family: source-code-pro, Menlo, Monaco, Consolas, "Courier New", monospace;
}

@@ -11,6 +11,7 @@
    "moduleResolution": "bundler",
    "allowImportingTsExtensions": true,
    "verbatimModuleSyntax": true,
    "isolatedModules": true,
    "noEmit": true,

    /* Linting */
@@ -22,7 +23,7 @@
    "noUncheckedSideEffectImports": true,
    "baseUrl": ".",
    "paths": {
      "@/*": ["./src/*"],
      "@/*": ["./src/*"]
    }
  }
}

@@ -7,10 +7,7 @@ import { readFileSync, existsSync } from "node:fs";
// Extract version from Cargo.toml
function getVersion() {
  const filename = "Cargo.toml";
  const paths = [
    resolve(__dirname, filename),
    resolve(__dirname, "..", filename),
  ];
  const paths = [resolve(__dirname, filename), resolve(__dirname, "..", filename)];

  for (const path of paths) {
    try {