Compare commits

...

8 Commits

Author SHA1 Message Date
Ryan Walters
966732a6d2 feat: modernize build tooling and add CI/CD workflow
Switch to Bun for 2-5x faster frontend builds, implement cargo-chef for
reliable Rust dependency caching, and add Biome for fast code
formatting.

Build system improvements:
- Replace pnpm with Bun for frontend package management
- Add cargo-chef to Dockerfile for better Rust build layer caching
- Update all commands to use bun instead of pnpm

Developer experience:
- Add comprehensive Justfile commands (format, format-check, db)
- Implement automated PostgreSQL Docker setup with random port
allocation
- Add stricter checks (--deny warnings on clippy, --all-features flag)

Code quality:
- Add Biome formatter for 10-100x faster TypeScript/JavaScript
formatting
- Add GitHub Actions CI/CD workflow for automated checks
- Update .dockerignore with comprehensive exclusions
- Format all code with cargo fmt (Rust) and Biome (TypeScript)

All changes maintain backward compatibility and can be tested
incrementally.
2025-11-18 18:59:03 -06:00
Ryan Walters
3292d35521 build(docker): copy migrations directory to build context
Ensures database migration files are available during the Docker build process.
2025-11-03 12:07:27 -06:00
Ryan Walters
71ac0782d0 feat(json): enhance error context with debug mode detailed reporting
Improve JSON parsing error messages with build-specific behavior:
- Debug builds: Show full parent object context and type mismatch details
- Release builds: Keep minimal snippets to avoid log spam

Add comprehensive test coverage for error handling and path parsing.
2025-11-03 12:04:20 -06:00
Ryan Walters
1c6d2d4b6e perf: implement batch operations and optimize database indexes
Add batch upsert functionality to reduce database round-trips from N to 1 when inserting courses. Create comprehensive database indexes for common query patterns including term/subject lookups, time-series metrics, and job scheduling. Remove redundant indexes and add monitoring guidance for BRIN index effectiveness.
2025-11-03 11:18:42 -06:00
Ryan Walters
51f8256e61 feat: implement comprehensive retry mechanism and improve observability
Add retry tracking to scrape jobs with configurable max retries (default 5), implement
automatic database migrations on startup, and significantly reduce logging noise from
infrastructure layers. Enhanced tracing with structured spans for better debugging while
keeping output readable by suppressing verbose trace logs from rate limiters and session
management. Improved error handling with detailed retry context and proper session cookie
validation.
2025-11-03 10:18:07 -06:00
Ryan Walters
b1ed2434f8 feat: add ESLint configuration and testing infrastructure
Add comprehensive ESLint setup with React and TypeScript support, create basic integration tests for the shutdown utilities, and enhance the Justfile with a new check command that runs all validation steps (cargo check, clippy, tests, and linting).
2025-11-03 02:21:35 -06:00
Ryan Walters
47c23459f1 refactor: implement comprehensive graceful shutdown across all services
Implements graceful shutdown with broadcast channels and proper timeout handling
for scraper workers, scheduler, bot service, and status update tasks. Introduces
centralized shutdown utilities and improves service manager to handle parallel
shutdown with per-service timeouts instead of shared timeout budgets.

Key changes:
- Add utils module with shutdown helper functions
- Update ScraperService to return errors on shutdown failures
- Refactor scheduler with cancellable work tasks and 5s grace period
- Extract worker shutdown logic into helper methods for clarity
- Add broadcast channel shutdown support to BotService and status task
- Improve ServiceManager to shutdown services in parallel with individual timeouts
2025-11-03 02:10:01 -06:00
Ryan Walters
8af9b0a1a2 refactor(scraper): implement graceful shutdown with broadcast channels
Replace task abortion with broadcast-based graceful shutdown for scheduler and workers. Implement cancellation tokens for in-progress work with 5s timeout. Add tokio-util dependency for CancellationToken support. Update ServiceManager to use completion channels and abort handles for better service lifecycle control.
2025-11-03 01:22:12 -06:00
46 changed files with 3044 additions and 5153 deletions

View File

@@ -13,6 +13,16 @@ go/
# Development configuration
bacon.toml
.env
.env.*
!.env.example
# CI/CD
.github/
.git/
# Development tools
Justfile
rust-toolchain.toml
# Frontend build artifacts and cache
web/node_modules/
@@ -20,4 +30,22 @@ web/dist/
web/.vite/
web/.tanstack/
web/.vscode/
# IDE and editor files
.vscode/
.idea/
*.swp
*.swo
*~
# OS files
.DS_Store
Thumbs.db
# Test coverage
coverage/
*.profdata
*.profraw
# SQLx offline mode (include this in builds)
!.sqlx/

65
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,65 @@
name: CI
on:
push:
branches: [master]
pull_request:
branches: [master]
env:
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1
jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy
- name: Setup Bun
uses: oven-sh/setup-bun@v1
with:
bun-version: latest
- name: Cache Rust dependencies
uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install frontend dependencies
working-directory: web
run: bun install --frozen-lockfile
- name: Check Rust formatting
run: cargo fmt --all -- --check
- name: Check TypeScript formatting
working-directory: web
run: bun run format:check
- name: TypeScript type check
working-directory: web
run: bun run typecheck
- name: ESLint
working-directory: web
run: bun run lint
- name: Clippy
run: cargo clippy --all-features -- --deny warnings
- name: Run tests
run: cargo test --all-features
- name: Build frontend
working-directory: web
run: bun run build
- name: Build backend
run: cargo build --release --bin banner

1
Cargo.lock generated
View File

