refactor(scraper): implement graceful shutdown with broadcast channels

Replace task abortion with broadcast-based graceful shutdown for the scheduler and workers. Implement cancellation tokens for in-progress work with a 5s timeout. Add the tokio-util dependency for CancellationToken support. Update ServiceManager to use completion channels and abort handles for better service lifecycle control.
Ryan Walters
2025-11-03 01:22:12 -06:00
parent 020a00254f
commit 8af9b0a1a2
9 changed files with 343 additions and 172 deletions
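
Only one of the nine changed files appears below; the CancellationToken half of the change lives in the scheduler and worker files, which are not shown. The following is a minimal sketch of the pattern the message describes: a broadcast channel delivers the shutdown signal, a token cancels in-progress work, and a 5s grace period precedes a hard abort. The names do_work and worker are illustrative, not from this commit, and the sketch assumes the tokio (with full features) and tokio-util crates.

use std::time::Duration;

use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

// Hypothetical unit of in-progress work; not part of this commit.
async fn do_work(cancel: CancellationToken) {
    tokio::select! {
        // Stop early when the token is cancelled...
        _ = cancel.cancelled() => {}
        // ...otherwise run to completion (a sleep stands in for real work).
        _ = tokio::time::sleep(Duration::from_secs(30)) => {}
    }
}

async fn worker(mut shutdown_rx: broadcast::Receiver<()>) {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(do_work(cancel.child_token()));
    let abort = task.abort_handle();

    // Block until the manager broadcasts shutdown.
    let _ = shutdown_rx.recv().await;

    // Ask in-progress work to stop, then give it 5s to wind down.
    cancel.cancel();
    if tokio::time::timeout(Duration::from_secs(5), task).await.is_err() {
        // Grace period expired; fall back to a hard abort.
        abort.abort();
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
    let worker_task = tokio::spawn(worker(shutdown_rx));

    // In the real service this would be triggered by a signal handler.
    let _ = shutdown_tx.send(());
    let _ = worker_task.await;
}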


@@ -1,15 +1,16 @@
 use std::collections::HashMap;
 use std::time::Duration;
-use tokio::sync::broadcast;
-use tokio::task::JoinHandle;
-use tracing::{debug, error, info, trace, warn};
+use tokio::sync::{broadcast, mpsc};
+use tracing::{debug, info, trace, warn};

 use crate::services::{Service, ServiceResult, run_service};

 /// Manages multiple services and their lifecycle
 pub struct ServiceManager {
     registered_services: HashMap<String, Box<dyn Service>>,
-    running_services: HashMap<String, JoinHandle<ServiceResult>>,
+    service_handles: HashMap<String, tokio::task::AbortHandle>,
+    completion_rx: Option<mpsc::UnboundedReceiver<(String, ServiceResult)>>,
+    completion_tx: mpsc::UnboundedSender<(String, ServiceResult)>,
     shutdown_tx: broadcast::Sender<()>,
 }
@@ -22,9 +23,13 @@ impl Default for ServiceManager {
 impl ServiceManager {
     pub fn new() -> Self {
         let (shutdown_tx, _) = broadcast::channel(1);
+        let (completion_tx, completion_rx) = mpsc::unbounded_channel();
+
         Self {
             registered_services: HashMap::new(),
-            running_services: HashMap::new(),
+            service_handles: HashMap::new(),
+            completion_rx: Some(completion_rx),
+            completion_tx,
             shutdown_tx,
         }
     }
@@ -46,9 +51,19 @@ impl ServiceManager {
         for (name, service) in self.registered_services.drain() {
             let shutdown_rx = self.shutdown_tx.subscribe();
-            let handle = tokio::spawn(run_service(service, shutdown_rx));
+            let completion_tx = self.completion_tx.clone();
+            let name_clone = name.clone();
+
+            // Spawn service task
+            let handle = tokio::spawn(async move {
+                let result = run_service(service, shutdown_rx).await;
+                // Send completion notification
+                let _ = completion_tx.send((name_clone, result));
+            });
+
+            // Store abort handle for shutdown control
+            self.service_handles.insert(name.clone(), handle.abort_handle());
             debug!(service = name, id = ?handle.id(), "service spawned");
-            self.running_services.insert(name, handle);
         }

         info!(
@@ -62,7 +77,7 @@ impl ServiceManager {
     /// Run all services until one completes or fails
     /// Returns the first service that completes and its result
     pub async fn run(&mut self) -> (String, ServiceResult) {
-        if self.running_services.is_empty() {
+        if self.service_handles.is_empty() {
             return (
                 "none".to_string(),
                 ServiceResult::Error(anyhow::anyhow!("No services to run")),
@@ -71,82 +86,112 @@ impl ServiceManager {
         info!(
             "servicemanager running {} services",
-            self.running_services.len()
+            self.service_handles.len()
         );

-        // Wait for any service to complete
-        loop {
-            let mut completed_services = Vec::new();
-
-            for (name, handle) in &mut self.running_services {
-                if handle.is_finished() {
-                    completed_services.push(name.clone());
-                }
-            }
-
-            if let Some(completed_name) = completed_services.first() {
-                let handle = self.running_services.remove(completed_name).unwrap();
-                match handle.await {
-                    Ok(result) => {
-                        return (completed_name.clone(), result);
-                    }
-                    Err(e) => {
-                        error!(service = completed_name, "service task panicked: {e}");
-                        return (
-                            completed_name.clone(),
-                            ServiceResult::Error(anyhow::anyhow!("Task panic: {e}")),
-                        );
-                    }
-                }
-            }
-
-            // Small delay to prevent busy-waiting
-            tokio::time::sleep(Duration::from_millis(10)).await;
-        }
+        // Wait for any service to complete via the channel
+        let completion_rx = self
+            .completion_rx
+            .as_mut()
+            .expect("completion_rx should be available");
+
+        completion_rx
+            .recv()
+            .await
+            .map(|(name, result)| {
+                self.service_handles.remove(&name);
+                (name, result)
+            })
+            .unwrap_or_else(|| {
+                (
+                    "channel_closed".to_string(),
+                    ServiceResult::Error(anyhow::anyhow!("Completion channel closed")),
+                )
+            })
     }

     /// Shutdown all services gracefully with a timeout.
     ///
-    /// If any service fails to shutdown, it will return an error containing the names of the services that failed to shutdown.
+    /// All services receive the shutdown signal simultaneously and must complete within the
+    /// specified timeout (combined, not per-service). If any service fails to shutdown within
+    /// the timeout, it will be aborted and included in the error result.
     ///
     /// If all services shutdown successfully, the function will return the duration elapsed.
     pub async fn shutdown(&mut self, timeout: Duration) -> Result<Duration, Vec<String>> {
-        let service_count = self.running_services.len();
-        let service_names: Vec<_> = self.running_services.keys().cloned().collect();
+        let service_count = self.service_handles.len();
+        let service_names: Vec<_> = self.service_handles.keys().cloned().collect();

         info!(
             service_count,
             services = ?service_names,
             timeout = format!("{:.2?}", timeout),
-            "shutting down {} services with {:?} timeout",
+            "shutting down {} services with {:?} total timeout",
             service_count,
             timeout
         );

-        // Send shutdown signal to all services
+        if service_count == 0 {
+            return Ok(Duration::ZERO);
+        }
+
+        // Send shutdown signal to all services simultaneously
         let _ = self.shutdown_tx.send(());

-        // Wait for all services to complete
         let start_time = std::time::Instant::now();
-        let mut pending_services = Vec::new();
+        let mut completed = 0;
+        let mut failed_services = Vec::new();

-        for (name, handle) in self.running_services.drain() {
-            match tokio::time::timeout(timeout, handle).await {
-                Ok(Ok(_)) => {
-                    trace!(service = name, "service shutdown completed");
+        // Borrow the receiver mutably (don't take ownership to allow reuse)
+        let completion_rx = self
+            .completion_rx
+            .as_mut()
+            .expect("completion_rx should be available");
+
+        // Wait for all services to complete with timeout
+        while completed < service_count {
+            match tokio::time::timeout(
+                timeout.saturating_sub(start_time.elapsed()),
+                completion_rx.recv(),
+            )
+            .await
+            {
+                Ok(Some((name, result))) => {
+                    completed += 1;
+                    self.service_handles.remove(&name);
+
+                    if matches!(result, ServiceResult::GracefulShutdown) {
+                        trace!(service = name, "service shutdown completed");
+                    } else {
+                        warn!(service = name, "service shutdown with non-graceful result");
+                        failed_services.push(name);
+                    }
                 }
-                Ok(Err(e)) => {
-                    warn!(service = name, error = ?e, "service shutdown failed");
-                    pending_services.push(name);
+                Ok(None) => {
+                    // Channel closed - shouldn't happen but handle it
+                    warn!("completion channel closed during shutdown");
+                    break;
                 }
                 Err(_) => {
-                    warn!(service = name, "service shutdown timed out");
-                    pending_services.push(name);
+                    // Timeout - abort all remaining services
+                    warn!(
+                        timeout = format!("{:.2?}", timeout),
+                        elapsed = format!("{:.2?}", start_time.elapsed()),
+                        remaining = service_count - completed,
+                        "shutdown timeout - aborting remaining services"
+                    );
+
+                    for (name, handle) in self.service_handles.drain() {
+                        handle.abort();
+                        failed_services.push(name);
+                    }
+                    break;
                 }
             }
         }

         let elapsed = start_time.elapsed();

-        if pending_services.is_empty() {
+        if failed_services.is_empty() {
             info!(
                 service_count,
                 elapsed = format!("{:.2?}", elapsed),
@@ -156,14 +201,14 @@ impl ServiceManager {
             Ok(elapsed)
         } else {
             warn!(
-                pending_count = pending_services.len(),
-                pending_services = ?pending_services,
+                failed_count = failed_services.len(),
+                failed_services = ?failed_services,
                 elapsed = format!("{:.2?}", elapsed),
-                "services shutdown completed with {} pending: {}",
-                pending_services.len(),
-                pending_services.join(", ")
+                "services shutdown completed with {} failed: {}",
+                failed_services.len(),
+                failed_services.join(", ")
             );
-            Err(pending_services)
+            Err(failed_services)
         }
     }
 }
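
Taken together, the diff changes the manager's contract: run() now blocks on a completion channel instead of polling is_finished() every 10ms, and shutdown() enforces one shared timeout budget, aborting stragglers via the stored AbortHandles. Roughly how the reworked manager is meant to be driven, as a sketch only: register and start are assumed method names (the registration and spawn code is not visible in this hunk), and the scheduler and worker values are stand-ins for types implementing the crate's Service trait.

// Hypothetical call sites; only new(), run(), and shutdown() are
// confirmed by the diff above.
let mut manager = ServiceManager::new();
manager.register("scheduler".to_string(), Box::new(scheduler));
manager.register("worker".to_string(), Box::new(worker));
manager.start();

// Blocks until the first service reports completion over the channel.
let (name, result) = manager.run().await;
info!(service = name, "first service exited");

// Remaining services share one 5s budget; anything still running when
// it expires is aborted and reported in the Err variant.
match manager.shutdown(Duration::from_secs(5)).await {
    Ok(elapsed) => info!(elapsed = format!("{:.2?}", elapsed), "clean shutdown"),
    Err(failed) => warn!(failed = ?failed, "aborted: {}", failed.join(", ")),
}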