feat: asynchronous, rate limited term session acquisition

This commit is contained in:
2025-09-12 20:35:12 -05:00
parent 353c36bcf2
commit e9a0558535
4 changed files with 259 additions and 71 deletions
+13 -9
View File
@@ -279,7 +279,19 @@ impl BannerApi {
) -> Result<SearchResult, BannerApiError> {
// self.sessions.reset_data_form().await?;
let session = self.sessions.acquire(term.parse()?).await?;
let mut session = self.sessions.acquire(term.parse()?).await?;
if session.been_used() {
self.http
.post(&format!("{}/classSearch/resetDataForm", self.base_url))
.header("Cookie", session.cookie())
.send()
.await
.map_err(|e| BannerApiError::RequestFailed(e.into()))?;
}
session.touch();
let mut params = query.to_params();
// Add additional parameters
@@ -293,14 +305,6 @@ impl BannerApi {
params.insert("startDatepicker".to_string(), String::new());
params.insert("endDatepicker".to_string(), String::new());
if session.been_used() {
self.http
.post(&format!("{}/classSearch/resetDataForm", self.base_url))
.send()
.await
.map_err(|e| BannerApiError::RequestFailed(e.into()))?;
}
debug!(
term = term,
query = ?query,
+134 -62
View File
@@ -5,18 +5,28 @@ use crate::banner::models::Term;
use anyhow::{Context, Result};
use cookie::Cookie;
use dashmap::DashMap;
use governor::state::InMemoryState;
use governor::{Quota, RateLimiter};
use once_cell::sync::Lazy;
use rand::distr::{Alphanumeric, SampleString};
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use std::collections::{HashMap, VecDeque};
use std::num::NonZeroU32;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Notify};
use tracing::{debug, info};
use url::Url;
/// Session expiry window: 25 minutes.
const SESSION_EXPIRY: Duration = Duration::from_secs(25 * 60);

/// Concrete limiter type used for session creation: un-keyed, in-memory
/// state, default clock.
type SessionCreationLimiter =
    RateLimiter<governor::state::direct::NotKeyed, InMemoryState, governor::clock::DefaultClock>;

/// Builds the limiter backing [`SESSION_CREATION_RATE_LIMITER`] with a quota
/// that replenishes once per 10-second period.
fn build_session_creation_limiter() -> SessionCreationLimiter {
    let period = Duration::from_secs(10);
    // `Quota::with_period` yields `None` only for a zero-length period.
    let quota = Quota::with_period(period).unwrap();
    RateLimiter::direct(quota)
}

/// Global limiter shared by every term pool, so that at most one new session
/// is attempted per 10 seconds and the server is not flooded with
/// session-creation requests.
static SESSION_CREATION_RATE_LIMITER: Lazy<SessionCreationLimiter> =
    Lazy::new(build_session_creation_limiter);
/// Represents an active anonymous session within the Banner API.
/// Identified by multiple persistent cookies, as well as a client-generated "unique session ID".
#[derive(Debug, Clone)]
@@ -97,8 +107,8 @@ impl BannerSession {
/// A smart pointer that returns a BannerSession to the pool when dropped.
pub struct PooledSession {
// The checked-out session. Held in an `Option` so that `Drop` can move it
// out with `take()` and hand it back to the pool.
session: Option<BannerSession>,
// This Arc points directly to the queue the session belongs to.
pool: Arc<Mutex<VecDeque<BannerSession>>>,
// This Arc points directly to the term-specific pool.
pool: Arc<TermPool>,
}
impl PooledSession {
@@ -125,33 +135,55 @@ impl DerefMut for PooledSession {
/// Hands the wrapped session back to its pool when the guard goes out of scope.
impl Drop for PooledSession {
fn drop(&mut self) {
// `take()` leaves `None` behind, so the session is moved out at most once.
if let Some(session) = self.session.take() {
// Don't return expired sessions to the pool.
if session.is_expired() {
debug!(
id = session.unique_session_id,
"Session is now expired, dropping."
);
return;
}
// This is a synchronous lock, so it's allowed in drop().
// It blocks the current thread briefly to return the session.
let mut queue = self.pool.lock().unwrap();
let id = session.unique_session_id.clone();
queue.push_back(session);
debug!(
id = id,
"Session returned to pool. Queue size is now {queue_size}.",
queue_size = queue.len(),
);
let pool = self.pool.clone();
// Since drop() cannot be async, we spawn a task to return the session.
// The JoinHandle is discarded, so the task is detached: the session
// re-enters the pool at some point after `drop` has returned.
// NOTE(review): `tokio::spawn` panics when called outside a Tokio runtime —
// confirm every PooledSession is dropped from within one.
tokio::spawn(async move {
pool.release(session).await;
});
}
}
}
/// Per-term pool state: a queue of sessions plus the primitives used to
/// coordinate tasks when a new session has to be created.
pub struct TermPool {
// Sessions currently held by the pool. `release` pushes to the back;
// `SessionPool::acquire` pops from the front.
sessions: Mutex<VecDeque<BannerSession>>,
// Signalled by `release` (`notify_one`) and by `SessionPool::acquire` once a
// creation attempt has finished (`notify_waiters`).
notifier: Notify,
// `true` while one task inside `SessionPool::acquire` has claimed the role
// of session creator for this term.
is_creating: Mutex<bool>,
}
impl TermPool {
    /// Builds an empty pool: no queued sessions and no creation in progress.
    fn new() -> Self {
        let sessions = Mutex::new(VecDeque::new());
        let notifier = Notify::new();
        let is_creating = Mutex::new(false);
        Self {
            sessions,
            notifier,
            is_creating,
        }
    }

    /// Takes a session back from a caller.
    ///
    /// A session that is still valid is appended to the queue; an expired one
    /// is simply dropped. In both cases a single waiter is notified, since it
    /// can either pick up the returned session or needs to go and create a
    /// replacement.
    async fn release(&self, session: BannerSession) {
        let id = session.unique_session_id.clone();

        if !session.is_expired() {
            // The guard lives only inside this block, so the lock is released
            // before logging and before the waiter is woken.
            let queue_size = {
                let mut idle = self.sessions.lock().await;
                idle.push_back(session);
                idle.len()
            };
            debug!(
                id = id,
                "Session returned to pool. Queue size is now {queue_size}."
            );
        } else {
            debug!(id = id, "Session is now expired, dropping.");
        }

        self.notifier.notify_one();
    }
}
/// Pool of Banner sessions, partitioned by term.
pub struct SessionPool {
// One shared pool per term; entries are inserted lazily by `acquire`.
sessions: DashMap<Term, Arc<Mutex<VecDeque<BannerSession>>>>,
sessions: DashMap<Term, Arc<TermPool>>,
// HTTP client (with middleware) held by the pool.
http: ClientWithMiddleware,
// Base URL string — presumably the Banner API root; confirm against the constructor.
base_url: String,
}
@@ -166,48 +198,88 @@ impl SessionPool {
}
/// Acquires a session from the pool.
///
/// A queued, non-expired session is reused when one is available. Otherwise
/// the caller either waits for another task that is already creating a
/// session, or becomes the creator itself and creates a new one on demand,
/// respecting the global rate limit.
///
/// # Errors
///
/// Returns an error when creating a new session fails.
pub async fn acquire(&self, term: Term) -> Result<PooledSession> {
// Get the queue for the given term, or insert a new empty one.
let pool_entry = self
let term_pool = self
.sessions
.entry(term.clone())
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.or_insert_with(|| Arc::new(TermPool::new()))
.clone();
loop {
// Lock the specific queue for this term
let session_option = {
let mut queue = pool_entry.lock().unwrap();
queue.pop_front() // Try to get a session
};
if let Some(mut session) = session_option {
// We got a session, check if it's expired.
if !session.is_expired() {
debug!(id = session.unique_session_id, "Reusing session");
session.touch();
return Ok(PooledSession {
session: Some(session),
pool: pool_entry,
});
} else {
debug!(
id = session.unique_session_id,
"Popped an expired session, discarding.",
);
// The session is expired, so we loop again to try and get another one.
// Fast path: Try to get an existing, non-expired session.
// NOTE(review): only one queued session is examined per loop iteration. When
// it turns out to be expired, control falls through to the slow path below
// even if further sessions are still queued — consider draining expired
// entries until a live one is found.
{
let mut queue = term_pool.sessions.lock().await;
if let Some(session) = queue.pop_front() {
if !session.is_expired() {
debug!(id = session.unique_session_id, "Reusing session from pool");
return Ok(PooledSession {
session: Some(session),
pool: Arc::clone(&term_pool),
});
} else {
debug!(
id = session.unique_session_id,
"Popped an expired session, discarding."
);
}
}
} else {
// Queue was empty, so we create a new session.
let mut new_session = self.create_session(&term).await?;
new_session.touch();
} // MutexGuard is dropped, lock is released.
return Ok(PooledSession {
session: Some(new_session),
pool: pool_entry,
});
// Slow path: No sessions available. We must either wait or become the creator.
let mut is_creating_guard = term_pool.is_creating.lock().await;
if *is_creating_guard {
// Another task is already creating a session. Release the lock and wait.
drop(is_creating_guard);
debug!("Another task is creating a session, waiting for notification...");
// NOTE(review): `notify_waiters()` only wakes tasks that are already waiting.
// A task reaching this line just after the creator's `notify_waiters()` call
// is woken only by a later `notify_one()` from `release` — confirm this
// cannot stall.
term_pool.notifier.notified().await;
// Loop back to the top to try the fast path again.
continue;
}
// This task is now the designated creator.
// NOTE(review): the flag is cleared only on the two `select!` branches below.
// If this future is dropped while awaiting inside them, it stays `true` and
// later callers will only ever wait — confirm cancellation cannot happen or
// reset the flag from a drop guard.
*is_creating_guard = true;
drop(is_creating_guard);
// Race: wait for a session to be returned OR for the rate limiter to allow a new one.
debug!("Pool empty, racing notifier vs rate limiter...");
tokio::select! {
_ = term_pool.notifier.notified() => {
// A session was returned while we were waiting!
// We are no longer the creator. Reset the flag and loop to race for the new session.
debug!("Notified that a session was returned. Looping to retry.");
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
continue;
}
_ = SESSION_CREATION_RATE_LIMITER.until_ready() => {
// The rate limit has elapsed. It's our job to create the session.
debug!("Rate limiter ready. Proceeding to create a new session.");
let new_session_result = self.create_session(&term).await;
// After creation, we are no longer the creator. Reset the flag
// and notify all other waiting tasks.
// This happens whether creation succeeded or failed, so waiters are
// released in both cases.
let mut guard = term_pool.is_creating.lock().await;
*guard = false;
drop(guard);
term_pool.notifier.notify_waiters();
match new_session_result {
Ok(new_session) => {
debug!(id = new_session.unique_session_id, "Successfully created new session");
return Ok(PooledSession {
session: Some(new_session),
pool: term_pool,
});
}
Err(e) => {
// Propagate the error if session creation failed.
return Err(e.context("Failed to create new session in pool"));
}
}
}
}
}
}
@@ -269,7 +341,7 @@ impl SessionPool {
.context("Failed to get term selection page")?;
// TODO: Validate success
/*let terms = self.get_terms("", 1, 10).await?;
let terms = self.get_terms("", 1, 10).await?;
if !terms.iter().any(|t| t.code == term.to_string()) {
return Err(anyhow::anyhow!("Failed to get term search response"));
}
@@ -280,7 +352,7 @@ impl SessionPool {
.any(|t| t.code == term.to_string())
{
return Err(anyhow::anyhow!("Failed to get term search response"));
}*/
}
let unique_session_id = generate_session_id();
self.select_term(&term.to_string(), &unique_session_id, &cookie_header)