@@ -254,6 +254,7 @@ dependencies = [
"time",
"tl",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",

View File

@@ -36,6 +36,7 @@ sqlx = { version = "0.8.6", features = [
thiserror = "2.0.16"
time = "0.3.43"
tokio = { version = "1.47.1", features = ["full"] }
tokio-util = "0.7"
tl = "0.7.8"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] }

View File

@@ -2,11 +2,8 @@
ARG RUST_VERSION=1.89.0
ARG RAILWAY_GIT_COMMIT_SHA
# Frontend Build Stage
FROM node:22-bookworm-slim AS frontend-builder
# Install pnpm
RUN npm install -g pnpm
# --- Frontend Build Stage ---
FROM oven/bun:1 AS frontend-builder
WORKDIR /app
@@ -14,63 +11,62 @@ WORKDIR /app
COPY ./Cargo.toml ./
# Copy frontend package files
COPY ./web/package.json ./web/pnpm-lock.yaml ./
COPY ./web/package.json ./web/bun.lock* ./
# Install dependencies
RUN pnpm install --frozen-lockfile
RUN bun install --frozen-lockfile
# Copy frontend source code
COPY ./web ./
# Build frontend
RUN pnpm run build
RUN bun run build
# Rust Build Stage
FROM rust:${RUST_VERSION}-bookworm AS builder
# --- Chef Base Stage ---
FROM lukemathwalker/cargo-chef:latest-rust-${RUST_VERSION} AS chef
WORKDIR /app
# --- Planner Stage ---
FROM chef AS planner
COPY Cargo.toml Cargo.lock ./
COPY build.rs ./
COPY src ./src
# Migrations & .sqlx specifically left out to avoid invalidating cache
RUN cargo chef prepare --recipe-path recipe.json --bin banner
# --- Rust Build Stage ---
FROM chef AS builder
# Set build-time environment variable for Railway Git commit SHA
ARG RAILWAY_GIT_COMMIT_SHA
ENV RAILWAY_GIT_COMMIT_SHA=${RAILWAY_GIT_COMMIT_SHA}
# Install build dependencies
# Copy recipe from planner and build dependencies only
COPY --from=planner /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json --bin banner
# Install build dependencies for final compilation
RUN apt-get update && apt-get install -y \
pkg-config \
libssl-dev \
git \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /usr/src
RUN USER=root cargo new --bin banner
WORKDIR /usr/src/banner
# Copy dependency files for better layer caching
COPY ./Cargo.toml ./Cargo.lock* ./
# Copy .git directory for build.rs to access Git information (if available)
# This will copy .git (and .gitignore) if it exists, but won't fail if it doesn't
# While normally a COPY requires at least one file, .gitignore should still be available, so this wildcard should always work
COPY ./.git* ./
# Copy build.rs early so it can run during the first build
COPY ./build.rs ./
# Build empty app with downloaded dependencies to produce a stable image layer for next build
RUN cargo build --release
# Copy source code
RUN rm src/*.rs
COPY ./src ./src/
# Copy built frontend assets
# Copy source code and built frontend assets
COPY Cargo.toml Cargo.lock ./
COPY build.rs ./
COPY .git* ./
COPY src ./src
COPY migrations ./migrations
COPY --from=frontend-builder /app/dist ./web/dist
# Build web app with embedded assets
RUN rm ./target/release/deps/banner*
RUN cargo build --release
RUN cargo build --release --bin banner
# Strip the binary to reduce size
RUN strip target/release/banner
# Runtime Stage - Debian slim for glibc compatibility
# --- Runtime Stage ---
FROM debian:12-slim
ARG APP=/usr/src/app
@@ -94,7 +90,7 @@ RUN addgroup --gid $GID $APP_USER \
&& mkdir -p ${APP}
# Copy application binary
COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/target/release/banner ${APP}/banner
COPY --from=builder --chown=$APP_USER:$APP_USER /app/target/release/banner ${APP}/banner
# Set proper permissions
RUN chmod +x ${APP}/banner
@@ -117,4 +113,4 @@ ENV HOSTS=0.0.0.0,[::]
# Implicitly uses PORT environment variable
# temporary: running without 'scraper' service
CMD ["sh", "-c", "exec ./banner --services web,bot"]
CMD ["sh", "-c", "exec ./banner --services web,bot"]

View File

@@ -1,12 +1,65 @@
default_services := "bot,web,scraper"
default:
just --list
# Run all checks (format, clippy, tests, lint)
check:
cargo fmt --all -- --check
cargo clippy --all-features -- --deny warnings
cargo nextest run
bun run --cwd web typecheck
bun run --cwd web lint
# Format all Rust and TypeScript code
format:
cargo fmt --all
bun run --cwd web format
# Check formatting without modifying (CI-friendly)
format-check:
cargo fmt --all -- --check
bun run --cwd web format:check
# Start PostgreSQL in Docker and update .env with connection string
db:
#!/usr/bin/env bash
set -euo pipefail
# Find available port
PORT=$(shuf -i 49152-65535 -n 1)
while ss -tlnp 2>/dev/null | grep -q ":$PORT "; do
PORT=$(shuf -i 49152-65535 -n 1)
done
# Start PostgreSQL container
docker run -d \
--name banner-postgres \
-e POSTGRES_PASSWORD=banner \
-e POSTGRES_USER=banner \
-e POSTGRES_DB=banner \
-p "$PORT:5432" \
postgres:17-alpine
# Update .env file
DB_URL="postgresql://banner:banner@localhost:$PORT/banner"
if [ -f .env ]; then
sed -i.bak "s|^DATABASE_URL=.*|DATABASE_URL=$DB_URL|" .env
else
echo "DATABASE_URL=$DB_URL" > .env
fi
echo "PostgreSQL started on port $PORT"
echo "DATABASE_URL=$DB_URL"
echo "Run: sqlx migrate run"
# Auto-reloading frontend server
frontend:
pnpm run -C web dev
bun run --cwd web dev
# Production build of frontend
build-frontend:
pnpm run -C web build
bun run --cwd web build
# Auto-reloading backend server
backend *ARGS:
@@ -14,15 +67,13 @@ backend *ARGS:
# Production build
build:
pnpm run -C web build
bun run --cwd web build
cargo build --release --bin banner
# Run auto-reloading development build with release characteristics (frontend is embedded, non-auto-reloading)
# This is useful for testing backend release-mode details.
# Run auto-reloading development build with release characteristics
dev-build *ARGS='--services web --tracing pretty': build-frontend
bacon --headless run -- --profile dev-release -- {{ARGS}}
# Auto-reloading development build for both frontend and backend
# Will not notice if either the frontend/backend crashes, but will generally be resistant to stopping on their own.
[parallel]
dev *ARGS='--services web,bot': frontend (backend ARGS)
dev *ARGS='--services web,bot': frontend (backend ARGS)

View File

@@ -26,7 +26,7 @@ The application consists of three modular services that can be run independently
## Quick Start
```bash
pnpm install -C web # Install frontend dependencies
bun install --cwd web # Install frontend dependencies
cargo build # Build the backend
just dev # Runs auto-reloading dev build

View File

@@ -0,0 +1,3 @@
-- Add retry tracking columns to scrape_jobs table
ALTER TABLE scrape_jobs ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0 CHECK (retry_count >= 0);
ALTER TABLE scrape_jobs ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 5 CHECK (max_retries >= 0);

View File

@@ -0,0 +1,45 @@
-- Performance optimization indexes
-- Index for term-based queries (most common access pattern)
CREATE INDEX IF NOT EXISTS idx_courses_term_code ON courses(term_code);
-- Index for subject-based filtering
CREATE INDEX IF NOT EXISTS idx_courses_subject ON courses(subject);
-- Composite index for subject + term queries
CREATE INDEX IF NOT EXISTS idx_courses_subject_term ON courses(subject, term_code);
-- Index for course number lookups
CREATE INDEX IF NOT EXISTS idx_courses_course_number ON courses(course_number);
-- Index for last scraped timestamp (useful for finding stale data)
CREATE INDEX IF NOT EXISTS idx_courses_last_scraped ON courses(last_scraped_at);
-- Index for course metrics time-series queries
-- BRIN index is optimal for time-series data
CREATE INDEX IF NOT EXISTS idx_course_metrics_timestamp ON course_metrics USING BRIN(timestamp);
-- B-tree index for specific course metric lookups
CREATE INDEX IF NOT EXISTS idx_course_metrics_course_timestamp
ON course_metrics(course_id, timestamp DESC);
-- Partial index for pending scrape jobs (only unlocked jobs)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_pending
ON scrape_jobs(execute_at ASC)
WHERE locked_at IS NULL;
-- Index for high-priority job processing
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_priority_pending
ON scrape_jobs(priority DESC, execute_at ASC)
WHERE locked_at IS NULL;
-- Index for retry tracking
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_retry_count
ON scrape_jobs(retry_count)
WHERE retry_count > 0 AND locked_at IS NULL;
-- Analyze tables to update statistics
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;

View File

@@ -0,0 +1,53 @@
-- Index Optimization Follow-up Migration
-- Reason: Redundant with composite index idx_courses_subject_term
DROP INDEX IF EXISTS idx_courses_subject;
-- Remove: idx_scrape_jobs_retry_count
DROP INDEX IF EXISTS idx_scrape_jobs_retry_count;
-- Purpose: Optimize the scheduler's frequent query (runs every 60 seconds)
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_scheduler_lookup
ON scrape_jobs(target_type, target_payload)
WHERE locked_at IS NULL;
-- Note: We use (target_type, target_payload) instead of including locked_at
-- in the index columns because:
-- 1. The WHERE clause filters locked_at IS NULL (partial index optimization)
-- 2. target_payload is JSONB and already large; keeping it as an indexed column
-- allows PostgreSQL to use index-only scans for the SELECT target_payload query
-- 3. This design minimizes index size while maximizing query performance
-- Purpose: Enable efficient audit trail queries by course
CREATE INDEX IF NOT EXISTS idx_course_audits_course_timestamp
ON course_audits(course_id, timestamp DESC);
-- Purpose: Enable queries like "Show all changes in the last 24 hours"
CREATE INDEX IF NOT EXISTS idx_course_audits_timestamp
ON course_audits(timestamp DESC);
-- The BRIN index on course_metrics(timestamp) assumes data is inserted in
-- chronological order. BRIN indexes are only effective when data is physically
-- ordered on disk. If you perform:
-- - Backfills of historical data
-- - Out-of-order inserts
-- - Frequent UPDATEs that move rows
--
-- Then the BRIN index effectiveness will degrade. Monitor with:
-- SELECT * FROM brin_page_items(get_raw_page('idx_course_metrics_timestamp', 1));
--
-- If you see poor selectivity, consider:
-- 1. REINDEX to rebuild after bulk loads
-- 2. Switch to B-tree if inserts are not time-ordered
-- 3. Use CLUSTER to physically reorder the table (requires downtime)
COMMENT ON INDEX idx_course_metrics_timestamp IS
'BRIN index - requires chronologically ordered inserts for efficiency. Monitor selectivity.';
-- Update statistics for query planner
ANALYZE courses;
ANALYZE course_metrics;
ANALYZE course_audits;
ANALYZE scrape_jobs;

View File

@@ -42,11 +42,7 @@ impl App {
// Check if the database URL is via private networking
let is_private = config.database_url.contains("railway.internal");
let slow_threshold = if is_private {
Duration::from_millis(200)
} else {
Duration::from_millis(500)
};
let slow_threshold = Duration::from_millis(if is_private { 200 } else { 500 });
// Create database connection pool
let db_pool = PgPoolOptions::new()
@@ -66,6 +62,14 @@ impl App {
"database pool established"
);
// Run database migrations
info!("Running database migrations...");
sqlx::migrate!("./migrations")
.run(&db_pool)
.await
.expect("Failed to run database migrations");
info!("Database migrations completed successfully");
// Create BannerApi and AppState
let banner_api = BannerApi::new_with_config(
config.banner_base_url.clone(),
@@ -108,10 +112,6 @@ impl App {
.register_service(ServiceName::Scraper.as_str(), scraper_service);
}
if services.contains(&ServiceName::Bot) {
// Bot service will be set up separately in run() method since it's async
}
// Check if any services are enabled
if !self.service_manager.has_services() && !services.contains(&ServiceName::Bot) {
error!("No services enabled. Cannot start application.");
@@ -123,10 +123,28 @@ impl App {
/// Setup bot service if enabled
pub async fn setup_bot_service(&mut self) -> Result<(), anyhow::Error> {
let client = BotService::create_client(&self.config, self.app_state.clone())
.await
.expect("Failed to create Discord client");
let bot_service = Box::new(BotService::new(client));
use std::sync::Arc;
use tokio::sync::{Mutex, broadcast};
// Create shutdown channel for status update task
let (status_shutdown_tx, status_shutdown_rx) = broadcast::channel(1);
let status_task_handle = Arc::new(Mutex::new(None));
let client = BotService::create_client(
&self.config,
self.app_state.clone(),
status_task_handle.clone(),
status_shutdown_rx,
)
.await
.expect("Failed to create Discord client");
let bot_service = Box::new(BotService::new(
client,
status_task_handle,
status_shutdown_tx,
));
self.service_manager
.register_service(ServiceName::Bot.as_str(), bot_service);
Ok(())
@@ -147,9 +165,4 @@ impl App {
pub fn config(&self) -> &Config {
&self.config
}
/// Get a reference to the app state
pub fn app_state(&self) -> &AppState {
&self.app_state
}
}

View File

@@ -152,6 +152,13 @@ impl BannerApi {
}
/// Performs a course search and handles common response processing.
#[tracing::instrument(
skip(self, query),
fields(
term = %term,
subject = %query.get_subject().unwrap_or(&"all".to_string())
)
)]
async fn perform_search(
&self,
term: &str,
@@ -318,12 +325,6 @@ impl BannerApi {
sort: &str,
sort_descending: bool,
) -> Result<SearchResult, BannerApiError> {
debug!(
term = term,
subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"),
max_results = query.get_max_results(),
"Starting course search"
);
self.perform_search(term, query, sort, sort_descending)
.await
}

View File

@@ -1,10 +1,14 @@
//! JSON parsing utilities for the Banner API client.
use anyhow::Result;
use serde_json;
use serde_json::{self, Value};
/// Attempt to parse JSON and, on failure, include a contextual snippet of the
/// line where the error occurred. This prevents dumping huge JSON bodies to logs.
/// line where the error occurred.
///
/// In debug builds, this provides detailed context including the full JSON object
/// containing the error and type mismatch information. In release builds, it shows
/// a minimal snippet to prevent dumping huge JSON bodies to production logs.
pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Result<T> {
let jd = &mut serde_json::Deserializer::from_str(body);
match serde_path_to_error::deserialize(jd) {
@@ -12,27 +16,247 @@ pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Re
Err(err) => {
let inner_err = err.inner();
let (line, column) = (inner_err.line(), inner_err.column());
let snippet = build_error_snippet(body, line, column, 20);
let path = err.path().to_string();
let msg = inner_err.to_string();
let loc = format!(" at line {line} column {column}");
let msg_without_loc = msg.strip_suffix(&loc).unwrap_or(&msg).to_string();
let mut final_err = String::new();
if !path.is_empty() && path != "." {
final_err.push_str(&format!("for path '{}' ", path));
}
final_err.push_str(&format!(
"({msg_without_loc}) at line {line} column {column}"
));
final_err.push_str(&format!("\n{snippet}"));
// Build error message differently for debug vs release builds
let final_err = if cfg!(debug_assertions) {
// Debug mode: provide detailed context
let type_info = parse_type_mismatch(&msg_without_loc);
let context = extract_json_object_at_path(body, err.path(), line, column);
let mut err_msg = String::new();
if !path.is_empty() && path != "." {
err_msg.push_str(&format!("for path '{}'\n", path));
}
err_msg.push_str(&format!(
"({}) at line {} column {}\n\n",
type_info, line, column
));
err_msg.push_str(&context);
err_msg
} else {
// Release mode: minimal snippet to keep logs concise
let snippet = build_error_snippet(body, line, column, 20);
let mut err_msg = String::new();
if !path.is_empty() && path != "." {
err_msg.push_str(&format!("for path '{}' ", path));
}
err_msg.push_str(&format!(
"({}) at line {} column {}",
msg_without_loc, line, column
));
err_msg.push_str(&format!("\n{}", snippet));
err_msg
};
Err(anyhow::anyhow!(final_err))
}
}
}
/// Extract type mismatch information from a serde error message.
///
/// Parses error messages like "invalid type: null, expected a string" to extract
/// the expected and actual types for clearer error reporting.
///
/// Returns a formatted string like "(expected a string, got null)" or the original
/// message if parsing fails.
fn parse_type_mismatch(error_msg: &str) -> String {
// Try to parse "invalid type: X, expected Y" format
if let Some(invalid_start) = error_msg.find("invalid type: ") {
let after_prefix = &error_msg[invalid_start + "invalid type: ".len()..];
if let Some(comma_pos) = after_prefix.find(", expected ") {
let actual_type = &after_prefix[..comma_pos];
let expected_part = &after_prefix[comma_pos + ", expected ".len()..];
// Clean up expected part (remove " at line X column Y" if present)
let expected_type = expected_part
.split(" at line ")
.next()
.unwrap_or(expected_part)
.trim();
return format!("expected {}, got {}", expected_type, actual_type);
}
}
// Try to parse "expected X at line Y" format
if error_msg.starts_with("expected ")
&& let Some(expected_part) = error_msg.split(" at line ").next()
{
return expected_part.to_string();
}
// Fallback: return original message without location info
error_msg.to_string()
}
/// Extract and pretty-print the JSON object/array containing the parse error.
///
/// This function navigates to the error location using the serde path and extracts
/// the parent object or array to provide better context for debugging.
///
/// # Arguments
/// * `body` - The raw JSON string
/// * `path` - The serde path to the error (e.g., "data[0].faculty[0].displayName")
/// * `line` - Line number of the error (for fallback)
/// * `column` - Column number of the error (for fallback)
///
/// # Returns
/// A formatted string containing the JSON object with the error, or a fallback snippet
fn extract_json_object_at_path(
body: &str,
path: &serde_path_to_error::Path,
line: usize,
column: usize,
) -> String {
// Try to parse the entire JSON structure
let root_value: Value = match serde_json::from_str(body) {
Ok(v) => v,
Err(_) => {
// If we can't parse the JSON at all, fall back to line snippet
return build_error_snippet(body, line, column, 20);
}
};
// Navigate to the error location using the path
let path_str = path.to_string();
let segments = parse_path_segments(&path_str);
let (context_value, context_name) = navigate_to_context(&root_value, &segments);
// Pretty-print the context value with limited depth to avoid huge output
match serde_json::to_string_pretty(&context_value) {
Ok(pretty) => {
// Limit output to ~50 lines to prevent log spam
let lines: Vec<&str> = pretty.lines().collect();
let truncated = if lines.len() > 50 {
let mut result = lines[..47].join("\n");
result.push_str("\n ... (truncated, ");
result.push_str(&(lines.len() - 47).to_string());
result.push_str(" more lines)");
result
} else {
pretty
};
format!("{} at '{}':\n{}", context_name, path_str, truncated)
}
Err(_) => {
// Fallback to simple snippet if pretty-print fails
build_error_snippet(body, line, column, 20)
}
}
}
/// Parse a JSON path string into segments for navigation.
///
/// Converts paths like "data[0].faculty[1].displayName" into a sequence of
/// object keys and array indices.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
let mut segments = Vec::new();
let mut current = String::new();
let mut in_bracket = false;
for ch in path.chars() {
match ch {
'.' if !in_bracket => {
if !current.is_empty() {
segments.push(PathSegment::Key(current.clone()));
current.clear();
}
}
'[' => {
if !current.is_empty() {
segments.push(PathSegment::Key(current.clone()));
current.clear();
}
in_bracket = true;
}
']' => {
if in_bracket && !current.is_empty() {
if let Ok(index) = current.parse::<usize>() {
segments.push(PathSegment::Index(index));
}
current.clear();
}
in_bracket = false;
}
_ => current.push(ch),
}
}
if !current.is_empty() {
segments.push(PathSegment::Key(current));
}
segments
}
/// Represents a segment in a JSON path (either an object key or array index).
#[derive(Debug)]
enum PathSegment {
Key(String),
Index(usize),
}
/// Navigate through a JSON value using path segments and return the appropriate context.
///
/// This function walks the JSON structure and returns the parent object/array that
/// contains the error, providing meaningful context for debugging.
///
/// # Returns
/// A tuple of (context_value, description) where context_value is the JSON to display
/// and description is a human-readable name for what we're showing.
fn navigate_to_context<'a>(
mut current: &'a Value,
segments: &[PathSegment],
) -> (&'a Value, &'static str) {
// If path is empty or just root, return the whole value
if segments.is_empty() {
return (current, "Root object");
}
// Try to navigate to the parent of the error location
// We want to show the containing object/array, not just the failing field
let parent_depth = segments.len().saturating_sub(1);
for (i, segment) in segments.iter().enumerate() {
// Stop one level before the end to show the parent context
if i >= parent_depth {
break;
}
match segment {
PathSegment::Key(key) => {
if let Some(next) = current.get(key) {
current = next;
} else {
// Can't navigate further, return what we have
return (current, "Partial context (navigation stopped)");
}
}
PathSegment::Index(idx) => {
if let Some(next) = current.get(idx) {
current = next;
} else {
return (current, "Partial context (index out of bounds)");
}
}
}
}
(current, "Object containing error")
}
fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usize) -> String {
let target_line = body.lines().nth(line.saturating_sub(1)).unwrap_or("");
if target_line.is_empty() {
@@ -53,3 +277,139 @@ fn build_error_snippet(body: &str, line: usize, column: usize, context_len: usiz
format!("...{slice}...\n {indicator}")
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
#[test]
fn test_parse_type_mismatch_invalid_type() {
let msg = "invalid type: null, expected a string at line 45 column 29";
let result = parse_type_mismatch(msg);
assert_eq!(result, "expected a string, got null");
}
#[test]
fn test_parse_type_mismatch_expected() {
let msg = "expected value at line 1 column 1";
let result = parse_type_mismatch(msg);
assert_eq!(result, "expected value");
}
#[test]
fn test_parse_path_segments_simple() {
let segments = parse_path_segments("data.name");
assert_eq!(segments.len(), 2);
match &segments[0] {
PathSegment::Key(k) => assert_eq!(k, "data"),
_ => panic!("Expected Key segment"),
}
}
#[test]
fn test_parse_path_segments_with_array() {
let segments = parse_path_segments("data[0].faculty[1].displayName");
assert_eq!(segments.len(), 5);
match &segments[0] {
PathSegment::Key(k) => assert_eq!(k, "data"),
_ => panic!("Expected Key segment"),
}
match &segments[1] {
PathSegment::Index(i) => assert_eq!(*i, 0),
_ => panic!("Expected Index segment"),
}
}
#[test]
fn test_parse_json_with_context_null_value() {
#[derive(Debug, Deserialize)]
struct TestStruct {
name: String,
}
let json = r#"{"name": null}"#;
let result: Result<TestStruct> = parse_json_with_context(json);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
// Should contain path info
assert!(err_msg.contains("name"));
// In debug mode, should contain detailed context
if cfg!(debug_assertions) {
assert!(err_msg.contains("expected"));
}
}
#[test]
fn test_navigate_to_context() {
let json = r#"{"data": [{"faculty": [{"name": "John"}]}]}"#;
let value: Value = serde_json::from_str(json).unwrap();
let segments = parse_path_segments("data[0].faculty[0].name");
let (context, _) = navigate_to_context(&value, &segments);
// Should return the faculty[0] object (parent of 'name')
assert!(context.is_object());
assert!(context.get("name").is_some());
}
#[test]
fn test_realistic_banner_error() {
#[derive(Debug, Deserialize)]
struct Course {
#[allow(dead_code)]
#[serde(rename = "courseTitle")]
course_title: String,
faculty: Vec<Faculty>,
}
#[derive(Debug, Deserialize)]
struct Faculty {
#[serde(rename = "displayName")]
display_name: String,
#[allow(dead_code)]
email: String,
}
#[derive(Debug, Deserialize)]
struct SearchResult {
data: Vec<Course>,
}
// Simulate Banner API response with null faculty displayName
// This mimics the actual error from SPN subject scrape
let json = r#"{
"data": [
{
"courseTitle": "Spanish Conversation",
"faculty": [
{
"displayName": null,
"email": "instructor@utsa.edu"
}
]
}
]
}"#;
let result: Result<SearchResult> = parse_json_with_context(json);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
println!("\n=== Error output in debug mode ===\n{}\n", err_msg);
// Verify error contains key information
assert!(err_msg.contains("data[0].faculty[0].displayName"));
// In debug mode, should show detailed context
if cfg!(debug_assertions) {
// Should show type mismatch info
assert!(err_msg.contains("expected") && err_msg.contains("got"));
// Should show surrounding JSON context with the faculty object
assert!(err_msg.contains("email"));
}
}
}

View File

@@ -3,10 +3,13 @@
use http::Extensions;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
use tracing::{trace, warn};
use tracing::{debug, trace, warn};
pub struct TransparentMiddleware;
/// Threshold for logging slow requests at DEBUG level (in milliseconds)
const SLOW_REQUEST_THRESHOLD_MS: u128 = 1000;
#[async_trait::async_trait]
impl Middleware for TransparentMiddleware {
async fn handle(
@@ -15,33 +18,56 @@ impl Middleware for TransparentMiddleware {
extensions: &mut Extensions,
next: Next<'_>,
) -> std::result::Result<Response, reqwest_middleware::Error> {
trace!(
domain = req.url().domain(),
headers = ?req.headers(),
"{method} {path}",
method = req.method().to_string(),
path = req.url().path(),
);
let method = req.method().to_string();
let path = req.url().path().to_string();
let start = std::time::Instant::now();
let response_result = next.run(req, extensions).await;
let duration = start.elapsed();
match response_result {
Ok(response) => {
if response.status().is_success() {
trace!(
"{code} {reason} {path}",
code = response.status().as_u16(),
reason = response.status().canonical_reason().unwrap_or("??"),
path = response.url().path(),
);
let duration_ms = duration.as_millis();
if duration_ms >= SLOW_REQUEST_THRESHOLD_MS {
debug!(
method = method,
path = path,
status = response.status().as_u16(),
duration_ms = duration_ms,
"Request completed (slow)"
);
} else {
trace!(
method = method,
path = path,
status = response.status().as_u16(),
duration_ms = duration_ms,
"Request completed"
);
}
Ok(response)
} else {
let e = response.error_for_status_ref().unwrap_err();
warn!(error = ?e, "Request failed (server)");
warn!(
method = method,
path = path,
error = ?e,
status = response.status().as_u16(),
duration_ms = duration.as_millis(),
"Request failed"
);
Ok(response)
}
}
Err(error) => {
warn!(error = ?error, "Request failed (middleware)");
warn!(
method = method,
path = path,
error = ?error,
duration_ms = duration.as_millis(),
"Request failed"
);
Err(error)
}
}

View File

@@ -4,7 +4,7 @@ use crate::banner::rate_limiter::{RequestType, SharedRateLimiter};
use http::Extensions;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
use tracing::{debug, trace, warn};
use tracing::debug;
use url::Url;
/// Middleware that enforces rate limiting based on request URL patterns
@@ -18,6 +18,16 @@ impl RateLimitMiddleware {
Self { rate_limiter }
}
/// Returns a human-readable description of the rate limit for a request type
fn get_rate_limit_description(request_type: RequestType) -> &'static str {
match request_type {
RequestType::Session => "6 rpm (~10s interval)",
RequestType::Search => "30 rpm (~2s interval)",
RequestType::Metadata => "20 rpm (~3s interval)",
RequestType::Reset => "10 rpm (~6s interval)",
}
}
/// Determines the request type based on the URL path
fn get_request_type(url: &Url) -> RequestType {
let path = url.path();
@@ -53,49 +63,22 @@ impl Middleware for RateLimitMiddleware {
) -> std::result::Result<Response, reqwest_middleware::Error> {
let request_type = Self::get_request_type(req.url());
trace!(
url = %req.url(),
request_type = ?request_type,
"Rate limiting request"
);
// Wait for permission to make the request
let start = std::time::Instant::now();
self.rate_limiter.wait_for_permission(request_type).await;
let wait_duration = start.elapsed();
trace!(
url = %req.url(),
request_type = ?request_type,
"Rate limit permission granted, making request"
);
// Only log if rate limiting caused significant delay (>= 500ms)
if wait_duration.as_millis() >= 500 {
let limit_desc = Self::get_rate_limit_description(request_type);
debug!(
request_type = ?request_type,
wait_ms = wait_duration.as_millis(),
rate_limit = limit_desc,
"Rate limit caused delay"
);
}
// Make the actual request
let response_result = next.run(req, extensions).await;
match response_result {
Ok(response) => {
if response.status().is_success() {
trace!(
url = %response.url(),
status = response.status().as_u16(),
"Request completed successfully"
);
} else {
warn!(
url = %response.url(),
status = response.status().as_u16(),
"Request completed with error status"
);
}
Ok(response)
}
Err(error) => {
warn!(
url = ?error.url(),
error = ?error,
"Request failed"
);
Err(error)
}
}
next.run(req, extensions).await
}
}

View File

@@ -8,7 +8,6 @@ use governor::{
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, trace, warn};
/// Different types of Banner API requests with different rate limits
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -99,12 +98,8 @@ impl BannerRateLimiter {
RequestType::Reset => &self.reset_limiter,
};
trace!(request_type = ?request_type, "Waiting for rate limit permission");
// Wait until we can make the request
// Wait until we can make the request (logging handled by middleware)
limiter.until_ready().await;
trace!(request_type = ?request_type, "Rate limit permission granted");
}
}

View File

@@ -82,7 +82,6 @@ impl BannerSession {
/// Updates the last activity timestamp
pub fn touch(&mut self) {
trace!(id = self.unique_session_id, "Session was used");
self.last_activity = Some(Instant::now());
}
@@ -162,7 +161,7 @@ impl TermPool {
async fn release(&self, session: BannerSession) {
let id = session.unique_session_id.clone();
if session.is_expired() {
trace!(id = id, "Session is now expired, dropping.");
debug!(id = id, "Session expired, dropping");
// Wake up a waiter, as it might need to create a new session
// if this was the last one.
self.notifier.notify_one();
@@ -171,10 +170,8 @@ impl TermPool {
let mut queue = self.sessions.lock().await;
queue.push_back(session);
let queue_size = queue.len();
drop(queue); // Release lock before notifying
trace!(id = id, queue_size, "Session returned to pool");
self.notifier.notify_one();
}
}
@@ -204,22 +201,21 @@ impl SessionPool {
.or_insert_with(|| Arc::new(TermPool::new()))
.clone();
let start = Instant::now();
let mut waited_for_creation = false;
loop {
// Fast path: Try to get an existing, non-expired session.
{
let mut queue = term_pool.sessions.lock().await;
if let Some(session) = queue.pop_front() {
if !session.is_expired() {
trace!(id = session.unique_session_id, "Reusing session from pool");
return Ok(PooledSession {
session: Some(session),
pool: Arc::clone(&term_pool),
});
} else {
trace!(
id = session.unique_session_id,
"Popped an expired session, discarding."
);
debug!(id = session.unique_session_id, "Discarded expired session");
}
}
} // MutexGuard is dropped, lock is released.
@@ -229,7 +225,10 @@ impl SessionPool {
if *is_creating_guard {
// Another task is already creating a session. Release the lock and wait.
drop(is_creating_guard);
trace!("Another task is creating a session, waiting for notification...");
if !waited_for_creation {
trace!("Waiting for another task to create session");
waited_for_creation = true;
}
term_pool.notifier.notified().await;
// Loop back to the top to try the fast path again.
continue;
@@ -240,12 +239,11 @@ impl SessionPool {
drop(is_creating_guard);
// Race: wait for a session to be returned OR for the rate limiter to allow a new one.
trace!("Pool empty, racing notifier vs rate limiter...");
trace!("Pool empty, creating new session");
tokio::select! {
_ = term_pool.notifier.notified() => {
// A session was returned while we were waiting!
// We are no longer the creator. Reset the flag and loop to race for the new session.
trace!("Notified that a session was returned. Looping to retry.");
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
@@ -253,7 +251,6 @@ impl SessionPool {
}
_ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
// The rate limit has elapsed. It's our job to create the session.
trace!("Rate limiter ready. Proceeding to create a new session.");
let new_session_result = self.create_session(&term).await;
// After creation, we are no longer the creator. Reset the flag
@@ -265,7 +262,12 @@ impl SessionPool {
match new_session_result {
Ok(new_session) => {
debug!(id = new_session.unique_session_id, "Successfully created new session");
let elapsed = start.elapsed();
debug!(
id = new_session.unique_session_id,
elapsed_ms = elapsed.as_millis(),
"Created new session"
);
return Ok(PooledSession {
session: Some(new_session),
pool: term_pool,
@@ -298,8 +300,12 @@ impl SessionPool {
.get_all("Set-Cookie")
.iter()
.filter_map(|header_value| {
if let Ok(cookie) = Cookie::parse(header_value.to_str().unwrap()) {
Some((cookie.name().to_string(), cookie.value().to_string()))
if let Ok(cookie_str) = header_value.to_str() {
if let Ok(cookie) = Cookie::parse(cookie_str) {
Some((cookie.name().to_string(), cookie.value().to_string()))
} else {
None
}
} else {
None
}
@@ -310,16 +316,14 @@ impl SessionPool {
return Err(anyhow::anyhow!("Failed to get cookies"));
}
let jsessionid = cookies.get("JSESSIONID").unwrap();
let ssb_cookie = cookies.get("SSB_COOKIE").unwrap();
let jsessionid = cookies
.get("JSESSIONID")
.ok_or_else(|| anyhow::anyhow!("JSESSIONID cookie missing after validation"))?;
let ssb_cookie = cookies
.get("SSB_COOKIE")
.ok_or_else(|| anyhow::anyhow!("SSB_COOKIE cookie missing after validation"))?;
let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);
trace!(
jsessionid = jsessionid,
ssb_cookie = ssb_cookie,
"New session cookies acquired"
);
self.http
.get(format!("{}/selfServiceMenu/data", self.base_url))
.header("Cookie", &cookie_header)
@@ -435,8 +439,23 @@ impl SessionPool {
let redirect: RedirectResponse = response.json().await?;
let base_url_path = self.base_url.parse::<Url>().unwrap().path().to_string();
let non_overlap_redirect = redirect.fwd_url.strip_prefix(&base_url_path).unwrap();
let base_url_path = self
.base_url
.parse::<Url>()
.context("Failed to parse base URL")?
.path()
.to_string();
let non_overlap_redirect =
redirect
.fwd_url
.strip_prefix(&base_url_path)
.ok_or_else(|| {
anyhow::anyhow!(
"Redirect URL '{}' does not start with expected prefix '{}'",
redirect.fwd_url,
base_url_path
)
})?;
// Follow the redirect
let redirect_url = format!("{}{}", self.base_url, non_overlap_redirect);
@@ -454,7 +473,6 @@ impl SessionPool {
));
}
trace!(term = term, "successfully selected term");
Ok(())
}
}

135
src/data/batch.rs Normal file
View File

@@ -0,0 +1,135 @@
//! Batch database operations for improved performance.
use crate::banner::Course;
use crate::error::Result;
use sqlx::PgPool;
use std::time::Instant;
use tracing::info;
/// Batch upsert courses in a single database query.
///
/// This function performs a bulk INSERT...ON CONFLICT DO UPDATE for all courses
/// in a single round-trip to the database, significantly reducing overhead compared
/// to individual inserts.
///
/// # Performance
/// - Reduces N database round-trips to 1
/// - Typical usage: 50-200 courses per batch
/// - PostgreSQL parameter limit: 65,535 (we use ~10 per course)
///
/// # Arguments
/// * `courses` - Slice of Course structs from the Banner API
/// * `db_pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(())` on success
/// * `Err(_)` if the database operation fails
///
/// # Example
/// ```no_run
/// use banner::data::batch::batch_upsert_courses;
/// use banner::banner::Course;
/// use sqlx::PgPool;
///
/// async fn example(courses: &[Course], pool: &PgPool) -> anyhow::Result<()> {
/// batch_upsert_courses(courses, pool).await?;
/// Ok(())
/// }
/// ```
pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Result<()> {
// Early return for empty batches
if courses.is_empty() {
info!("No courses to upsert, skipping batch operation");
return Ok(());
}
let start = Instant::now();
let course_count = courses.len();
// Extract course fields into vectors for UNNEST
let crns: Vec<&str> = courses
.iter()
.map(|c| c.course_reference_number.as_str())
.collect();
let subjects: Vec<&str> = courses.iter().map(|c| c.subject.as_str()).collect();
let course_numbers: Vec<&str> = courses.iter().map(|c| c.course_number.as_str()).collect();
let titles: Vec<&str> = courses.iter().map(|c| c.course_title.as_str()).collect();
let term_codes: Vec<&str> = courses.iter().map(|c| c.term.as_str()).collect();
let enrollments: Vec<i32> = courses.iter().map(|c| c.enrollment).collect();
let max_enrollments: Vec<i32> = courses.iter().map(|c| c.maximum_enrollment).collect();
let wait_counts: Vec<i32> = courses.iter().map(|c| c.wait_count).collect();
let wait_capacities: Vec<i32> = courses.iter().map(|c| c.wait_capacity).collect();
// Perform batch upsert using UNNEST for efficient bulk insertion
let result = sqlx::query(
r#"
INSERT INTO courses (
crn, subject, course_number, title, term_code,
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
SELECT * FROM UNNEST(
$1::text[], $2::text[], $3::text[], $4::text[], $5::text[],
$6::int4[], $7::int4[], $8::int4[], $9::int4[],
array_fill(NOW()::timestamptz, ARRAY[$10])
) AS t(
crn, subject, course_number, title, term_code,
enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at
)
ON CONFLICT (crn, term_code)
DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&crns)
.bind(&subjects)
.bind(&course_numbers)
.bind(&titles)
.bind(&term_codes)
.bind(&enrollments)
.bind(&max_enrollments)
.bind(&wait_counts)
.bind(&wait_capacities)
.bind(course_count as i32)
.execute(db_pool)
.await
.map_err(|e| anyhow::anyhow!("Failed to batch upsert courses: {}", e))?;
let duration = start.elapsed();
info!(
courses_count = course_count,
rows_affected = result.rows_affected(),
duration_ms = duration.as_millis(),
"Batch upserted courses"
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_batch_returns_ok() {
// This is a basic compile-time test
// Runtime tests would require sqlx::test macro and a test database
let courses: Vec<Course> = vec![];
assert_eq!(courses.len(), 0);
}
}

View File

@@ -1,3 +1,4 @@
//! Database models and schema.
pub mod batch;
pub mod models;

View File

@@ -72,4 +72,8 @@ pub struct ScrapeJob {
pub execute_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub locked_at: Option<DateTime<Utc>>,
/// Number of retry attempts for this job (non-negative, enforced by CHECK constraint)
pub retry_count: i32,
/// Maximum number of retry attempts allowed (non-negative, enforced by CHECK constraint)
pub max_retries: i32,
}

View File

@@ -11,4 +11,5 @@ pub mod scraper;
pub mod services;
pub mod signals;
pub mod state;
pub mod utils;
pub mod web;

View File

@@ -7,10 +7,13 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber};
/// Configure and initialize logging for the application
pub fn setup_logging(config: &Config, tracing_format: TracingFormat) {
// Configure logging based on config
// Note: Even when base_level is trace or debug, we suppress trace logs from noisy
// infrastructure modules to keep output readable. These modules use debug for important
// events and trace only for very detailed debugging.
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
let base_level = &config.log_level;
EnvFilter::new(format!(
"warn,banner={},banner::rate_limiter=warn,banner::session=warn,banner::rate_limit_middleware=warn",
"warn,banner={},banner::rate_limiter=warn,banner::session=debug,banner::rate_limit_middleware=warn,banner::middleware=debug",
base_level
))
});

View File

@@ -1,10 +1,11 @@
use super::Job;
use crate::banner::{BannerApi, Course, SearchQuery, Term};
use crate::banner::{BannerApi, SearchQuery, Term};
use crate::data::batch::batch_upsert_courses;
use crate::data::models::TargetType;
use crate::error::Result;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tracing::{debug, info, trace};
use tracing::{debug, info};
/// Job implementation for scraping subject data
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -24,9 +25,9 @@ impl Job for SubjectJob {
TargetType::Subject
}
#[tracing::instrument(skip(self, banner_api, db_pool), fields(subject = %self.subject))]
async fn process(&self, banner_api: &BannerApi, db_pool: &PgPool) -> Result<()> {
let subject_code = &self.subject;
debug!(subject = subject_code, "Processing subject job");
// Get the current term
let term = Term::get_current().inner().to_string();
@@ -42,9 +43,7 @@ impl Job for SubjectJob {
count = courses_from_api.len(),
"Found courses"
);
for course in courses_from_api {
self.upsert_course(&course, db_pool).await?;
}
batch_upsert_courses(&courses_from_api, db_pool).await?;
}
debug!(subject = subject_code, "Subject job completed");
@@ -55,39 +54,3 @@ impl Job for SubjectJob {
format!("Scrape subject: {}", self.subject)
}
}
impl SubjectJob {
async fn upsert_course(&self, course: &Course, db_pool: &PgPool) -> Result<()> {
sqlx::query(
r#"
INSERT INTO courses (crn, subject, course_number, title, term_code, enrollment, max_enrollment, wait_count, wait_capacity, last_scraped_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (crn, term_code) DO UPDATE SET
subject = EXCLUDED.subject,
course_number = EXCLUDED.course_number,
title = EXCLUDED.title,
enrollment = EXCLUDED.enrollment,
max_enrollment = EXCLUDED.max_enrollment,
wait_count = EXCLUDED.wait_count,
wait_capacity = EXCLUDED.wait_capacity,
last_scraped_at = EXCLUDED.last_scraped_at
"#,
)
.bind(&course.course_reference_number)
.bind(&course.subject)
.bind(&course.course_number)
.bind(&course.course_title)
.bind(&course.term)
.bind(course.enrollment)
.bind(course.maximum_enrollment)
.bind(course.wait_count)
.bind(course.wait_capacity)
.bind(chrono::Utc::now())
.execute(db_pool)
.await
.map(|result| {
trace!(subject = course.subject, crn = course.course_reference_number, result = ?result, "Course upserted");
})
.map_err(|e| anyhow::anyhow!("Failed to upsert course: {e}"))
}
}

View File

@@ -3,14 +3,15 @@ pub mod scheduler;
pub mod worker;
use crate::banner::BannerApi;
use crate::services::Service;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::info;
use tracing::{info, warn};
use self::scheduler::Scheduler;
use self::worker::Worker;
use crate::services::Service;
/// The main service that will be managed by the application's `ServiceManager`.
///
@@ -21,6 +22,7 @@ pub struct ScraperService {
banner_api: Arc<BannerApi>,
scheduler_handle: Option<JoinHandle<()>>,
worker_handles: Vec<JoinHandle<()>>,
shutdown_tx: Option<broadcast::Sender<()>>,
}
impl ScraperService {
@@ -31,6 +33,7 @@ impl ScraperService {
banner_api,
scheduler_handle: None,
worker_handles: Vec::new(),
shutdown_tx: None,
}
}
@@ -38,9 +41,14 @@ impl ScraperService {
pub fn start(&mut self) {
info!("ScraperService starting");
// Create shutdown channel
let (shutdown_tx, _) = broadcast::channel(1);
self.shutdown_tx = Some(shutdown_tx.clone());
let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone());
let shutdown_rx = shutdown_tx.subscribe();
let scheduler_handle = tokio::spawn(async move {
scheduler.run().await;
scheduler.run(shutdown_rx).await;
});
self.scheduler_handle = Some(scheduler_handle);
info!("Scheduler task spawned");
@@ -48,8 +56,9 @@ impl ScraperService {
let worker_count = 4; // This could be configurable
for i in 0..worker_count {
let worker = Worker::new(i, self.db_pool.clone(), self.banner_api.clone());
let shutdown_rx = shutdown_tx.subscribe();
let worker_handle = tokio::spawn(async move {
worker.run().await;
worker.run(shutdown_rx).await;
});
self.worker_handles.push(worker_handle);
}
@@ -58,18 +67,6 @@ impl ScraperService {
"Spawned worker tasks"
);
}
/// Signals all child tasks to gracefully shut down.
pub async fn shutdown(&mut self) {
info!("Shutting down scraper service");
if let Some(handle) = self.scheduler_handle.take() {
handle.abort();
}
for handle in self.worker_handles.drain(..) {
handle.abort();
}
info!("Scraper service shutdown");
}
}
#[async_trait::async_trait]
@@ -85,7 +82,35 @@ impl Service for ScraperService {
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
self.shutdown().await;
info!("Shutting down scraper service");
// Send shutdown signal to all tasks
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
} else {
warn!("No shutdown channel found for scraper service");
return Err(anyhow::anyhow!("No shutdown channel available"));
}
// Collect all handles
let mut all_handles = Vec::new();
if let Some(handle) = self.scheduler_handle.take() {
all_handles.push(handle);
}
all_handles.append(&mut self.worker_handles);
// Wait for all tasks to complete (no internal timeout - let ServiceManager handle it)
let results = futures::future::join_all(all_handles).await;
let failed = results.iter().filter(|r| r.is_err()).count();
if failed > 0 {
warn!(
failed_count = failed,
"Some scraper tasks panicked during shutdown"
);
return Err(anyhow::anyhow!("{} task(s) panicked", failed));
}
info!("All scraper tasks shutdown gracefully");
Ok(())
}
}

View File

@@ -6,8 +6,10 @@ use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time;
use tracing::{debug, error, info, trace};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler {
@@ -23,31 +25,92 @@ impl Scheduler {
}
}
/// Runs the scheduler's main loop.
pub async fn run(&self) {
/// Runs the scheduler's main loop with graceful shutdown support.
///
/// The scheduler wakes up every 60 seconds to analyze data and enqueue jobs.
/// When a shutdown signal is received:
/// 1. Any in-progress scheduling work is gracefully cancelled via CancellationToken
/// 2. The scheduler waits up to 5 seconds for work to complete
/// 3. If timeout occurs, the task is abandoned (it will be aborted when dropped)
///
/// This ensures that shutdown is responsive even if scheduling work is blocked.
pub async fn run(&self, mut shutdown_rx: broadcast::Receiver<()>) {
info!("Scheduler service started");
let mut interval = time::interval(Duration::from_secs(60)); // Runs every minute
let work_interval = Duration::from_secs(60);
let mut next_run = time::Instant::now();
let mut current_work: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = None;
loop {
interval.tick().await;
// Scheduler analyzing data...
if let Err(e) = self.schedule_jobs().await {
error!(error = ?e, "Failed to schedule jobs");
tokio::select! {
_ = time::sleep_until(next_run) => {
let cancel_token = CancellationToken::new();
// Spawn work in separate task to allow graceful cancellation during shutdown.
// Without this, shutdown would have to wait for the full scheduling cycle.
let work_handle = tokio::spawn({
let db_pool = self.db_pool.clone();
let banner_api = self.banner_api.clone();
let cancel_token = cancel_token.clone();
async move {
tokio::select! {
result = Self::schedule_jobs_impl(&db_pool, &banner_api) => {
if let Err(e) = result {
error!(error = ?e, "Failed to schedule jobs");
}
}
_ = cancel_token.cancelled() => {
debug!("Scheduling work cancelled gracefully");
}
}
}
});
current_work = Some((work_handle, cancel_token));
next_run = time::Instant::now() + work_interval;
}
_ = shutdown_rx.recv() => {
info!("Scheduler received shutdown signal");
if let Some((handle, cancel_token)) = current_work.take() {
cancel_token.cancel();
// Wait briefly for graceful completion
if tokio::time::timeout(Duration::from_secs(5), handle).await.is_err() {
warn!("Scheduling work did not complete within 5s, abandoning");
} else {
debug!("Scheduling work completed gracefully");
}
}
info!("Scheduler exiting gracefully");
break;
}
}
}
}
/// The core logic for deciding what jobs to create.
async fn schedule_jobs(&self) -> Result<()> {
/// Core scheduling logic that analyzes data and creates scrape jobs.
///
/// Strategy:
/// 1. Fetch all subjects for the current term from Banner API
/// 2. Query existing jobs in a single batch query
/// 3. Create jobs only for subjects that don't have pending jobs
///
/// This is a static method (not &self) to allow it to be called from spawned tasks.
#[tracing::instrument(skip_all, fields(term))]
async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
// For now, we will implement a simple baseline scheduling strategy:
// 1. Get a list of all subjects from the Banner API.
// 2. Query existing jobs for all subjects in a single query.
// 3. Create new jobs only for subjects that don't have existing jobs.
let term = Term::get_current().inner().to_string();
tracing::Span::current().record("term", term.as_str());
debug!(term = term, "Enqueuing subject jobs");
let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?;
let subjects = banner_api.get_subjects("", &term, 1, 500).await?;
debug!(
subject_count = subjects.len(),
"Retrieved subjects from API"
@@ -61,12 +124,12 @@ impl Scheduler {
// Query existing jobs for all subjects in a single query
let existing_jobs: Vec<(serde_json::Value,)> = sqlx::query_as(
"SELECT target_payload FROM scrape_jobs
"SELECT target_payload FROM scrape_jobs
WHERE target_type = $1 AND target_payload = ANY($2) AND locked_at IS NULL",
)
.bind(TargetType::Subject)
.bind(&subject_payloads)
.fetch_all(&self.db_pool)
.fetch_all(db_pool)
.await?;
// Convert to a HashSet for efficient lookup
@@ -76,6 +139,7 @@ impl Scheduler {
.collect();
// Filter out subjects that already have jobs and prepare new jobs
let mut skipped_count = 0;
let new_jobs: Vec<_> = subjects
.into_iter()
.filter_map(|subject| {
@@ -84,7 +148,7 @@ impl Scheduler {
let payload_str = payload.to_string();
if existing_payloads.contains(&payload_str) {
trace!(subject = subject.code, "Job already exists, skipping");
skipped_count += 1;
None
} else {
Some((payload, subject.code))
@@ -92,10 +156,14 @@ impl Scheduler {
})
.collect();
if skipped_count > 0 {
debug!(count = skipped_count, "Skipped subjects with existing jobs");
}
// Insert all new jobs in a single batch
if !new_jobs.is_empty() {
let now = chrono::Utc::now();
let mut tx = self.db_pool.begin().await?;
let mut tx = db_pool.begin().await?;
for (payload, subject_code) in new_jobs {
sqlx::query(

View File

@@ -5,8 +5,9 @@ use crate::scraper::jobs::{JobError, JobType};
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time;
use tracing::{debug, error, info, trace, warn};
use tracing::{Instrument, debug, error, info, trace, warn};
/// A single worker instance.
///
@@ -28,79 +29,52 @@ impl Worker {
}
/// Runs the worker's main loop.
pub async fn run(&self) {
info!(worker_id = self.id, "Worker started.");
loop {
match self.fetch_and_lock_job().await {
Ok(Some(job)) => {
let job_id = job.id;
debug!(worker_id = self.id, job_id = job.id, "Processing job");
match self.process_job(job).await {
Ok(()) => {
debug!(worker_id = self.id, job_id, "Job completed");
// If successful, delete the job.
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?delete_err,
"Failed to delete job"
);
}
}
Err(JobError::Recoverable(e)) => {
// Check if the error is due to an invalid session
if let Some(BannerApiError::InvalidSession(_)) =
e.downcast_ref::<BannerApiError>()
{
warn!(
worker_id = self.id,
job_id, "Invalid session detected. Forcing session refresh."
);
} else {
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
}
pub async fn run(&self, mut shutdown_rx: broadcast::Receiver<()>) {
info!(worker_id = self.id, "Worker started");
// Unlock the job so it can be retried
if let Err(unlock_err) = self.unlock_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?unlock_err,
"Failed to unlock job"
);
}
loop {
// Fetch and lock a job, racing against shutdown signal
let job = tokio::select! {
_ = shutdown_rx.recv() => {
info!(worker_id = self.id, "Worker received shutdown signal, exiting gracefully");
break;
}
result = self.fetch_and_lock_job() => {
match result {
Ok(Some(job)) => job,
Ok(None) => {
trace!(worker_id = self.id, "No jobs available, waiting");
time::sleep(Duration::from_secs(5)).await;
continue;
}
Err(JobError::Unrecoverable(e)) => {
error!(
worker_id = self.id,
job_id,
error = ?e,
"Job corrupted, deleting"
);
// Parse errors are unrecoverable - delete the job
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?delete_err,
"Failed to delete corrupted job"
);
}
Err(e) => {
warn!(worker_id = self.id, error = ?e, "Failed to fetch job, waiting");
time::sleep(Duration::from_secs(10)).await;
continue;
}
}
}
Ok(None) => {
// No job found, wait for a bit before polling again.
trace!(worker_id = self.id, "No jobs available, waiting");
time::sleep(Duration::from_secs(5)).await;
};
let job_id = job.id;
let retry_count = job.retry_count;
let max_retries = job.max_retries;
let start = std::time::Instant::now();
// Process the job, racing against shutdown signal
let process_result = tokio::select! {
_ = shutdown_rx.recv() => {
self.handle_shutdown_during_processing(job_id).await;
break;
}
Err(e) => {
warn!(worker_id = self.id, error = ?e, "Failed to fetch job");
// Wait before retrying to avoid spamming errors.
time::sleep(Duration::from_secs(10)).await;
}
}
result = self.process_job(job) => result
};
let duration = start.elapsed();
// Handle the job processing result
self.handle_job_result(job_id, retry_count, max_retries, process_result, duration)
.await;
}
}
@@ -137,20 +111,31 @@ impl Worker {
// Get the job implementation
let job_impl = job_type.boxed();
debug!(
worker_id = self.id,
// Create span with job context
let span = tracing::debug_span!(
"process_job",
job_id = job.id,
description = job_impl.description(),
"Processing job"
job_type = job_impl.description()
);
// Process the job - API errors are recoverable
job_impl
.process(&self.banner_api, &self.db_pool)
.await
.map_err(JobError::Recoverable)?;
async move {
debug!(
worker_id = self.id,
job_id = job.id,
description = job_impl.description(),
"Processing job"
);
Ok(())
// Process the job - API errors are recoverable
job_impl
.process(&self.banner_api, &self.db_pool)
.await
.map_err(JobError::Recoverable)?;
Ok(())
}
.instrument(span)
.await
}
async fn delete_job(&self, job_id: i32) -> Result<()> {
@@ -166,7 +151,150 @@ impl Worker {
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job unlocked for retry");
Ok(())
}
async fn unlock_and_increment_retry(&self, job_id: i32, max_retries: i32) -> Result<bool> {
let result = sqlx::query_scalar::<_, Option<i32>>(
"UPDATE scrape_jobs
SET locked_at = NULL, retry_count = retry_count + 1
WHERE id = $1
RETURNING CASE WHEN retry_count + 1 < $2 THEN retry_count + 1 ELSE NULL END",
)
.bind(job_id)
.bind(max_retries)
.fetch_one(&self.db_pool)
.await?;
Ok(result.is_some())
}
/// Handle shutdown signal received during job processing
async fn handle_shutdown_during_processing(&self, job_id: i32) {
info!(
worker_id = self.id,
job_id, "Shutdown received during job processing"
);
if let Err(e) = self.unlock_job(job_id).await {
warn!(
worker_id = self.id,
job_id,
error = ?e,
"Failed to unlock job during shutdown"
);
} else {
debug!(worker_id = self.id, job_id, "Job unlocked during shutdown");
}
info!(worker_id = self.id, "Worker exiting gracefully");
}
/// Handle the result of job processing
async fn handle_job_result(
&self,
job_id: i32,
retry_count: i32,
max_retries: i32,
result: Result<(), JobError>,
duration: std::time::Duration,
) {
match result {
Ok(()) => {
debug!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
"Job completed successfully"
);
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
}
}
Err(JobError::Recoverable(e)) => {
self.handle_recoverable_error(job_id, retry_count, max_retries, e, duration)
.await;
}
Err(JobError::Unrecoverable(e)) => {
error!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
error = ?e,
"Job corrupted, deleting"
);
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
}
}
}
}
/// Handle recoverable errors by logging appropriately and unlocking the job
async fn handle_recoverable_error(
&self,
job_id: i32,
retry_count: i32,
max_retries: i32,
e: anyhow::Error,
duration: std::time::Duration,
) {
let next_attempt = retry_count.saturating_add(1);
let remaining_retries = max_retries.saturating_sub(next_attempt);
// Log the error appropriately based on type
if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() {
warn!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
retry_attempt = next_attempt,
max_retries = max_retries,
remaining_retries = remaining_retries,
"Invalid session detected, will retry"
);
} else {
error!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
retry_attempt = next_attempt,
max_retries = max_retries,
remaining_retries = remaining_retries,
error = ?e,
"Failed to process job, will retry"
);
}
// Atomically unlock and increment retry count, checking if retry is allowed
match self.unlock_and_increment_retry(job_id, max_retries).await {
Ok(can_retry) if can_retry => {
info!(
worker_id = self.id,
job_id,
retry_attempt = next_attempt,
remaining_retries = remaining_retries,
"Job unlocked for retry"
);
}
Ok(_) => {
// Max retries exceeded (detected atomically)
error!(
worker_id = self.id,
job_id,
duration_ms = duration.as_millis(),
retry_count = next_attempt,
max_retries = max_retries,
error = ?e,
"Job failed permanently (max retries exceeded), deleting"
);
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete failed job");
}
}
Err(e) => {
error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock and increment retry count");
}
}
}
}

View File

@@ -7,12 +7,16 @@ use serenity::Client;
use serenity::all::{ActivityData, ClientBuilder, GatewayIntents};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, warn};
use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
/// Discord bot service implementation
pub struct BotService {
client: Client,
shard_manager: Arc<serenity::gateway::ShardManager>,
status_task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
status_shutdown_tx: Option<broadcast::Sender<()>>,
}
impl BotService {
@@ -20,6 +24,8 @@ impl BotService {
pub async fn create_client(
config: &Config,
app_state: AppState,
status_task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
status_shutdown_rx: broadcast::Receiver<()>,
) -> Result<Client, anyhow::Error> {
let intents = GatewayIntents::non_privileged();
let bot_target_guild = config.bot_target_guild;
@@ -74,6 +80,7 @@ impl BotService {
})
.setup(move |ctx, _ready, framework| {
let app_state = app_state.clone();
let status_task_handle = status_task_handle.clone();
Box::pin(async move {
poise::builtins::register_in_guild(
ctx,
@@ -83,8 +90,13 @@ impl BotService {
.await?;
poise::builtins::register_globally(ctx, &framework.options().commands).await?;
// Start status update task
Self::start_status_update_task(ctx.clone(), app_state.clone()).await;
// Start status update task with shutdown support
let handle = Self::start_status_update_task(
ctx.clone(),
app_state.clone(),
status_shutdown_rx,
);
*status_task_handle.lock().await = Some(handle);
Ok(Data { app_state })
})
@@ -96,8 +108,12 @@ impl BotService {
.await?)
}
/// Start the status update task for the Discord bot
async fn start_status_update_task(ctx: serenity::client::Context, app_state: AppState) {
/// Start the status update task for the Discord bot with graceful shutdown support
fn start_status_update_task(
ctx: serenity::client::Context,
app_state: AppState,
mut shutdown_rx: broadcast::Receiver<()>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let max_interval = Duration::from_secs(300); // 5 minutes
let base_interval = Duration::from_secs(30);
@@ -106,59 +122,72 @@ impl BotService {
// This runs once immediately on startup, then with adaptive intervals
loop {
interval.tick().await;
// Get the course count, update the activity if it has changed/hasn't been set this session
let course_count = app_state.get_course_count().await.unwrap();
if previous_course_count.is_none() || previous_course_count != Some(course_count) {
ctx.set_activity(Some(ActivityData::playing(format!(
"Querying {:} classes",
course_count.to_formatted_string(&Locale::en)
))));
}
// Increase or reset the interval
interval = tokio::time::interval(
// Avoid logging the first 'change'
if course_count != previous_course_count.unwrap_or(0) {
if previous_course_count.is_some() {
debug!(
new_course_count = course_count,
last_interval = interval.period().as_secs(),
"Course count changed, resetting interval"
);
tokio::select! {
_ = interval.tick() => {
// Get the course count, update the activity if it has changed/hasn't been set this session
let course_count = app_state.get_course_count().await.unwrap();
if previous_course_count.is_none() || previous_course_count != Some(course_count) {
ctx.set_activity(Some(ActivityData::playing(format!(
"Querying {:} classes",
course_count.to_formatted_string(&Locale::en)
))));
}
// Record the new course count
previous_course_count = Some(course_count);
// Increase or reset the interval
interval = tokio::time::interval(
// Avoid logging the first 'change'
if course_count != previous_course_count.unwrap_or(0) {
if previous_course_count.is_some() {
debug!(
new_course_count = course_count,
last_interval = interval.period().as_secs(),
"Course count changed, resetting interval"
);
}
// Reset to base interval
base_interval
} else {
// Increase interval by 10% (up to maximum)
let new_interval = interval.period().mul_f32(1.1).min(max_interval);
debug!(
current_course_count = course_count,
last_interval = interval.period().as_secs(),
new_interval = new_interval.as_secs(),
"Course count unchanged, increasing interval"
// Record the new course count
previous_course_count = Some(course_count);
// Reset to base interval
base_interval
} else {
// Increase interval by 10% (up to maximum)
let new_interval = interval.period().mul_f32(1.1).min(max_interval);
debug!(
current_course_count = course_count,
last_interval = interval.period().as_secs(),
new_interval = new_interval.as_secs(),
"Course count unchanged, increasing interval"
);
new_interval
},
);
new_interval
},
);
// Reset the interval, otherwise it will tick again immediately
interval.reset();
// Reset the interval, otherwise it will tick again immediately
interval.reset();
}
_ = shutdown_rx.recv() => {
info!("Status update task received shutdown signal");
break;
}
}
}
});
})
}
pub fn new(client: Client) -> Self {
pub fn new(
client: Client,
status_task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
status_shutdown_tx: broadcast::Sender<()>,
) -> Self {
let shard_manager = client.shard_manager.clone();
Self {
client,
shard_manager,
status_task_handle,
status_shutdown_tx: Some(status_shutdown_tx),
}
}
}
@@ -183,6 +212,28 @@ impl Service for BotService {
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
// Signal status update task to stop
if let Some(status_shutdown_tx) = self.status_shutdown_tx.take() {
let _ = status_shutdown_tx.send(());
}
// Wait for status update task to complete (with timeout)
let handle = self.status_task_handle.lock().await.take();
if let Some(handle) = handle {
match tokio::time::timeout(Duration::from_secs(2), handle).await {
Ok(Ok(())) => {
debug!("Status update task completed gracefully");
}
Ok(Err(e)) => {
warn!(error = ?e, "Status update task panicked");
}
Err(_) => {
warn!("Status update task did not complete within 2s timeout");
}
}
}
// Shutdown Discord shards
self.shard_manager.shutdown_all().await;
Ok(())
}

View File

@@ -1,15 +1,16 @@
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, info, trace, warn};
use crate::services::{Service, ServiceResult, run_service};
/// Manages multiple services and their lifecycle
pub struct ServiceManager {
registered_services: HashMap<String, Box<dyn Service>>,
running_services: HashMap<String, JoinHandle<ServiceResult>>,
service_handles: HashMap<String, tokio::task::AbortHandle>,
completion_rx: Option<mpsc::UnboundedReceiver<(String, ServiceResult)>>,
completion_tx: mpsc::UnboundedSender<(String, ServiceResult)>,
shutdown_tx: broadcast::Sender<()>,
}
@@ -22,9 +23,13 @@ impl Default for ServiceManager {
impl ServiceManager {
pub fn new() -> Self {
let (shutdown_tx, _) = broadcast::channel(1);
let (completion_tx, completion_rx) = mpsc::unbounded_channel();
Self {
registered_services: HashMap::new(),
running_services: HashMap::new(),
service_handles: HashMap::new(),
completion_rx: Some(completion_rx),
completion_tx,
shutdown_tx,
}
}
@@ -46,9 +51,20 @@ impl ServiceManager {
for (name, service) in self.registered_services.drain() {
let shutdown_rx = self.shutdown_tx.subscribe();
let handle = tokio::spawn(run_service(service, shutdown_rx));
let completion_tx = self.completion_tx.clone();
let name_clone = name.clone();
// Spawn service task
let handle = tokio::spawn(async move {
let result = run_service(service, shutdown_rx).await;
// Send completion notification
let _ = completion_tx.send((name_clone, result));
});
// Store abort handle for shutdown control
self.service_handles
.insert(name.clone(), handle.abort_handle());
debug!(service = name, id = ?handle.id(), "service spawned");
self.running_services.insert(name, handle);
}
info!(
@@ -62,7 +78,7 @@ impl ServiceManager {
/// Run all services until one completes or fails
/// Returns the first service that completes and its result
pub async fn run(&mut self) -> (String, ServiceResult) {
if self.running_services.is_empty() {
if self.service_handles.is_empty() {
return (
"none".to_string(),
ServiceResult::Error(anyhow::anyhow!("No services to run")),
@@ -71,99 +87,134 @@ impl ServiceManager {
info!(
"servicemanager running {} services",
self.running_services.len()
self.service_handles.len()
);
// Wait for any service to complete
loop {
let mut completed_services = Vec::new();
// Wait for any service to complete via the channel
let completion_rx = self
.completion_rx
.as_mut()
.expect("completion_rx should be available");
for (name, handle) in &mut self.running_services {
if handle.is_finished() {
completed_services.push(name.clone());
}
}
if let Some(completed_name) = completed_services.first() {
let handle = self.running_services.remove(completed_name).unwrap();
match handle.await {
Ok(result) => {
return (completed_name.clone(), result);
}
Err(e) => {
error!(service = completed_name, "service task panicked: {e}");
return (
completed_name.clone(),
ServiceResult::Error(anyhow::anyhow!("Task panic: {e}")),
);
}
}
}
// Small delay to prevent busy-waiting
tokio::time::sleep(Duration::from_millis(10)).await;
}
completion_rx
.recv()
.await
.map(|(name, result)| {
self.service_handles.remove(&name);
(name, result)
})
.unwrap_or_else(|| {
(
"channel_closed".to_string(),
ServiceResult::Error(anyhow::anyhow!("Completion channel closed")),
)
})
}
/// Shutdown all services gracefully with a timeout.
///
/// If any service fails to shutdown, it will return an error containing the names of the services that failed to shutdown.
/// If all services shutdown successfully, the function will return the duration elapsed.
/// All services receive the shutdown signal simultaneously and shut down in parallel.
/// Each service gets the full timeout duration (they don't share/consume from a budget).
/// If any service fails to shutdown within the timeout, it will be aborted.
///
/// Returns the elapsed time if all succeed, or a list of failed service names.
pub async fn shutdown(&mut self, timeout: Duration) -> Result<Duration, Vec<String>> {
let service_count = self.running_services.len();
let service_names: Vec<_> = self.running_services.keys().cloned().collect();
let service_count = self.service_handles.len();
let service_names: Vec<_> = self.service_handles.keys().cloned().collect();
info!(
service_count,
services = ?service_names,
timeout = format!("{:.2?}", timeout),
"shutting down {} services with {:?} timeout",
"shutting down {} services in parallel with {:?} timeout each",
service_count,
timeout
);
// Send shutdown signal to all services
if service_count == 0 {
return Ok(Duration::ZERO);
}
// Send shutdown signal to all services simultaneously
let _ = self.shutdown_tx.send(());
// Wait for all services to complete
let start_time = std::time::Instant::now();
let mut pending_services = Vec::new();
for (name, handle) in self.running_services.drain() {
match tokio::time::timeout(timeout, handle).await {
Ok(Ok(_)) => {
trace!(service = name, "service shutdown completed");
// Collect results from all services with timeout
let completion_rx = self
.completion_rx
.as_mut()
.expect("completion_rx should be available");
// Collect all completion results with a single timeout
let collect_future = async {
let mut collected: Vec<Option<(String, ServiceResult)>> = Vec::new();
for _ in 0..service_count {
if let Some(result) = completion_rx.recv().await {
collected.push(Some(result));
} else {
collected.push(None);
}
Ok(Err(e)) => {
warn!(service = name, error = ?e, "service shutdown failed");
pending_services.push(name);
}
Err(_) => {
warn!(service = name, "service shutdown timed out");
pending_services.push(name);
}
collected
};
let results = match tokio::time::timeout(timeout, collect_future).await {
Ok(results) => results,
Err(_) => {
// Timeout exceeded - abort all remaining services
warn!(
timeout = format!("{:.2?}", timeout),
"shutdown timeout exceeded - aborting all remaining services"
);
let failed: Vec<String> = self.service_handles.keys().cloned().collect();
for handle in self.service_handles.values() {
handle.abort();
}
self.service_handles.clear();
return Err(failed);
}
};
// Process results and identify failures
let mut failed_services = Vec::new();
for (name, service_result) in results.into_iter().flatten() {
self.service_handles.remove(&name);
if matches!(service_result, ServiceResult::GracefulShutdown) {
trace!(service = name, "service shutdown completed");
} else {
warn!(
service = name,
result = ?service_result,
"service shutdown with non-graceful result"
);
failed_services.push(name);
}
}
let elapsed = start_time.elapsed();
if pending_services.is_empty() {
if failed_services.is_empty() {
info!(
service_count,
elapsed = format!("{:.2?}", elapsed),
"services shutdown completed: {}",
"all services shutdown successfully: {}",
service_names.join(", ")
);
Ok(elapsed)
} else {
warn!(
pending_count = pending_services.len(),
pending_services = ?pending_services,
failed_count = failed_services.len(),
failed_services = ?failed_services,
elapsed = format!("{:.2?}", elapsed),
"services shutdown completed with {} pending: {}",
pending_services.len(),
pending_services.join(", ")
"{} service(s) failed to shutdown gracefully: {}",
failed_services.len(),
failed_services.join(", ")
);
Err(pending_services)
Err(failed_services)
}
}
}

View File

@@ -23,7 +23,11 @@ pub trait Service: Send + Sync {
/// Gracefully shutdown the service
///
/// An 'Ok' result does not mean the service has completed shutdown, it merely means that the service shutdown was initiated.
/// Implementations should initiate shutdown and MAY wait for completion.
/// Services are expected to respond to this call and begin cleanup promptly.
/// When managed by ServiceManager, the configured timeout (default 8s) applies to
/// ALL services combined, not per-service. Services should complete shutdown as
/// quickly as possible to avoid timeout.
async fn shutdown(&mut self) -> Result<(), anyhow::Error>;
}

View File

@@ -3,7 +3,7 @@ use crate::web::{BannerState, create_router};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{info, warn, trace};
use tracing::{info, trace, warn};
/// Web server service implementation
pub struct WebService {
@@ -33,16 +33,12 @@ impl Service for WebService {
let app = create_router(self.banner_state.clone());
let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
info!(
service = "web",
link = format!("http://localhost:{}", addr.port()),
"starting web server",
);
let listener = TcpListener::bind(addr).await?;
info!(
service = "web",
address = %addr,
link = format!("http://localhost:{}", addr.port()),
"web server listening"
);
@@ -61,13 +57,16 @@ impl Service for WebService {
})
.await?;
trace!(service = "web", "graceful shutdown completed");
info!(service = "web", "web server stopped");
Ok(())
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
trace!(service = "web", "sent shutdown signal to axum");
} else {
warn!(
service = "web",

1
src/utils/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod shutdown;

32
src/utils/shutdown.rs Normal file
View File

@@ -0,0 +1,32 @@
use tokio::task::JoinHandle;
use tracing::warn;
/// Helper for joining multiple task handles with proper error handling.
///
/// This function waits for all tasks to complete and reports any that panicked.
/// Returns an error if any task panicked, otherwise returns Ok.
pub async fn join_tasks(handles: Vec<JoinHandle<()>>) -> Result<(), anyhow::Error> {
let results = futures::future::join_all(handles).await;
let failed = results.iter().filter(|r| r.is_err()).count();
if failed > 0 {
warn!(failed_count = failed, "Some tasks panicked during shutdown");
Err(anyhow::anyhow!("{} task(s) panicked", failed))
} else {
Ok(())
}
}
/// Helper for joining multiple task handles with a timeout.
///
/// Waits for all tasks to complete within the specified timeout.
/// If timeout occurs, remaining tasks are aborted.
pub async fn join_tasks_with_timeout(
handles: Vec<JoinHandle<()>>,
timeout: std::time::Duration,
) -> Result<(), anyhow::Error> {
match tokio::time::timeout(timeout, join_tasks(handles)).await {
Ok(result) => result,
Err(_) => Err(anyhow::anyhow!("Task join timed out after {:?}", timeout)),
}
}

39
tests/basic_test.rs Normal file
View File

@@ -0,0 +1,39 @@
use banner::utils::shutdown::join_tasks;
use tokio::task::JoinHandle;
#[tokio::test]
async fn test_join_tasks_success() {
// Create some tasks that complete successfully
let handles: Vec<JoinHandle<()>> = vec![
tokio::spawn(async { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await }),
tokio::spawn(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await }),
tokio::spawn(async { /* immediate completion */ }),
];
// All tasks should complete successfully
let result = join_tasks(handles).await;
assert!(
result.is_ok(),
"Expected all tasks to complete successfully"
);
}
#[tokio::test]
async fn test_join_tasks_with_panic() {
// Create some tasks, including one that panics
let handles: Vec<JoinHandle<()>> = vec![
tokio::spawn(async { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await }),
tokio::spawn(async { panic!("intentional test panic") }),
tokio::spawn(async { /* immediate completion */ }),
];
// Should return an error because one task panicked
let result = join_tasks(handles).await;
assert!(result.is_err(), "Expected an error when a task panics");
let error_msg = result.unwrap_err().to_string();
assert!(
error_msg.contains("1 task(s) panicked"),
"Error message should mention panicked tasks"
);
}

