Compare commits

...

6 Commits

24 changed files with 781 additions and 91 deletions

73
.dockerignore Normal file
View File

@@ -0,0 +1,73 @@
# Build artifacts
target/
**/target/
# Development files
.env
.env.local
.env.*.local
# IDE and editor files
.vscode/
.idea/
*.swp
*.swo
*~
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Git
.git/
.gitignore
# Documentation
README.md
docs/
*.md
# Go files (since this is a Rust project)
go/
# Database migrations (if not needed at runtime)
migrations/
diesel_migrations/
diesel.toml
# Development configuration
bacon.toml
.cargo/config.toml
# Logs
*.log
logs/
# Temporary files
tmp/
temp/
*.tmp
# Test files
tests/
**/tests/
*_test.rs
*_tests.rs
# Coverage reports
coverage/
*.gcov
*.gcno
*.gcda
# Profiling
*.prof
# Backup files
*.bak
*.backup

View File

@@ -1,5 +1,5 @@
# Build Stage
ARG RUST_VERSION=1.86.0
ARG RUST_VERSION=1.89.0
FROM rust:${RUST_VERSION}-bookworm AS builder
# Install build dependencies
@@ -52,7 +52,6 @@ RUN addgroup --gid $GID $APP_USER \
# Copy application files
COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/target/release/banner ${APP}/banner
COPY --from=builder --chown=$APP_USER:$APP_USER /usr/src/banner/src/fonts ${APP}/fonts
# Set proper permissions
RUN chmod +x ${APP}/banner

View File

