feat: implement simple web service, improve ServiceManager encapsulation

This commit is contained in:
2025-08-27 11:58:57 -05:00
parent 2ec899cf25
commit 9972357cf6
9 changed files with 235 additions and 35 deletions

View File

@@ -4,11 +4,12 @@ use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{error, info, warn};
use crate::services::ServiceResult;
use crate::services::{Service, ServiceResult, run_service};
/// Manages multiple services and their lifecycle
pub struct ServiceManager {
services: HashMap<String, JoinHandle<ServiceResult>>,
registered_services: HashMap<String, Box<dyn Service>>,
running_services: HashMap<String, JoinHandle<ServiceResult>>,
shutdown_tx: broadcast::Sender<()>,
}
@@ -16,45 +17,54 @@ impl ServiceManager {
pub fn new() -> Self {
let (shutdown_tx, _) = broadcast::channel(1);
Self {
services: HashMap::new(),
registered_services: HashMap::new(),
running_services: HashMap::new(),
shutdown_tx,
}
}
/// Add a service to be managed
pub fn add_service(&mut self, name: String, handle: JoinHandle<ServiceResult>) {
self.services.insert(name, handle);
/// Register a service to be managed (not yet spawned)
pub fn register_service(&mut self, name: &str, service: Box<dyn Service>) {
self.registered_services.insert(name.to_string(), service);
}
/// Get a shutdown receiver for services to subscribe to
pub fn subscribe(&self) -> broadcast::Receiver<()> {
self.shutdown_tx.subscribe()
/// Spawn all registered services
pub fn spawn_all(&mut self) {
for (name, service) in self.registered_services.drain() {
let shutdown_rx = self.shutdown_tx.subscribe();
let handle = tokio::spawn(run_service(service, shutdown_rx));
self.running_services.insert(name, handle);
}
info!("Spawned {} services", self.running_services.len());
}
/// Run all services until one completes or fails
/// Returns the first service that completes and its result
pub async fn run(&mut self) -> (String, ServiceResult) {
if self.services.is_empty() {
if self.running_services.is_empty() {
return (
"none".to_string(),
ServiceResult::Error(anyhow::anyhow!("No services to run")),
);
}
info!("ServiceManager running {} services", self.services.len());
info!(
"ServiceManager running {} services",
self.running_services.len()
);
// Wait for any service to complete
loop {
let mut completed_services = Vec::new();
for (name, handle) in &mut self.services {
for (name, handle) in &mut self.running_services {
if handle.is_finished() {
completed_services.push(name.clone());
}
}
if let Some(completed_name) = completed_services.first() {
let handle = self.services.remove(completed_name).unwrap();
let handle = self.running_services.remove(completed_name).unwrap();
match handle.await {
Ok(result) => {
return (completed_name.clone(), result);
@@ -77,14 +87,14 @@ impl ServiceManager {
/// Shutdown all services gracefully with a timeout
/// Returns Ok(()) if all services shut down, or Err(Vec<String>) with names of services that timed out
pub async fn shutdown(mut self, timeout: Duration) -> Result<(), Vec<String>> {
if self.services.is_empty() {
if self.running_services.is_empty() {
info!("No services to shutdown");
return Ok(());
}
info!(
"Shutting down {} services with {}s timeout",
self.services.len(),
self.running_services.len(),
timeout.as_secs()
);
@@ -96,17 +106,17 @@ impl ServiceManager {
let mut completed = Vec::new();
let mut failed = Vec::new();
while !self.services.is_empty() {
while !self.running_services.is_empty() {
let mut to_remove = Vec::new();
for (name, handle) in &mut self.services {
for (name, handle) in &mut self.running_services {
if handle.is_finished() {
to_remove.push(name.clone());
}
}
for name in to_remove {
let handle = self.services.remove(&name).unwrap();
let handle = self.running_services.remove(&name).unwrap();
match handle.await {
Ok(ServiceResult::GracefulShutdown) => {
completed.push(name);
@@ -126,7 +136,7 @@ impl ServiceManager {
}
}
if !self.services.is_empty() {
if !self.running_services.is_empty() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
@@ -147,7 +157,7 @@ impl ServiceManager {
}
Err(_) => {
// Timeout occurred - return names of services that didn't complete
let pending_services: Vec<String> = self.services.keys().cloned().collect();
let pending_services: Vec<String> = self.running_services.keys().cloned().collect();
Err(pending_services)
}
}

View File

@@ -3,6 +3,7 @@ use tracing::{error, info, warn};
pub mod bot;
pub mod manager;
pub mod web;
#[derive(Debug)]
pub enum ServiceResult {
@@ -21,6 +22,8 @@ pub trait Service: Send + Sync {
async fn run(&mut self) -> Result<(), anyhow::Error>;
/// Gracefully shutdown the service
///
/// An 'Ok' result does not mean the service has completed shutdown, it merely means that the service shutdown was initiated.
async fn shutdown(&mut self) -> Result<(), anyhow::Error>;
}

79
src/services/web.rs Normal file
View File

@@ -0,0 +1,79 @@
use super::Service;
use crate::web::routes::{BannerState, create_banner_router};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
/// Web server service implementation
pub struct WebService {
port: u16,
banner_state: BannerState,
shutdown_tx: Option<broadcast::Sender<()>>,
}
impl WebService {
pub fn new(port: u16, banner_state: BannerState) -> Self {
Self {
port,
banner_state,
shutdown_tx: None,
}
}
}
#[async_trait::async_trait]
impl Service for WebService {
fn name(&self) -> &'static str {
"web"
}
async fn run(&mut self) -> Result<(), anyhow::Error> {
// Create the main router with Banner API routes
let app = create_banner_router(self.banner_state.clone());
let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
info!(
service = "web",
link = format!("http://localhost:{}", addr.port()),
"Starting web server",
);
let listener = TcpListener::bind(addr).await?;
debug!(
service = "web",
"Web server listening on {}",
format!("http://{}", addr)
);
// Create internal shutdown channel for axum graceful shutdown
let (shutdown_tx, mut shutdown_rx) = broadcast::channel(1);
self.shutdown_tx = Some(shutdown_tx);
// Use axum's graceful shutdown with the internal shutdown signal
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
debug!(
service = "web",
"Received shutdown signal, starting graceful shutdown"
);
})
.await?;
info!(service = "web", "Web server stopped");
Ok(())
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
} else {
warn!(
service = "web",
"No shutdown channel found, cannot trigger graceful shutdown"
);
}
Ok(())
}
}