30
web/biome.json Normal file
View File

@@ -0,0 +1,30 @@
{
"$schema": "https://biomejs.dev/schemas/1.9.4/schema.json",
"vcs": {
"enabled": true,
"clientKind": "git",
"useIgnoreFile": true
},
"files": {
"ignoreUnknown": false,
"ignore": ["dist/", "node_modules/", ".tanstack/"]
},
"formatter": {
"enabled": true,
"indentStyle": "space",
"indentWidth": 2,
"lineWidth": 100,
"lineEnding": "lf"
},
"javascript": {
"formatter": {
"quoteStyle": "double",
"trailingCommas": "es5",
"semicolons": "always",
"arrowParentheses": "always"
}
},
"linter": {
"enabled": false
}
}

1297
web/bun.lock Normal file
View File

File diff suppressed because it is too large Load Diff

60
web/eslint.config.js Normal file
View File

@@ -0,0 +1,60 @@
import js from "@eslint/js";
import tseslint from "typescript-eslint";
import react from "eslint-plugin-react";
import reactHooks from "eslint-plugin-react-hooks";
import reactRefresh from "eslint-plugin-react-refresh";
export default tseslint.config(
// Ignore generated files and build outputs
{
ignores: ["dist", "node_modules", "src/routeTree.gen.ts", "*.config.js"],
},
// Base configs
js.configs.recommended,
...tseslint.configs.recommendedTypeChecked,
// React plugin configuration
{
files: ["**/*.{ts,tsx}"],
plugins: {
react,
"react-hooks": reactHooks,
"react-refresh": reactRefresh,
},
languageOptions: {
parserOptions: {
project: true,
tsconfigRootDir: import.meta.dirname,
ecmaFeatures: {
jsx: true,
},
},
},
settings: {
react: {
version: "19.0",
},
},
rules: {
// React rules
...react.configs.recommended.rules,
...react.configs["jsx-runtime"].rules,
...reactHooks.configs.recommended.rules,
// React Refresh
"react-refresh/only-export-components": ["warn", { allowConstantExport: true }],
// TypeScript overrides
"@typescript-eslint/no-unused-vars": [
"error",
{
argsIgnorePattern: "^_",
varsIgnorePattern: "^_",
},
],
"@typescript-eslint/no-explicit-any": "warn",
// Disable prop-types since we're using TypeScript
"react/prop-types": "off",
},
}
);