@@ -7,8 +7,16 @@ use std::{
};
use crate::banner::{
BannerSession, SessionPool, errors::BannerApiError, json::parse_json_with_context,
middleware::TransparentMiddleware, models::*, nonce, query::SearchQuery, util::user_agent,
BannerSession, SessionPool, create_shared_rate_limiter_with_config,
errors::BannerApiError,
json::parse_json_with_context,
middleware::TransparentMiddleware,
models::*,
nonce,
query::SearchQuery,
rate_limit_middleware::RateLimitMiddleware,
rate_limiter::{RateLimitConfig, SharedRateLimiter, create_shared_rate_limiter},
util::user_agent,
};
use anyhow::{Context, Result, anyhow};
use cookie::Cookie;
@@ -30,6 +38,13 @@ pub struct BannerApi {
impl BannerApi {
/// Creates a new Banner API client.
pub fn new(base_url: String) -> Result<Self> {
Self::new_with_config(base_url, RateLimitConfig::default())
}
/// Creates a new Banner API client with custom rate limiting configuration.
pub fn new_with_config(base_url: String, rate_limit_config: RateLimitConfig) -> Result<Self> {
let rate_limiter = create_shared_rate_limiter_with_config(rate_limit_config);
let http = ClientBuilder::new(
Client::builder()
.cookie_store(false)
@@ -42,6 +57,7 @@ impl BannerApi {
.context("Failed to create HTTP client")?,
)
.with(TransparentMiddleware)
.with(RateLimitMiddleware::new(rate_limiter.clone()))
.build();
Ok(Self {
@@ -50,7 +66,6 @@ impl BannerApi {
base_url,
})
}
/// Validates offset parameter for search methods.
fn validate_offset(offset: i32) -> Result<()> {
if offset <= 0 {
@@ -160,10 +175,9 @@ impl BannerApi {
debug!(
term = term,
query = ?query,
sort = sort,
sort_descending = sort_descending,
"Searching for courses with params: {:?}", params
subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"),
max_results = query.get_max_results(),
"Searching for courses"
);
let response = self
@@ -184,7 +198,7 @@ impl BannerApi {
let search_result: SearchResult = parse_json_with_context(&body).map_err(|e| {
BannerApiError::RequestFailed(anyhow!(
"Failed to parse search response (status={status}, url={url}): {e}\nBody: {body}"
"Failed to parse search response (status={status}, url={url}): {e}"
))
})?;
@@ -303,6 +317,12 @@ impl BannerApi {
sort: &str,
sort_descending: bool,
) -> Result<SearchResult, BannerApiError> {
debug!(
term = term,
subject = query.get_subject().map(|s| s.as_str()).unwrap_or("all"),
max_results = query.get_max_results(),
"Starting course search"
);
self.perform_search(term, query, sort, sort_descending)
.await
}
@@ -313,6 +333,8 @@ impl BannerApi {
term: &str,
crn: &str,
) -> Result<Option<Course>, BannerApiError> {
debug!(term = term, crn = crn, "Looking up course by CRN");
let query = SearchQuery::new()
.course_reference_number(crn)
.max_results(1);

View File

@@ -9,10 +9,8 @@ pub fn parse_json_with_context<T: serde::de::DeserializeOwned>(body: &str) -> Re
Ok(value) => Ok(value),
Err(err) => {
let (line, column) = (err.line(), err.column());
let snippet = build_error_snippet(body, line, column, 80);
Err(anyhow::anyhow!(
"{err} at line {line}, column {column}\nSnippet:\n{snippet}",
))
// let snippet = build_error_snippet(body, line, column, 80);
Err(anyhow::anyhow!("{err} at line {line}, column {column}",))
}
}
}

View File

@@ -41,7 +41,7 @@ impl Middleware for TransparentMiddleware {
}
}
Err(error) => {
warn!(?error, "Request failed (middleware)");
warn!(error = ?error, "Request failed (middleware)");
Err(error)
}
}

View File

@@ -14,6 +14,8 @@ pub mod json;
pub mod middleware;
pub mod models;
pub mod query;
pub mod rate_limiter;
pub mod rate_limit_middleware;
pub mod session;
pub mod util;
@@ -21,4 +23,5 @@ pub use api::*;
pub use errors::*;
pub use models::*;
pub use query::*;
pub use rate_limiter::*;
pub use session::*;

View File

@@ -160,6 +160,16 @@ impl SearchQuery {
self
}
/// Gets the subject field
pub fn get_subject(&self) -> Option<&String> {
self.subject.as_ref()
}
/// Gets the max_results field
pub fn get_max_results(&self) -> i32 {
self.max_results
}
/// Converts the query into URL parameters for the Banner API
pub fn to_params(&self) -> HashMap<String, String> {
let mut params = HashMap::new();

View File

@@ -0,0 +1,101 @@
//! HTTP middleware that enforces rate limiting for Banner API requests.
use crate::banner::rate_limiter::{RequestType, SharedRateLimiter};
use http::Extensions;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
use tracing::{debug, warn, trace};
use url::Url;
/// Middleware that enforces rate limiting based on request URL patterns
pub struct RateLimitMiddleware {
rate_limiter: SharedRateLimiter,
}
impl RateLimitMiddleware {
/// Creates a new rate limiting middleware
pub fn new(rate_limiter: SharedRateLimiter) -> Self {
Self { rate_limiter }
}
/// Determines the request type based on the URL path
fn get_request_type(url: &Url) -> RequestType {
let path = url.path();
if path.contains("/registration")
|| path.contains("/selfServiceMenu")
|| path.contains("/term/termSelection")
{
RequestType::Session
} else if path.contains("/searchResults") || path.contains("/classSearch") {
RequestType::Search
} else if path.contains("/getTerms")
|| path.contains("/getSubjects")
|| path.contains("/getCampuses")
{
RequestType::Metadata
} else if path.contains("/resetDataForm") {
RequestType::Reset
} else {
// Default to search for unknown endpoints
RequestType::Search
}
}
}
#[async_trait::async_trait]
impl Middleware for RateLimitMiddleware {
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> std::result::Result<Response, reqwest_middleware::Error> {
let request_type = Self::get_request_type(req.url());
trace!(
url = %req.url(),
request_type = ?request_type,
"Rate limiting request"
);
// Wait for permission to make the request
self.rate_limiter.wait_for_permission(request_type).await;
trace!(
url = %req.url(),
request_type = ?request_type,
"Rate limit permission granted, making request"
);
// Make the actual request
let response_result = next.run(req, extensions).await;
match response_result {
Ok(response) => {
if response.status().is_success() {
trace!(
url = %response.url(),
status = response.status().as_u16(),
"Request completed successfully"
);
} else {
warn!(
url = %response.url(),
status = response.status().as_u16(),
"Request completed with error status"
);
}
Ok(response)
}
Err(error) => {
warn!(
url = %error.url().unwrap_or(&Url::parse("unknown").unwrap()),
error = ?error,
"Request failed"
);
Err(error)
}
}
}
}

169
src/banner/rate_limiter.rs Normal file
View File

@@ -0,0 +1,169 @@
//! Rate limiting for Banner API requests to prevent overwhelming the server.
use governor::{
Quota, RateLimiter,
clock::DefaultClock,
state::{InMemoryState, NotKeyed},
};
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, trace, warn};
/// Different types of Banner API requests with different rate limits
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RequestType {
/// Session creation and management (very conservative)
Session,
/// Course search requests (moderate)
Search,
/// Term and metadata requests (moderate)
Metadata,
/// Data form resets (low priority)
Reset,
}
/// Rate limiter configuration for different request types
#[derive(Debug, Clone)]
pub struct RateLimitConfig {
/// Requests per minute for session operations
pub session_rpm: u32,
/// Requests per minute for search operations
pub search_rpm: u32,
/// Requests per minute for metadata operations
pub metadata_rpm: u32,
/// Requests per minute for reset operations
pub reset_rpm: u32,
/// Burst allowance (extra requests allowed in short bursts)
pub burst_allowance: u32,
}
impl Default for RateLimitConfig {
fn default() -> Self {
Self {
// Very conservative for session creation
session_rpm: 6, // 1 every 10 seconds
// Moderate for search operations
search_rpm: 30, // 1 every 2 seconds
// Moderate for metadata
metadata_rpm: 20, // 1 every 3 seconds
// Low for resets
reset_rpm: 10, // 1 every 6 seconds
// Allow small bursts
burst_allowance: 3,
}
}
}
/// A rate limiter that manages different request types with different limits
pub struct BannerRateLimiter {
session_limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
search_limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
metadata_limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
reset_limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
config: RateLimitConfig,
}
impl BannerRateLimiter {
/// Creates a new rate limiter with the given configuration
pub fn new(config: RateLimitConfig) -> Self {
let session_quota = Quota::with_period(Duration::from_secs(60) / config.session_rpm)
.unwrap()
.allow_burst(NonZeroU32::new(config.burst_allowance).unwrap());
let search_quota = Quota::with_period(Duration::from_secs(60) / config.search_rpm)
.unwrap()
.allow_burst(NonZeroU32::new(config.burst_allowance).unwrap());
let metadata_quota = Quota::with_period(Duration::from_secs(60) / config.metadata_rpm)
.unwrap()
.allow_burst(NonZeroU32::new(config.burst_allowance).unwrap());
let reset_quota = Quota::with_period(Duration::from_secs(60) / config.reset_rpm)
.unwrap()
.allow_burst(NonZeroU32::new(config.burst_allowance).unwrap());
Self {
session_limiter: RateLimiter::direct(session_quota),
search_limiter: RateLimiter::direct(search_quota),
metadata_limiter: RateLimiter::direct(metadata_quota),
reset_limiter: RateLimiter::direct(reset_quota),
config,
}
}
/// Waits for permission to make a request of the given type
pub async fn wait_for_permission(&self, request_type: RequestType) {
let limiter = match request_type {
RequestType::Session => &self.session_limiter,
RequestType::Search => &self.search_limiter,
RequestType::Metadata => &self.metadata_limiter,
RequestType::Reset => &self.reset_limiter,
};
trace!(request_type = ?request_type, "Waiting for rate limit permission");
// Wait until we can make the request
limiter.until_ready().await;
trace!(request_type = ?request_type, "Rate limit permission granted");
}
/// Checks if a request of the given type would be allowed immediately
pub fn check_permission(&self, request_type: RequestType) -> bool {
let limiter = match request_type {
RequestType::Session => &self.session_limiter,
RequestType::Search => &self.search_limiter,
RequestType::Metadata => &self.metadata_limiter,
RequestType::Reset => &self.reset_limiter,
};
limiter.check().is_ok()
}
/// Gets the current configuration
pub fn config(&self) -> &RateLimitConfig {
&self.config
}
/// Updates the rate limit configuration
pub fn update_config(&mut self, config: RateLimitConfig) {
self.config = config;
// Note: In a production system, you'd want to recreate the limiters
// with the new configuration, but for simplicity we'll just update
// the config field here.
warn!("Rate limit configuration updated - restart required for full effect");
}
}
impl Default for BannerRateLimiter {
fn default() -> Self {
Self::new(RateLimitConfig::default())
}
}
/// A shared rate limiter instance
pub type SharedRateLimiter = Arc<BannerRateLimiter>;
/// Creates a new shared rate limiter with default configuration
pub fn create_shared_rate_limiter() -> SharedRateLimiter {
Arc::new(BannerRateLimiter::default())
}
/// Creates a new shared rate limiter with custom configuration
pub fn create_shared_rate_limiter_with_config(config: RateLimitConfig) -> SharedRateLimiter {
Arc::new(BannerRateLimiter::new(config))
}
/// Conversion from config module's RateLimitingConfig to this module's RateLimitConfig
impl From<crate::config::RateLimitingConfig> for RateLimitConfig {
fn from(config: crate::config::RateLimitingConfig) -> Self {
Self {
session_rpm: config.session_rpm,
search_rpm: config.search_rpm,
metadata_rpm: config.metadata_rpm,
reset_rpm: config.reset_rpm,
burst_allowance: config.burst_allowance,
}
}
}

View File

@@ -16,7 +16,7 @@ use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Notify};
use tracing::{debug, info};
use tracing::{debug, info, trace};
use url::Url;
const SESSION_EXPIRY: Duration = Duration::from_secs(25 * 60); // 25 minutes
@@ -82,7 +82,7 @@ impl BannerSession {
/// Updates the last activity timestamp
pub fn touch(&mut self) {
debug!(id = self.unique_session_id, "Session was used");
trace!(id = self.unique_session_id, "Session was used");
self.last_activity = Some(Instant::now());
}
@@ -162,7 +162,7 @@ impl TermPool {
async fn release(&self, session: BannerSession) {
let id = session.unique_session_id.clone();
if session.is_expired() {
debug!(id = id, "Session is now expired, dropping.");
trace!(id = id, "Session is now expired, dropping.");
// Wake up a waiter, as it might need to create a new session
// if this was the last one.
self.notifier.notify_one();
@@ -174,10 +174,7 @@ impl TermPool {
let queue_size = queue.len();
drop(queue); // Release lock before notifying
debug!(
id = id,
"Session returned to pool. Queue size is now {queue_size}."
);
trace!(id = id, queue_size, "Session returned to pool");
self.notifier.notify_one();
}
}
@@ -213,13 +210,13 @@ impl SessionPool {
let mut queue = term_pool.sessions.lock().await;
if let Some(session) = queue.pop_front() {
if !session.is_expired() {
debug!(id = session.unique_session_id, "Reusing session from pool");
trace!(id = session.unique_session_id, "Reusing session from pool");
return Ok(PooledSession {
session: Some(session),
pool: Arc::clone(&term_pool),
});
} else {
debug!(
trace!(
id = session.unique_session_id,
"Popped an expired session, discarding."
);
@@ -232,7 +229,7 @@ impl SessionPool {
if *is_creating_guard {
// Another task is already creating a session. Release the lock and wait.
drop(is_creating_guard);
debug!("Another task is creating a session, waiting for notification...");
trace!("Another task is creating a session, waiting for notification...");
term_pool.notifier.notified().await;
// Loop back to the top to try the fast path again.
continue;
@@ -243,12 +240,12 @@ impl SessionPool {
drop(is_creating_guard);
// Race: wait for a session to be returned OR for the rate limiter to allow a new one.
debug!("Pool empty, racing notifier vs rate limiter...");
trace!("Pool empty, racing notifier vs rate limiter...");
tokio::select! {
_ = term_pool.notifier.notified() => {
// A session was returned while we were waiting!
// We are no longer the creator. Reset the flag and loop to race for the new session.
debug!("Notified that a session was returned. Looping to retry.");
trace!("Notified that a session was returned. Looping to retry.");
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
@@ -256,7 +253,7 @@ impl SessionPool {
}
_ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
// The rate limit has elapsed. It's our job to create the session.
debug!("Rate limiter ready. Proceeding to create a new session.");
trace!("Rate limiter ready. Proceeding to create a new session.");
let new_session_result = self.create_session(&term).await;
// After creation, we are no longer the creator. Reset the flag
@@ -286,7 +283,7 @@ impl SessionPool {
/// Sets up initial session cookies by making required Banner API requests
pub async fn create_session(&self, term: &Term) -> Result<BannerSession> {
info!("setting up banner session for term {term}");
info!(term = %term, "setting up banner session");
// The 'register' or 'search' registration page
let initial_registration = self
@@ -317,7 +314,7 @@ impl SessionPool {
let ssb_cookie = cookies.get("SSB_COOKIE").unwrap();
let cookie_header = format!("JSESSIONID={}; SSB_COOKIE={}", jsessionid, ssb_cookie);
debug!(
trace!(
jsessionid = jsessionid,
ssb_cookie = ssb_cookie,
"New session cookies acquired"
@@ -457,7 +454,7 @@ impl SessionPool {
));
}
debug!(term = term, "successfully selected term");
trace!(term = term, "successfully selected term");
Ok(())
}
}

View File

@@ -23,8 +23,7 @@ async fn main() -> Result<()> {
// Load configuration
let config: Config = Figment::new()
.merge(Env::raw().only(&["DATABASE_URL"]))
.merge(Env::prefixed("APP_"))
.merge(Env::raw())
.extract()
.expect("Failed to load config");
@@ -34,7 +33,9 @@ async fn main() -> Result<()> {
);
// Create Banner API client
let banner_api = BannerApi::new(config.banner_base_url).expect("Failed to create BannerApi");
let banner_api =
BannerApi::new_with_config(config.banner_base_url, config.rate_limiting.into())
.expect("Failed to create BannerApi");
// Get current term
let term = Term::get_current().inner().to_string();
@@ -67,11 +68,11 @@ async fn main() -> Result<()> {
),
];
info!("Executing {} concurrent searches", queries.len());
info!(query_count = queries.len(), "Executing concurrent searches");
// Execute all searches concurrently
let search_futures = queries.into_iter().map(|(label, query)| {
info!("Starting search: {}", label);
info!(label = %label, "Starting search");
let banner_api = &banner_api;
let term = &term;
async move {

View File

@@ -77,7 +77,7 @@ pub async fn gcal(
)
.await?;
info!("gcal command completed for CRN: {}", crn);
info!(crn = %crn, "gcal command completed");
Ok(())
}

View File

@@ -20,6 +20,6 @@ pub async fn ics(
))
.await?;
info!("ics command completed for CRN: {}", crn);
info!(crn = %crn, "ics command completed");
Ok(())
}

View File

@@ -20,6 +20,6 @@ pub async fn time(
))
.await?;
info!("time command completed for CRN: {}", crn);
info!(crn = %crn, "time command completed");
Ok(())
}

View File

@@ -18,7 +18,7 @@ pub async fn get_course_by_crn(ctx: &Context<'_>, crn: i32) -> Result<Course> {
.get_course_or_fetch(&term.to_string(), &crn.to_string())
.await
.map_err(|e| {
error!(%e, crn, "failed to fetch course data");
error!(error = %e, crn = %crn, "failed to fetch course data");
e
})
}

View File

@@ -8,7 +8,7 @@ use fundu::{DurationParser, TimeUnit};
use serde::{Deserialize, Deserializer};
use std::time::Duration;
/// Application configuration loaded from environment variables
/// Main application configuration containing all sub-configurations
#[derive(Deserialize)]
pub struct Config {
/// Log level for the application
@@ -20,8 +20,6 @@ pub struct Config {
/// Defaults to "info" if not specified
#[serde(default = "default_log_level")]
pub log_level: String,
/// Discord bot token for authentication
pub bot_token: String,
/// Port for the web server
#[serde(default = "default_port")]
pub port: u16,
@@ -29,10 +27,6 @@ pub struct Config {
pub database_url: String,
/// Redis connection URL
pub redis_url: String,
/// Base URL for banner generation service
pub banner_base_url: String,
/// Target Discord guild ID where the bot operates
pub bot_target_guild: u64,
/// Graceful shutdown timeout duration
///
/// Accepts both numeric values (seconds) and duration strings
@@ -42,6 +36,16 @@ pub struct Config {
deserialize_with = "deserialize_duration"
)]
pub shutdown_timeout: Duration,
/// Discord bot token for authentication
pub bot_token: String,
/// Target Discord guild ID where the bot operates
pub bot_target_guild: u64,
/// Base URL for banner generation service
pub banner_base_url: String,
/// Rate limiting configuration for Banner API requests
#[serde(default = "default_rate_limiting")]
pub rate_limiting: RateLimitingConfig,
}
/// Default log level of "info"
@@ -59,6 +63,62 @@ fn default_shutdown_timeout() -> Duration {
Duration::from_secs(8)
}
/// Rate limiting configuration for Banner API requests
#[derive(Deserialize, Clone, Debug)]
pub struct RateLimitingConfig {
/// Requests per minute for session operations (very conservative)
#[serde(default = "default_session_rpm")]
pub session_rpm: u32,
/// Requests per minute for search operations (moderate)
#[serde(default = "default_search_rpm")]
pub search_rpm: u32,
/// Requests per minute for metadata operations (moderate)
#[serde(default = "default_metadata_rpm")]
pub metadata_rpm: u32,
/// Requests per minute for reset operations (low priority)
#[serde(default = "default_reset_rpm")]
pub reset_rpm: u32,
/// Burst allowance (extra requests allowed in short bursts)
#[serde(default = "default_burst_allowance")]
pub burst_allowance: u32,
}
/// Default rate limiting configuration
fn default_rate_limiting() -> RateLimitingConfig {
RateLimitingConfig {
session_rpm: default_session_rpm(),
search_rpm: default_search_rpm(),
metadata_rpm: default_metadata_rpm(),
reset_rpm: default_reset_rpm(),
burst_allowance: default_burst_allowance(),
}
}
/// Default session requests per minute (6 = 1 every 10 seconds)
fn default_session_rpm() -> u32 {
6
}
/// Default search requests per minute (30 = 1 every 2 seconds)
fn default_search_rpm() -> u32 {
30
}
/// Default metadata requests per minute (20 = 1 every 3 seconds)
fn default_metadata_rpm() -> u32 {
20
}
/// Default reset requests per minute (10 = 1 every 6 seconds)
fn default_reset_rpm() -> u32 {
10
}
/// Default burst allowance (3 extra requests)
fn default_burst_allowance() -> u32 {
3
}
/// Duration parser configured to handle various time units with seconds as default
///
/// Supports:

243
src/formatter.rs Normal file
View File

@@ -0,0 +1,243 @@
//! Custom tracing formatter
use serde::Serialize;
use serde_json::{Map, Value};
use std::fmt;
use time::macros::format_description;
use time::{OffsetDateTime, format_description::FormatItem};
use tracing::field::{Field, Visit};
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields};
use tracing_subscriber::registry::LookupSpan;
/// Cached format description for timestamps
/// Uses 3 subsecond digits on Emscripten, 5 otherwise for better performance
#[cfg(target_os = "emscripten")]
const TIMESTAMP_FORMAT: &[FormatItem<'static>] =
format_description!("[hour]:[minute]:[second].[subsecond digits:3]");
#[cfg(not(target_os = "emscripten"))]
const TIMESTAMP_FORMAT: &[FormatItem<'static>] =
format_description!("[hour]:[minute]:[second].[subsecond digits:5]");
/// A custom formatter with enhanced timestamp formatting
///
/// Re-implementation of the Full formatter with improved timestamp display.
pub struct CustomFormatter;
/// A custom JSON formatter that flattens fields to root level
///
/// Outputs logs in the format: { "message": "...", "level": "...", "customAttribute": "..." }
pub struct CustomJsonFormatter;
impl<S, N> FormatEvent<S, N> for CustomFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> fmt::Result {
let meta = event.metadata();
// 1) Timestamp (dimmed when ANSI)
let now = OffsetDateTime::now_utc();
let formatted_time = now.format(&TIMESTAMP_FORMAT).map_err(|e| {
eprintln!("Failed to format timestamp: {}", e);
fmt::Error
})?;
write_dimmed(&mut writer, formatted_time)?;
writer.write_char(' ')?;
// 2) Colored 5-char level like Full
write_colored_level(&mut writer, meta.level())?;
writer.write_char(' ')?;
// 3) Span scope chain (bold names, fields in braces, dimmed ':')
if let Some(scope) = ctx.event_scope() {
let mut saw_any = false;
for span in scope.from_root() {
write_bold(&mut writer, span.metadata().name())?;
saw_any = true;
let ext = span.extensions();
if let Some(fields) = &ext.get::<FormattedFields<N>>() {
if !fields.is_empty() {
write_bold(&mut writer, "{")?;
write!(writer, "{}", fields)?;
write_bold(&mut writer, "}")?;
}
}
if writer.has_ansi_escapes() {
write!(writer, "\x1b[2m:\x1b[0m")?;
} else {
writer.write_char(':')?;
}
}
if saw_any {
writer.write_char(' ')?;
}
}
// 4) Target (dimmed), then a space
if writer.has_ansi_escapes() {
write!(writer, "\x1b[2m{}\x1b[0m\x1b[2m:\x1b[0m ", meta.target())?;
} else {
write!(writer, "{}: ", meta.target())?;
}
// 5) Event fields
ctx.format_fields(writer.by_ref(), event)?;
// 6) Newline
writeln!(writer)
}
}
impl<S, N> FormatEvent<S, N> for CustomJsonFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
_ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> fmt::Result {
let meta = event.metadata();
#[derive(Serialize)]
struct EventFields {
message: String,
level: String,
target: String,
#[serde(flatten)]
fields: Map<String, Value>,
}
let (message, fields) = {
let mut message: Option<String> = None;
let mut fields: Map<String, Value> = Map::new();
struct FieldVisitor<'a> {
message: &'a mut Option<String>,
fields: &'a mut Map<String, Value>,
}
impl<'a> Visit for FieldVisitor<'a> {
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
let key = field.name();
if key == "message" {
*self.message = Some(format!("{:?}", value));
} else {
// Use typed methods for better performance
self.fields
.insert(key.to_string(), Value::String(format!("{:?}", value)));
}
}
fn record_str(&mut self, field: &Field, value: &str) {
let key = field.name();
if key == "message" {
*self.message = Some(value.to_string());
} else {
self.fields
.insert(key.to_string(), Value::String(value.to_string()));
}
}
fn record_i64(&mut self, field: &Field, value: i64) {
let key = field.name();
if key != "message" {
self.fields.insert(
key.to_string(),
Value::Number(serde_json::Number::from(value)),
);
}
}
fn record_u64(&mut self, field: &Field, value: u64) {
let key = field.name();
if key != "message" {
self.fields.insert(
key.to_string(),
Value::Number(serde_json::Number::from(value)),
);
}
}
fn record_bool(&mut self, field: &Field, value: bool) {
let key = field.name();
if key != "message" {
self.fields.insert(key.to_string(), Value::Bool(value));
}
}
}
let mut visitor = FieldVisitor {
message: &mut message,
fields: &mut fields,
};
event.record(&mut visitor);
(message, fields)
};
let json = EventFields {
message: message.unwrap_or_default(),
level: meta.level().to_string(),
target: meta.target().to_string(),
fields,
};
writeln!(
writer,
"{}",
serde_json::to_string(&json).unwrap_or_else(|_| "{}".to_string())
)
}
}
/// Write the verbosity level with the same coloring/alignment as the Full formatter.
fn write_colored_level(writer: &mut Writer<'_>, level: &Level) -> fmt::Result {
if writer.has_ansi_escapes() {
// Basic ANSI color sequences; reset with \x1b[0m
let (color, text) = match *level {
Level::TRACE => ("\x1b[35m", "TRACE"), // purple
Level::DEBUG => ("\x1b[34m", "DEBUG"), // blue
Level::INFO => ("\x1b[32m", " INFO"), // green, note leading space
Level::WARN => ("\x1b[33m", " WARN"), // yellow, note leading space
Level::ERROR => ("\x1b[31m", "ERROR"), // red
};
write!(writer, "{}{}\x1b[0m", color, text)
} else {
// Right-pad to width 5 like Full's non-ANSI mode
match *level {
Level::TRACE => write!(writer, "{:>5}", "TRACE"),
Level::DEBUG => write!(writer, "{:>5}", "DEBUG"),
Level::INFO => write!(writer, "{:>5}", " INFO"),
Level::WARN => write!(writer, "{:>5}", " WARN"),
Level::ERROR => write!(writer, "{:>5}", "ERROR"),
}
}
}
fn write_dimmed(writer: &mut Writer<'_>, s: impl fmt::Display) -> fmt::Result {
if writer.has_ansi_escapes() {
write!(writer, "\x1b[2m{}\x1b[0m", s)
} else {
write!(writer, "{}", s)
}
}
fn write_bold(writer: &mut Writer<'_>, s: impl fmt::Display) -> fmt::Result {
if writer.has_ansi_escapes() {
write!(writer, "\x1b[1m{}\x1b[0m", s)
} else {
write!(writer, "{}", s)
}
}

View File

@@ -20,6 +20,7 @@ mod bot;
mod config;
mod data;
mod error;
mod formatter;
mod scraper;
mod services;
mod state;
@@ -29,21 +30,35 @@ mod web;
async fn main() {
dotenvy::dotenv().ok();
// Configure logging
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn,banner=debug"));
// Load configuration first to get log level
let config: Config = Figment::new()
.merge(Env::raw())
.extract()
.expect("Failed to load config");
// Configure logging based on config
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
let base_level = &config.log_level;
EnvFilter::new(&format!(
"warn,banner={},banner::rate_limiter=warn,banner::session=warn,banner::rate_limit_middleware=warn",
base_level
))
});
let subscriber = {
#[cfg(debug_assertions)]
{
FmtSubscriber::builder()
.with_target(true)
.event_format(formatter::CustomFormatter)
}
#[cfg(not(debug_assertions))]
{
FmtSubscriber::builder().json()
FmtSubscriber::builder()
.with_target(true)
.event_format(formatter::CustomJsonFormatter)
}
}
.with_env_filter(filter)
.with_target(true)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
@@ -58,12 +73,6 @@ async fn main() {
"starting banner"
);
let config: Config = Figment::new()
.merge(Env::raw().only(&["DATABASE_URL"]))
.merge(Env::prefixed("APP_"))
.extract()
.expect("Failed to load config");
// Create database connection pool
let db_pool = PgPoolOptions::new()
.max_connections(10)
@@ -79,8 +88,11 @@ async fn main() {
);
// Create BannerApi and AppState
let banner_api =
BannerApi::new(config.banner_base_url.clone()).expect("Failed to create BannerApi");
let banner_api = BannerApi::new_with_config(
config.banner_base_url.clone(),
config.rate_limiting.clone().into(),
)
.expect("Failed to create BannerApi");
let banner_api_arc = Arc::new(banner_api);
let app_state = AppState::new(banner_api_arc.clone(), &config.redis_url)
@@ -138,7 +150,7 @@ async fn main() {
on_error: |error| {
Box::pin(async move {
if let Err(e) = poise::builtins::on_error(error).await {
tracing::error!("Fatal error while sending error message: {}", e);
tracing::error!(error = %e, "Fatal error while sending error message");
}
// error!(error = ?error, "command error");
})

View File

@@ -35,14 +35,14 @@ impl ScraperService {
/// Starts the scheduler and a pool of workers.
pub fn start(&mut self) {
info!("ScraperService starting...");
info!("ScraperService starting");
let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone());
let scheduler_handle = tokio::spawn(async move {
scheduler.run().await;
});
self.scheduler_handle = Some(scheduler_handle);
info!("Scheduler task spawned.");
info!("Scheduler task spawned");
let worker_count = 4; // This could be configurable
for i in 0..worker_count {
@@ -52,19 +52,22 @@ impl ScraperService {
});
self.worker_handles.push(worker_handle);
}
info!("Spawned {} worker tasks.", self.worker_handles.len());
info!(
worker_count = self.worker_handles.len(),
"Spawned worker tasks"
);
}
/// Signals all child tasks to gracefully shut down.
pub async fn shutdown(&mut self) {
info!("Shutting down scraper service...");
info!("Shutting down scraper service");
if let Some(handle) = self.scheduler_handle.take() {
handle.abort();
}
for handle in self.worker_handles.drain(..) {
handle.abort();
}
info!("Scraper service shutdown.");
info!("Scraper service shutdown");
}
}

View File

@@ -6,7 +6,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info};
use tracing::{error, info, debug, trace};
/// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler {
@@ -24,12 +24,12 @@ impl Scheduler {
/// Runs the scheduler's main loop.
pub async fn run(&self) {
info!("Scheduler service started.");
info!("Scheduler service started");
let mut interval = time::interval(Duration::from_secs(60)); // Runs every minute
loop {
interval.tick().await;
info!("Scheduler waking up to analyze and schedule jobs...");
// Scheduler analyzing data...
if let Err(e) = self.schedule_jobs().await {
error!(error = ?e, "Failed to schedule jobs");
}
@@ -44,12 +44,10 @@ impl Scheduler {
// 3. If no job exists, create a new, low-priority job to be executed in the near future.
let term = Term::get_current().inner().to_string();
info!(
term = term,
"[Scheduler] Enqueuing baseline subject scrape jobs..."
);
debug!(term = term, "Enqueuing subject jobs");
let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?;
debug!(subject_count = subjects.len(), "Retrieved subjects from API");
for subject in subjects {
let payload = json!({ "subject": subject.code });
@@ -63,6 +61,7 @@ impl Scheduler {
.await?;
if existing_job.is_some() {
trace!(subject = subject.code, "Job already exists, skipping");
continue;
}
@@ -76,10 +75,10 @@ impl Scheduler {
.execute(&self.db_pool)
.await?;
info!(subject = subject.code, "[Scheduler] Enqueued new job");
debug!(subject = subject.code, "New job enqueued for subject");
}
info!("[Scheduler] Job scheduling complete.");
debug!("Job scheduling complete");
Ok(())
}
}

View File

@@ -6,7 +6,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info, warn};
use tracing::{debug, error, info, trace, warn};
/// A single worker instance.
///
@@ -34,7 +34,7 @@ impl Worker {
match self.fetch_and_lock_job().await {
Ok(Some(job)) => {
let job_id = job.id;
info!(worker_id = self.id, job_id = job.id, "Processing job");
debug!(worker_id = self.id, job_id = job.id, "Processing job");
if let Err(e) = self.process_job(job).await {
// Check if the error is due to an invalid session
if let Some(BannerApiError::InvalidSession(_)) =
@@ -58,7 +58,7 @@ impl Worker {
);
}
} else {
info!(worker_id = self.id, job_id, "Job processed successfully");
debug!(worker_id = self.id, job_id, "Job completed");
// If successful, delete the job.
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
@@ -72,6 +72,7 @@ impl Worker {
}
Ok(None) => {
// No job found, wait for a bit before polling again.
trace!(worker_id = self.id, "No jobs available, waiting");
time::sleep(Duration::from_secs(5)).await;
}
Err(e) => {
@@ -127,7 +128,7 @@ impl Worker {
info!(
worker_id = self.id,
subject = subject_code,
"Processing subject job"
"Scraping subject"
);
let term = Term::get_current().inner().to_string();
@@ -143,7 +144,7 @@ impl Worker {
worker_id = self.id,
subject = subject_code,
count = courses_from_api.len(),
"Found courses to upsert"
"Found courses"
);
for course in courses_from_api {
self.upsert_course(&course).await?;
@@ -190,7 +191,6 @@ impl Worker {
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job deleted");
Ok(())
}
@@ -199,7 +199,7 @@ impl Worker {
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job unlocked after failure");
info!(worker_id = self.id, job_id, "Job unlocked for retry");
Ok(())
}
}

View File

@@ -1,7 +1,7 @@
use super::Service;
use serenity::Client;
use std::sync::Arc;
use tracing::{debug, error};
use tracing::{error, warn};
/// Discord bot service implementation
pub struct BotService {
@@ -28,7 +28,7 @@ impl Service for BotService {
async fn run(&mut self) -> Result<(), anyhow::Error> {
match self.client.start().await {
Ok(()) => {
debug!(service = "bot", "stopped early.");
warn!(service = "bot", "stopped early");
Err(anyhow::anyhow!("bot stopped early"))
}
Err(e) => {

View File

@@ -42,7 +42,7 @@ impl ServiceManager {
for (name, service) in self.registered_services.drain() {
let shutdown_rx = self.shutdown_tx.subscribe();
let handle = tokio::spawn(run_service(service, shutdown_rx));
trace!(service = name, id = ?handle.id(), "service spawned",);
debug!(service = name, id = ?handle.id(), "service spawned");
self.running_services.insert(name, handle);
}
@@ -127,7 +127,7 @@ impl ServiceManager {
for (name, handle) in self.running_services.drain() {
match tokio::time::timeout(timeout, handle).await {
Ok(Ok(_)) => {
debug!(service = name, "service shutdown completed");
trace!(service = name, "service shutdown completed");
}
Ok(Err(e)) => {
warn!(service = name, error = ?e, "service shutdown failed");

View File

@@ -3,7 +3,7 @@ use crate::web::{BannerState, create_router};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
use tracing::{info, warn, trace};
/// Web server service implementation
pub struct WebService {
@@ -40,10 +40,10 @@ impl Service for WebService {
);
let listener = TcpListener::bind(addr).await?;
debug!(
info!(
service = "web",
"web server listening on {}",
format!("http://{}", addr)
address = %addr,
"web server listening"
);
// Create internal shutdown channel for axum graceful shutdown
@@ -54,7 +54,7 @@ impl Service for WebService {
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
debug!(
trace!(
service = "web",
"received shutdown signal, starting graceful shutdown"
);