feat: improve logging, solve lints, improve implementations, remove unused code, standardize things

This commit is contained in:
2025-08-27 12:13:09 -05:00
parent 9972357cf6
commit ac70306c04
10 changed files with 168 additions and 121 deletions

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use crate::services::{Service, ServiceResult, run_service};
@@ -30,12 +30,21 @@ impl ServiceManager {
/// Spawn all registered services
///
/// Drains `registered_services`, starts each service on its own Tokio task via
/// `run_service`, and records the resulting handle in `running_services` under
/// the service's name. Each task receives its own receiver subscribed to
/// `shutdown_tx`.
pub fn spawn_all(&mut self) {
let service_count = self.registered_services.len();
// Snapshot the names before draining; `drain()` below empties the map.
let service_names: Vec<_> = self.registered_services.keys().cloned().collect();
for (name, service) in self.registered_services.drain() {
// One receiver per service, each obtained from the same `shutdown_tx`.
let shutdown_rx = self.shutdown_tx.subscribe();
let handle = tokio::spawn(run_service(service, shutdown_rx));
self.running_services.insert(name, handle);
}
info!("Spawned {} services", self.running_services.len());
info!(
service_count,
services = ?service_names,
"spawned {} services",
service_count
);
}
/// Run all services until one completes or fails
@@ -49,7 +58,7 @@ impl ServiceManager {
}
info!(
"ServiceManager running {} services",
"servicemanager running {} services",
self.running_services.len()
);
@@ -70,7 +79,7 @@ impl ServiceManager {
return (completed_name.clone(), result);
}
Err(e) => {
error!(service = completed_name, "Service task panicked: {e}");
error!(service = completed_name, "service task panicked: {e}");
return (
completed_name.clone(),
ServiceResult::Error(anyhow::anyhow!("Task panic: {e}")),
@@ -84,82 +93,65 @@ impl ServiceManager {
}
}
/// Shutdown all services gracefully with a timeout
/// Returns Ok(()) if all services shut down, or Err(Vec<String>) with names of services that timed out
pub async fn shutdown(mut self, timeout: Duration) -> Result<(), Vec<String>> {
if self.running_services.is_empty() {
info!("No services to shutdown");
return Ok(());
}
/// Shut down all services gracefully, waiting up to `timeout` for each service.
///
/// If any service's task fails to join or times out, returns an error containing the names of those services.
/// If all services shut down successfully, returns the elapsed duration. The timeout is applied to each service in turn, so the total wait can exceed `timeout`.
pub async fn shutdown(&mut self, timeout: Duration) -> Result<Duration, Vec<String>> {
let service_count = self.running_services.len();
let service_names: Vec<_> = self.running_services.keys().cloned().collect();
info!(
"Shutting down {} services with {}s timeout",
self.running_services.len(),
timeout.as_secs()
service_count,
services = ?service_names,
timeout = format!("{:.2?}", timeout),
"shutting down {} services with {:?} timeout",
service_count,
timeout
);
// Signal all services to shutdown
// Send shutdown signal to all services
let _ = self.shutdown_tx.send(());
// Wait for all services to complete with timeout
let shutdown_result = tokio::time::timeout(timeout, async {
let mut completed = Vec::new();
let mut failed = Vec::new();
// Wait for all services to complete
let start_time = std::time::Instant::now();
let mut pending_services = Vec::new();
while !self.running_services.is_empty() {
let mut to_remove = Vec::new();
for (name, handle) in &mut self.running_services {
if handle.is_finished() {
to_remove.push(name.clone());
}
for (name, handle) in self.running_services.drain() {
match tokio::time::timeout(timeout, handle).await {
Ok(Ok(_)) => {
debug!(service = name, "service shutdown completed");
}
for name in to_remove {
let handle = self.running_services.remove(&name).unwrap();
match handle.await {
Ok(ServiceResult::GracefulShutdown) => {
completed.push(name);
}
Ok(ServiceResult::NormalCompletion) => {
warn!(service = name, "Service completed normally during shutdown");
completed.push(name);
}
Ok(ServiceResult::Error(e)) => {
error!(service = name, "Service error during shutdown: {e}");
failed.push(name);
}
Err(e) => {
error!(service = name, "Service panic during shutdown: {e}");
failed.push(name);
}
}
Ok(Err(e)) => {
warn!(service = name, error = ?e, "service shutdown failed");
pending_services.push(name);
}
if !self.running_services.is_empty() {
tokio::time::sleep(Duration::from_millis(10)).await;
Err(_) => {
warn!(service = name, "service shutdown timed out");
pending_services.push(name);
}
}
}
(completed, failed)
})
.await;
match shutdown_result {
Ok((completed, failed)) => {
if !completed.is_empty() {
info!("Services shutdown completed: {}", completed.join(", "));
}
if !failed.is_empty() {
warn!("Services had errors during shutdown: {}", failed.join(", "));
}
Ok(())
}
Err(_) => {
// Timeout occurred - return names of services that didn't complete
let pending_services: Vec<String> = self.running_services.keys().cloned().collect();
Err(pending_services)
}
let elapsed = start_time.elapsed();
if pending_services.is_empty() {
info!(
service_count,
elapsed = format!("{:.2?}", elapsed),
"services shutdown completed: {}",
service_names.join(", ")
);
Ok(elapsed)
} else {
warn!(
pending_count = pending_services.len(),
pending_services = ?pending_services,
elapsed = format!("{:.2?}", elapsed),
"services shutdown completed with {} pending: {}",
pending_services.len(),
pending_services.join(", ")
);
Err(pending_services)
}
}
}