View File

@@ -7,7 +7,11 @@
"start": "vite --port 3000",
"build": "vite build && tsc",
"serve": "vite preview",
"test": "vitest run"
"test": "vitest run",
"lint": "tsc && eslint . --ext .ts,.tsx",
"typecheck": "tsc --noEmit",
"format": "biome format --write .",
"format:check": "biome format ."
},
"dependencies": {
"@radix-ui/themes": "^3.2.1",
@@ -23,14 +27,21 @@
"recharts": "^3.2.0"
},
"devDependencies": {
"@biomejs/biome": "^1.9.4",
"@eslint/js": "^9.39.0",
"@testing-library/dom": "^10.4.0",
"@testing-library/react": "^16.2.0",
"@types/node": "^24.3.3",
"@types/react": "^19.0.8",
"@types/react-dom": "^19.0.3",
"@vitejs/plugin-react": "^4.3.4",
"eslint": "^9.39.0",
"eslint-plugin-react": "^7.37.5",
"eslint-plugin-react-hooks": "^7.0.1",
"eslint-plugin-react-refresh": "^0.4.24",
"jsdom": "^26.0.0",
"typescript": "^5.7.2",
"typescript-eslint": "^8.46.2",
"vite": "^6.3.5",
"vitest": "^3.0.5",
"web-vitals": "^4.2.4"

4620
web/pnpm-lock.yaml generated
View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,6 @@
.App {
min-height: 100vh;
font-family:
-apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
"Cantarell", "Fira Sans", "Droid Sans", "Helvetica Neue", sans-serif;
background-color: var(--color-background);
color: var(--color-text);

View File

@@ -38,12 +38,10 @@ export class BannerApiClient {
const response = await fetch(`${this.baseUrl}${endpoint}`);
if (!response.ok) {
throw new Error(
`API request failed: ${response.status} ${response.statusText}`
);
throw new Error(`API request failed: ${response.status} ${response.statusText}`);
}
return response.json();
return (await response.json()) as T;
}
async getHealth(): Promise<HealthResponse> {

View File

@@ -1,13 +1,13 @@
const reportWebVitals = (onPerfEntry?: () => void) => {
if (onPerfEntry && onPerfEntry instanceof Function) {
import('web-vitals').then(({ onCLS, onINP, onFCP, onLCP, onTTFB }) => {
onCLS(onPerfEntry)
onINP(onPerfEntry)
onFCP(onPerfEntry)
onLCP(onPerfEntry)
onTTFB(onPerfEntry)
})
void import("web-vitals").then(({ onCLS, onINP, onFCP, onLCP, onTTFB }) => {
onCLS(onPerfEntry);
onINP(onPerfEntry);
onFCP(onPerfEntry);
onLCP(onPerfEntry);
onTTFB(onPerfEntry);
});
}
}
};
export default reportWebVitals
export default reportWebVitals;

View File

@@ -8,52 +8,52 @@
// You should NOT make any changes in this file as it will be overwritten.
// Additionally, you should also exclude this file from your linter and/or formatter to prevent it from being checked or modified.
import { Route as rootRouteImport } from './routes/__root'
import { Route as IndexRouteImport } from './routes/index'
import { Route as rootRouteImport } from "./routes/__root";
import { Route as IndexRouteImport } from "./routes/index";
const IndexRoute = IndexRouteImport.update({
id: '/',
path: '/',
id: "/",
path: "/",
getParentRoute: () => rootRouteImport,
} as any)
} as any);
export interface FileRoutesByFullPath {
'/': typeof IndexRoute
"/": typeof IndexRoute;
}
export interface FileRoutesByTo {
'/': typeof IndexRoute
"/": typeof IndexRoute;
}
export interface FileRoutesById {
__root__: typeof rootRouteImport
'/': typeof IndexRoute
__root__: typeof rootRouteImport;
"/": typeof IndexRoute;
}
export interface FileRouteTypes {
fileRoutesByFullPath: FileRoutesByFullPath
fullPaths: '/'
fileRoutesByTo: FileRoutesByTo
to: '/'
id: '__root__' | '/'
fileRoutesById: FileRoutesById
fileRoutesByFullPath: FileRoutesByFullPath;
fullPaths: "/";
fileRoutesByTo: FileRoutesByTo;
to: "/";
id: "__root__" | "/";
fileRoutesById: FileRoutesById;
}
export interface RootRouteChildren {
IndexRoute: typeof IndexRoute
IndexRoute: typeof IndexRoute;
}
declare module '@tanstack/react-router' {
declare module "@tanstack/react-router" {
interface FileRoutesByPath {
'/': {
id: '/'
path: '/'
fullPath: '/'
preLoaderRoute: typeof IndexRouteImport
parentRoute: typeof rootRouteImport
}
"/": {
id: "/";
path: "/";
fullPath: "/";
preLoaderRoute: typeof IndexRouteImport;
parentRoute: typeof rootRouteImport;
};
}
}
const rootRouteChildren: RootRouteChildren = {
IndexRoute: IndexRoute,
}
};
export const routeTree = rootRouteImport
._addFileChildren(rootRouteChildren)
._addFileTypes<FileRouteTypes>()
._addFileTypes<FileRouteTypes>();

