refactor: implement comprehensive graceful shutdown across all services

Implements graceful shutdown with broadcast channels and proper timeout handling
for the scraper workers, scheduler, bot service, and status update tasks. Introduces
centralized shutdown utilities and reworks the ServiceManager to shut services down
in parallel, each with its own timeout instead of a shared timeout budget.

Key changes:
- Add utils module with shutdown helper functions
- Update ScraperService to return errors on shutdown failures
- Refactor scheduler with cancellable work tasks and 5s grace period
- Extract worker shutdown logic into helper methods for clarity
- Add broadcast channel shutdown support to BotService and status task
- Improve ServiceManager to shut services down in parallel with individual timeouts (sketched below)
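
The ServiceManager and the new utils module are changed by this commit but are not among the hunks shown below, so the following is only a rough sketch of the parallel, per-service-timeout pattern described above; the exact Service trait shape, the name() accessor, the use of async_trait, and the timeout value are assumptions, not the project's actual code.

    use std::time::Duration;
    use tokio::time::timeout;
    use tracing::{info, warn};

    // Hypothetical stand-in for the project's Service trait; the real trait
    // lives in crate::services and its shape is not part of this diff.
    #[async_trait::async_trait]
    trait Service: Send {
        fn name(&self) -> &str;
        async fn shutdown(&mut self) -> anyhow::Result<()>;
    }

    // Shut every service down concurrently, each racing its own timeout,
    // instead of draining one shared budget sequentially.
    async fn shutdown_all(services: Vec<Box<dyn Service>>, per_service: Duration) {
        let tasks: Vec<_> = services
            .into_iter()
            .map(|mut svc| {
                tokio::spawn(async move {
                    let name = svc.name().to_string();
                    match timeout(per_service, svc.shutdown()).await {
                        Ok(Ok(())) => info!(service = %name, "shut down cleanly"),
                        Ok(Err(e)) => warn!(service = %name, error = ?e, "shutdown returned an error"),
                        Err(_) => warn!(service = %name, "shutdown timed out"),
                    }
                })
            })
            .collect();

        // A slow or hung service only consumes its own budget here.
        futures::future::join_all(tasks).await;
    }

Spawning one shutdown task per service means a hung service only burns its own budget rather than starving every service queued behind it, which is the motivation for dropping the shared-budget approach.
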
Author: Ryan Walters
Date: 2025-11-03 02:10:01 -06:00
Parent: 8af9b0a1a2
Commit: 47c23459f1
9 changed files with 281 additions and 203 deletions

View File

@@ -3,16 +3,15 @@ pub mod scheduler;
pub mod worker;
use crate::banner::BannerApi;
use crate::services::Service;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{info, warn};
use self::scheduler::Scheduler;
use self::worker::Worker;
use crate::services::Service;
/// The main service that will be managed by the application's `ServiceManager`.
///
@@ -91,6 +90,7 @@ impl Service for ScraperService {
let _ = shutdown_tx.send(());
} else {
warn!("No shutdown channel found for scraper service");
return Err(anyhow::anyhow!("No shutdown channel available"));
}
// Collect all handles
@@ -100,31 +100,15 @@ impl Service for ScraperService {
}
all_handles.append(&mut self.worker_handles);
// Wait for all tasks to complete with a timeout
let timeout_duration = Duration::from_secs(5);
match tokio::time::timeout(
timeout_duration,
futures::future::join_all(all_handles),
)
.await
{
Ok(results) => {
let failed = results.iter().filter(|r| r.is_err()).count();
if failed > 0 {
warn!(failed_count = failed, "Some scraper tasks failed during shutdown");
} else {
info!("All scraper tasks shutdown gracefully");
}
}
Err(_) => {
warn!(
timeout = format!("{:.2?}", timeout_duration),
"Scraper service shutdown timed out"
);
}
// Wait for all tasks to complete (no internal timeout - let ServiceManager handle it)
let results = futures::future::join_all(all_handles).await;
let failed = results.iter().filter(|r| r.is_err()).count();
if failed > 0 {
warn!(failed_count = failed, "Some scraper tasks panicked during shutdown");
return Err(anyhow::anyhow!("{} task(s) panicked", failed));
}
info!("All scraper tasks shutdown gracefully");
Ok(())
}
}
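
The start-up side of this wiring is not part of the diff; purely as an illustrative guess (the field names, channel capacity, and trimmed-down worker body below are not the project's code), the broadcast channel these hunks rely on is typically set up with one sender held by the service and one subscribed receiver per spawned task:

    use tokio::sync::broadcast;
    use tokio::task::JoinHandle;

    // Illustrative sketch only: a single send(()) on shutdown_tx reaches
    // every receiver obtained via subscribe().
    struct ScraperServiceSketch {
        shutdown_tx: Option<broadcast::Sender<()>>,
        worker_handles: Vec<JoinHandle<()>>,
    }

    impl ScraperServiceSketch {
        fn spawn_workers(&mut self, count: usize) {
            let (tx, _) = broadcast::channel::<()>(1);
            for id in 0..count {
                let mut rx = tx.subscribe();
                self.worker_handles.push(tokio::spawn(async move {
                    // The real worker loop races jobs against this receiver;
                    // trimmed here to just the shutdown wait.
                    let _ = rx.recv().await;
                    tracing::info!(worker_id = id, "worker observed shutdown");
                }));
            }
            self.shutdown_tx = Some(tx);
        }
    }

Because a broadcast send(()) returns Err when no receivers are left, the `let _ = shutdown_tx.send(());` in the hunk above deliberately ignores that result.
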

View File

@@ -25,7 +25,15 @@ impl Scheduler {
}
}
/// Runs the scheduler's main loop.
/// Runs the scheduler's main loop with graceful shutdown support.
///
/// The scheduler wakes up every 60 seconds to analyze data and enqueue jobs.
/// When a shutdown signal is received:
/// 1. Any in-progress scheduling work is gracefully cancelled via CancellationToken
/// 2. The scheduler waits up to 5 seconds for work to complete
/// 3. If timeout occurs, the task is abandoned (it will be aborted when dropped)
///
/// This ensures that shutdown is responsive even if scheduling work is blocked.
pub async fn run(&self, mut shutdown_rx: broadcast::Receiver<()>) {
info!("Scheduler service started");
@@ -35,19 +43,17 @@ impl Scheduler {
loop {
tokio::select! {
// Sleep until next scheduled run - instantly cancellable
_ = time::sleep_until(next_run) => {
// Create cancellation token for graceful task cancellation
let cancel_token = CancellationToken::new();
// Spawn scheduling work in a separate task for cancellability
// Spawn work in separate task to allow graceful cancellation during shutdown.
// Without this, shutdown would have to wait for the full scheduling cycle.
let work_handle = tokio::spawn({
let db_pool = self.db_pool.clone();
let banner_api = self.banner_api.clone();
let cancel_token = cancel_token.clone();
async move {
// Check for cancellation while running
tokio::select! {
result = Self::schedule_jobs_impl(&db_pool, &banner_api) => {
if let Err(e) = result {
@@ -67,19 +73,14 @@ impl Scheduler {
_ = shutdown_rx.recv() => {
info!("Scheduler received shutdown signal");
// Gracefully cancel any in-progress work
if let Some((handle, cancel_token)) = current_work.take() {
// Signal cancellation
cancel_token.cancel();
// Wait for graceful completion with timeout
match time::timeout(Duration::from_secs(5), handle).await {
Ok(_) => {
debug!("Scheduling work completed gracefully");
}
Err(_) => {
warn!("Scheduling work did not complete within 5s timeout, may have been aborted");
}
// Wait briefly for graceful completion
if tokio::time::timeout(Duration::from_secs(5), handle).await.is_err() {
warn!("Scheduling work did not complete within 5s, abandoning");
} else {
debug!("Scheduling work completed gracefully");
}
}
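
Put together, the run() changes above amount to roughly the following pattern (a condensed, self-contained sketch: the fixed 60-second sleep and do_scheduling_work are placeholders for the real next_run bookkeeping and schedule_jobs_impl):

    use std::time::Duration;
    use tokio::sync::broadcast;
    use tokio::task::JoinHandle;
    use tokio::time;
    use tokio_util::sync::CancellationToken;
    use tracing::{debug, info, warn};

    async fn run(mut shutdown_rx: broadcast::Receiver<()>) {
        let mut current_work: Option<(JoinHandle<()>, CancellationToken)> = None;
        loop {
            tokio::select! {
                _ = time::sleep(Duration::from_secs(60)) => {
                    let cancel_token = CancellationToken::new();
                    // Work runs in its own task so the outer loop (and shutdown)
                    // never has to wait out a full scheduling cycle.
                    let handle = tokio::spawn({
                        let cancel_token = cancel_token.clone();
                        async move {
                            tokio::select! {
                                _ = do_scheduling_work() => debug!("scheduling cycle finished"),
                                _ = cancel_token.cancelled() => debug!("scheduling cycle cancelled"),
                            }
                        }
                    });
                    current_work = Some((handle, cancel_token));
                }
                _ = shutdown_rx.recv() => {
                    info!("scheduler received shutdown signal");
                    if let Some((handle, cancel_token)) = current_work.take() {
                        cancel_token.cancel();
                        if time::timeout(Duration::from_secs(5), handle).await.is_err() {
                            warn!("scheduling work did not stop within 5s, abandoning it");
                        } else {
                            debug!("scheduling work completed gracefully");
                        }
                    }
                    break;
                }
            }
        }
    }

    async fn do_scheduling_work() {
        // Stand-in for Scheduler::schedule_jobs_impl.
        time::sleep(Duration::from_millis(100)).await;
    }

One caveat worth noting: dropping a tokio JoinHandle detaches the task rather than aborting it, so abandoned work only actually stops once the cancellation token fires or the runtime shuts down; the 5-second timeout just bounds how long shutdown waits for it.
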
@@ -90,7 +91,14 @@ impl Scheduler {
}
}
/// The core logic for deciding what jobs to create.
/// Core scheduling logic that analyzes data and creates scrape jobs.
///
/// Strategy:
/// 1. Fetch all subjects for the current term from Banner API
/// 2. Query existing jobs in a single batch query
/// 3. Create jobs only for subjects that don't have pending jobs
///
/// This is a static method (not &self) to allow it to be called from spawned tasks.
async fn schedule_jobs_impl(db_pool: &PgPool, banner_api: &BannerApi) -> Result<()> {
// For now, we will implement a simple baseline scheduling strategy:
// 1. Get a list of all subjects from the Banner API.
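
The body of schedule_jobs_impl is cut off after its opening comments, so the following is a purely hypothetical illustration of the batch strategy the doc comment describes; the scrape_jobs table, its columns, and the fetch_subjects helper are invented for the sketch and are not the project's schema or BannerApi surface:

    use std::collections::HashSet;

    use anyhow::Result;
    use sqlx::PgPool;

    // Hypothetical sketch: one batch query for existing jobs, then inserts
    // only for subjects that are not yet covered.
    async fn schedule_jobs_sketch(db_pool: &PgPool, term: &str) -> Result<()> {
        // Stand-in for the Banner API call that lists subjects for the term.
        let subjects = fetch_subjects(term).await?;

        // Single round trip instead of one existence check per subject.
        let pending: HashSet<String> =
            sqlx::query_scalar("SELECT subject FROM scrape_jobs WHERE term = $1")
                .bind(term)
                .fetch_all(db_pool)
                .await?
                .into_iter()
                .collect();

        for subject in subjects.into_iter().filter(|s| !pending.contains(s)) {
            sqlx::query("INSERT INTO scrape_jobs (term, subject) VALUES ($1, $2)")
                .bind(term)
                .bind(&subject)
                .execute(db_pool)
                .await?;
        }
        Ok(())
    }

    async fn fetch_subjects(_term: &str) -> Result<Vec<String>> {
        Ok(vec!["CS".into(), "MATH".into()]) // placeholder data
    }

A production version would also need to scope the batch query to pending jobs and guard the insert (for example with ON CONFLICT) against races between scheduler runs.
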

View File

@@ -30,28 +30,25 @@ impl Worker {
/// Runs the worker's main loop.
pub async fn run(&self, mut shutdown_rx: broadcast::Receiver<()>) {
info!(worker_id = self.id, "Worker started.");
info!(worker_id = self.id, "Worker started");
loop {
// Fetch and lock a job, racing against shutdown signal
let job = tokio::select! {
_ = shutdown_rx.recv() => {
info!(worker_id = self.id, "Worker received shutdown signal");
info!(worker_id = self.id, "Worker exiting gracefully");
info!(worker_id = self.id, "Worker received shutdown signal, exiting gracefully");
break;
}
result = self.fetch_and_lock_job() => {
match result {
Ok(Some(job)) => job,
Ok(None) => {
// No job found, wait for a bit before polling again
trace!(worker_id = self.id, "No jobs available, waiting");
time::sleep(Duration::from_secs(5)).await;
continue;
}
Err(e) => {
warn!(worker_id = self.id, error = ?e, "Failed to fetch job");
// Wait before retrying to avoid spamming errors
warn!(worker_id = self.id, error = ?e, "Failed to fetch job, waiting");
time::sleep(Duration::from_secs(10)).await;
continue;
}
@@ -65,63 +62,14 @@ impl Worker {
// Process the job, racing against shutdown signal
let process_result = tokio::select! {
_ = shutdown_rx.recv() => {
info!(worker_id = self.id, job_id, "Shutdown received during job processing");
// Unlock the job so it can be retried
if let Err(e) = self.unlock_job(job_id).await {
warn!(
worker_id = self.id,
job_id,
error = ?e,
"Failed to unlock job during shutdown"
);
} else {
debug!(worker_id = self.id, job_id, "Job unlocked during shutdown");
}
info!(worker_id = self.id, "Worker exiting gracefully");
self.handle_shutdown_during_processing(job_id).await;
break;
}
result = self.process_job(job) => {
result
}
result = self.process_job(job) => result
};
// Handle the job processing result
match process_result {
Ok(()) => {
debug!(worker_id = self.id, job_id, "Job completed");
// If successful, delete the job
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?delete_err,
"Failed to delete job"
);
}
}
Err(JobError::Recoverable(e)) => {
self.handle_recoverable_error(job_id, e).await;
}
Err(JobError::Unrecoverable(e)) => {
error!(
worker_id = self.id,
job_id,
error = ?e,
"Job corrupted, deleting"
);
// Parse errors are unrecoverable - delete the job
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?delete_err,
"Failed to delete corrupted job"
);
}
}
}
self.handle_job_result(job_id, process_result).await;
}
}
@@ -191,24 +139,58 @@ impl Worker {
Ok(())
}
/// Handle shutdown signal received during job processing
async fn handle_shutdown_during_processing(&self, job_id: i32) {
info!(worker_id = self.id, job_id, "Shutdown received during job processing");
if let Err(e) = self.unlock_job(job_id).await {
warn!(
worker_id = self.id,
job_id,
error = ?e,
"Failed to unlock job during shutdown"
);
} else {
debug!(worker_id = self.id, job_id, "Job unlocked during shutdown");
}
info!(worker_id = self.id, "Worker exiting gracefully");
}
/// Handle the result of job processing
async fn handle_job_result(&self, job_id: i32, result: Result<(), JobError>) {
match result {
Ok(()) => {
debug!(worker_id = self.id, job_id, "Job completed successfully");
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete completed job");
}
}
Err(JobError::Recoverable(e)) => {
self.handle_recoverable_error(job_id, e).await;
}
Err(JobError::Unrecoverable(e)) => {
error!(worker_id = self.id, job_id, error = ?e, "Job corrupted, deleting");
if let Err(e) = self.delete_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to delete corrupted job");
}
}
}
}
/// Handle recoverable errors by logging appropriately and unlocking the job
async fn handle_recoverable_error(&self, job_id: i32, e: anyhow::Error) {
if let Some(BannerApiError::InvalidSession(_)) = e.downcast_ref::<BannerApiError>() {
warn!(
worker_id = self.id,
job_id, "Invalid session detected. Forcing session refresh."
job_id, "Invalid session detected, forcing session refresh"
);
} else {
error!(worker_id = self.id, job_id, error = ?e, "Failed to process job");
}
if let Err(unlock_err) = self.unlock_job(job_id).await {
error!(
worker_id = self.id,
job_id,
?unlock_err,
"Failed to unlock job"
);
if let Err(e) = self.unlock_job(job_id).await {
error!(worker_id = self.id, job_id, error = ?e, "Failed to unlock job");
}
}
}