View File

@@ -101,13 +101,11 @@ const getOverallHealth = (state: StatusState): Status | "Unreachable" => {
const getServices = (state: StatusState): Service[] => {
if (state.mode !== "response") return [];
return Object.entries(state.status.services).map(
([serviceId, serviceInfo]) => ({
name: serviceInfo.name,
status: serviceInfo.status,
icon: SERVICE_ICONS[serviceId] || SERVICE_ICONS.default,
})
);
return Object.entries(state.status.services).map(([serviceId, serviceInfo]) => ({
name: serviceInfo.name,
status: serviceInfo.status,
icon: SERVICE_ICONS[serviceId] || SERVICE_ICONS.default,
}));
};
const StatusDisplay = ({ status }: { status: Status | "Unreachable" }) => {
@@ -197,17 +195,11 @@ function App() {
// Create a timeout promise
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(
() => reject(new Error("Request timeout")),
REQUEST_TIMEOUT
);
setTimeout(() => reject(new Error("Request timeout")), REQUEST_TIMEOUT);
});
// Race between the API call and timeout
const statusData = await Promise.race([
client.getStatus(),
timeoutPromise,
]);
const statusData = await Promise.race([client.getStatus(), timeoutPromise]);
const endTime = Date.now();
const responseTime = endTime - startTime;
@@ -219,8 +211,7 @@ function App() {
lastFetch: new Date(),
});
} catch (err) {
const errorMessage =
err instanceof Error ? err.message : "Failed to fetch data";
const errorMessage = err instanceof Error ? err.message : "Failed to fetch data";
// Check if it's a timeout error
if (errorMessage === "Request timeout") {
@@ -237,11 +228,11 @@ function App() {
}
// Schedule the next request after the current one completes
timeoutId = setTimeout(fetchData, REFRESH_INTERVAL);
timeoutId = setTimeout(() => void fetchData(), REFRESH_INTERVAL);
};
// Start the first request immediately
fetchData();
void fetchData();
return () => {
if (timeoutId) {
@@ -302,12 +293,8 @@ function App() {
<Flex direction="column" gap="3" style={{ marginTop: "16px" }}>
{shouldShowSkeleton
? // Show skeleton for 3 services during initial loading only
Array.from({ length: 3 }).map((_, index) => (
<SkeletonService key={index} />
))
: services.map((service) => (
<ServiceStatus key={service.name} service={service} />
))}
Array.from({ length: 3 }).map((_, index) => <SkeletonService key={index} />)
: services.map((service) => <ServiceStatus key={service.name} service={service} />)}
</Flex>
<Flex direction="column" gap="2" style={BORDER_STYLES}>
@@ -326,17 +313,11 @@ function App() {
{shouldShowLastFetch ? (
<TimingRow icon={Clock} name="Last Updated">
{isLoading ? (
<Text
size="2"
style={{ paddingBottom: "2px" }}
color="gray"
>
<Text size="2" style={{ paddingBottom: "2px" }} color="gray">
Loading...
</Text>
) : (
<Tooltip
content={`as of ${state.lastFetch.toLocaleTimeString()}`}
>
<Tooltip content={`as of ${state.lastFetch.toLocaleTimeString()}`}>
<abbr
style={{
cursor: "pointer",
@@ -363,12 +344,7 @@ function App() {
</Flex>
</Flex>
</Card>
<Flex
justify="center"
style={{ marginTop: "12px" }}
gap="2"
align="center"
>
<Flex justify="center" style={{ marginTop: "12px" }} gap="2" align="center">
{__APP_VERSION__ && (
<Text
size="1"

View File

@@ -2,14 +2,12 @@
body {
margin: 0;
font-family:
-apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen", "Ubuntu",
"Cantarell", "Fira Sans", "Droid Sans", "Helvetica Neue", sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
code {
font-family:
source-code-pro, Menlo, Monaco, Consolas, "Courier New", monospace;
font-family: source-code-pro, Menlo, Monaco, Consolas, "Courier New", monospace;
}

View File

@@ -11,6 +11,7 @@
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"verbatimModuleSyntax": true,
"isolatedModules": true,
"noEmit": true,
/* Linting */
@@ -22,7 +23,7 @@
"noUncheckedSideEffectImports": true,
"baseUrl": ".",
"paths": {
"@/*": ["./src/*"],
"@/*": ["./src/*"]
}
}
}

View File

@@ -7,10 +7,7 @@ import { readFileSync, existsSync } from "node:fs";
// Extract version from Cargo.toml
function getVersion() {
const filename = "Cargo.toml";
const paths = [
resolve(__dirname, filename),
resolve(__dirname, "..", filename),
];
const paths = [resolve(__dirname, filename), resolve(__dirname, "..", filename)];
for (const path of paths) {
try {