feat: add request ID propagation from Rust to Bun with structured logging

- Forward x-request-id header through proxy and API calls
- Store RequestId in request extensions for downstream access
- Add AsyncLocalStorage context to correlate logs across async boundaries
- Improve migration logging to show pending changes before applying
- Reduce noise in logs (common OG images, health checks)
This commit is contained in:
2026-01-13 16:42:14 -06:00
parent 6d8766d3a6
commit a6cc0b8e66
13 changed files with 173 additions and 53 deletions
+2 -1
View File
@@ -188,8 +188,9 @@ impl IsrCache {
/// Invalidate all cached entries /// Invalidate all cached entries
pub async fn invalidate_all(&self) { pub async fn invalidate_all(&self) {
let previous_count = self.cache.entry_count();
self.cache.invalidate_all(); self.cache.invalidate_all();
tracing::info!("All cache entries invalidated"); tracing::info!(previous_count, "All cache entries invalidated");
} }
/// Get cache statistics /// Get cache statistics
+35 -6
View File
@@ -1,8 +1,9 @@
use clap::Parser; use clap::Parser;
use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tower_http::{cors::CorsLayer, limit::RequestBodyLimitLayer, trace::TraceLayer}; use tower_http::{cors::CorsLayer, limit::RequestBodyLimitLayer};
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
mod assets; mod assets;
@@ -87,14 +88,43 @@ async fn main() {
.await .await
.expect("Failed to connect to database"); .expect("Failed to connect to database");
// Run migrations on startup // Check and run migrations on startup
tracing::info!("Running database migrations"); let migrator = sqlx::migrate!();
sqlx::migrate!().run(&pool).await.unwrap_or_else(|e| {
// Query applied migrations directly from the database
let applied_versions: HashSet<i64> =
sqlx::query_scalar::<_, i64>("SELECT version FROM _sqlx_migrations ORDER BY version")
.fetch_all(&pool)
.await
.unwrap_or_default()
.into_iter()
.collect();
let pending: Vec<_> = migrator
.iter()
.filter(|m| !m.migration_type.is_down_migration())
.filter(|m| !applied_versions.contains(&m.version))
.map(|m| m.description.as_ref())
.collect();
if pending.is_empty() {
let last_version = applied_versions.iter().max();
let last_name = last_version
.and_then(|v| migrator.iter().find(|m| m.version == *v))
.map(|m| m.description.as_ref());
tracing::debug!(last_migration = ?last_name, "Database schema is current");
} else {
tracing::warn!(migrations = ?pending, "Pending database migrations");
}
migrator.run(&pool).await.unwrap_or_else(|e| {
tracing::error!(error = %e, "Migration failed"); tracing::error!(error = %e, "Migration failed");
std::process::exit(1); std::process::exit(1);
}); });
tracing::info!("Migrations applied successfully"); if !pending.is_empty() {
tracing::info!(count = pending.len(), "Migrations applied");
}
// Ensure admin user exists // Ensure admin user exists
auth::ensure_admin_user(&pool) auth::ensure_admin_user(&pool)
@@ -188,7 +218,6 @@ async fn main() {
trust_request_id: Option<String>, trust_request_id: Option<String>,
) -> axum::Router<Arc<AppState>> { ) -> axum::Router<Arc<AppState>> {
router router
.layer(TraceLayer::new_for_http())
.layer(RequestIdLayer::new(trust_request_id)) .layer(RequestIdLayer::new(trust_request_id))
.layer(CorsLayer::permissive()) .layer(CorsLayer::permissive())
.layer(RequestBodyLimitLayer::new(1_048_576)) .layer(RequestBodyLimitLayer::new(1_048_576))
+1 -1
View File
@@ -1,3 +1,3 @@
pub mod request_id; pub mod request_id;
pub use request_id::RequestIdLayer; pub use request_id::{RequestId, RequestIdLayer};
+39 -2
View File
@@ -1,7 +1,12 @@
use axum::{body::Body, extract::Request, http::HeaderName, response::Response}; use axum::{body::Body, extract::Request, http::HeaderName, response::Response};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Instant;
use tower::{Layer, Service}; use tower::{Layer, Service};
/// Request ID stored in request extensions for downstream access
#[derive(Clone)]
pub struct RequestId(pub String);
#[derive(Clone)] #[derive(Clone)]
pub struct RequestIdLayer { pub struct RequestIdLayer {
trust_header: Option<HeaderName>, trust_header: Option<HeaderName>,
@@ -36,6 +41,7 @@ impl<S> Service<Request> for RequestIdService<S>
where where
S: Service<Request, Response = Response<Body>> + Send + 'static, S: Service<Request, Response = Response<Body>> + Send + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
S::Error: std::fmt::Debug,
{ {
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
@@ -47,7 +53,7 @@ where
self.inner.poll_ready(cx) self.inner.poll_ready(cx)
} }
fn call(&mut self, req: Request) -> Self::Future { fn call(&mut self, mut req: Request) -> Self::Future {
let req_id = self let req_id = self
.trust_header .trust_header
.as_ref() .as_ref()
@@ -61,13 +67,44 @@ where
let span = tracing::info_span!("request", req_id = %req_id); let span = tracing::info_span!("request", req_id = %req_id);
let _enter = span.enter(); let _enter = span.enter();
let method = req.method().clone();
let path = req.uri().path().to_string();
let start = Instant::now();
tracing::debug!(method = %method, path = %path, "Request");
// Store request ID in extensions for downstream use (e.g., proxying to Bun)
req.extensions_mut().insert(RequestId(req_id));
let span_clone = span.clone(); let span_clone = span.clone();
let future = self.inner.call(req); let future = self.inner.call(req);
Box::pin(async move { Box::pin(async move {
let _enter = span_clone.enter(); let _enter = span_clone.enter();
future.await let result = future.await;
let duration_ms = start.elapsed().as_millis() as u64;
match &result {
Ok(response) => {
let status = response.status();
match status.as_u16() {
200..=399 => {
tracing::debug!(status = status.as_u16(), duration_ms, "Response")
}
400..=499 => {
tracing::info!(status = status.as_u16(), duration_ms, "Response")
}
_ => tracing::warn!(status = status.as_u16(), duration_ms, "Response"),
}
}
Err(e) => {
tracing::error!(error = ?e, duration_ms, "Request failed");
}
}
result
}) })
} }
} }
+8 -8
View File
@@ -89,19 +89,19 @@ pub async fn regenerate_common_images(state: Arc<AppState>) {
// Wait 2 seconds before starting // Wait 2 seconds before starting
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
tracing::info!("Ensuring common OG images exist"); tracing::debug!("Checking common OG images");
let specs = vec![OGImageSpec::Index, OGImageSpec::Projects]; let specs = vec![OGImageSpec::Index, OGImageSpec::Projects];
for spec in specs { let mut ready = Vec::new();
match ensure_og_image(&spec, state.clone()).await {
Ok(()) => { for spec in &specs {
tracing::info!(r2_key = spec.r2_key(), "Common OG image ready"); match ensure_og_image(spec, state.clone()).await {
} Ok(()) => ready.push(spec.r2_key()),
Err(e) => { Err(e) => {
tracing::error!(r2_key = spec.r2_key(), error = %e, "Failed to ensure OG image"); tracing::error!(r2_key = spec.r2_key(), error = %e, "OG image failed");
} }
} }
} }
tracing::info!("Finished ensuring common OG images"); tracing::info!(images = ?ready, "Common OG images ready");
} }
+9 -3
View File
@@ -15,7 +15,6 @@ use crate::{
}; };
/// ISR handler - serves pages through Bun SSR with caching and session validation /// ISR handler - serves pages through Bun SSR with caching and session validation
#[tracing::instrument(skip(state, req), fields(path = %req.uri().path(), method = %req.method()))]
pub async fn isr_handler(State(state): State<Arc<AppState>>, req: Request) -> Response { pub async fn isr_handler(State(state): State<Arc<AppState>>, req: Request) -> Response {
let method = req.method().clone(); let method = req.method().clone();
let uri = req.uri(); let uri = req.uri();
@@ -69,7 +68,7 @@ pub async fn isr_handler(State(state): State<Arc<AppState>>, req: Request) -> Re
let is_head = method == axum::http::Method::HEAD; let is_head = method == axum::http::Method::HEAD;
if path.starts_with("/api/") { if path.starts_with("/api/") {
tracing::error!("API request reached ISR handler - routing bug!"); tracing::error!(path = %path, "API request reached ISR handler - routing bug");
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal routing error").into_response(); return (StatusCode::INTERNAL_SERVER_ERROR, "Internal routing error").into_response();
} }
@@ -104,6 +103,13 @@ pub async fn isr_handler(State(state): State<Arc<AppState>>, req: Request) -> Re
let mut forward_headers = HeaderMap::new(); let mut forward_headers = HeaderMap::new();
let mut is_authenticated = false; let mut is_authenticated = false;
// Forward request ID to Bun (set by RequestIdLayer)
if let Some(request_id) = req.extensions().get::<crate::middleware::RequestId>() {
if let Ok(header_value) = axum::http::HeaderValue::from_str(&request_id.0) {
forward_headers.insert("x-request-id", header_value);
}
}
// SECURITY: Strip any X-Session-User header from incoming request to prevent spoofing // SECURITY: Strip any X-Session-User header from incoming request to prevent spoofing
// Extract and validate session from cookie // Extract and validate session from cookie
@@ -382,7 +388,7 @@ pub async fn perform_health_check(
false false
} }
Err(_) => { Err(_) => {
tracing::error!("Health check failed: timeout after 5s"); tracing::error!(timeout_sec = 5, "Health check timed out");
false false
} }
}; };
+13 -6
View File
@@ -6,13 +6,20 @@ const API_SOCKET = "/tmp/api.sock";
const PORT = process.env.PORT || "8080"; const PORT = process.env.PORT || "8080";
const LOG_JSON = process.env.LOG_JSON || "true"; const LOG_JSON = process.env.LOG_JSON || "true";
function tryUnlink(path: string) {
try {
unlinkSync(path);
} catch (e) {
// ENOENT is expected (socket doesn't exist yet), other errors are unexpected
if (e instanceof Error && "code" in e && e.code !== "ENOENT") {
console.error(`Failed to cleanup ${path}: ${e.message}`);
}
}
}
function cleanup() { function cleanup() {
try { tryUnlink(BUN_SOCKET);
unlinkSync(BUN_SOCKET); tryUnlink(API_SOCKET);
} catch {}
try {
unlinkSync(API_SOCKET);
} catch {}
} }
// Cleanup on signals // Cleanup on signals
+15
View File
@@ -1,6 +1,7 @@
import type { Handle, HandleServerError } from "@sveltejs/kit"; import type { Handle, HandleServerError } from "@sveltejs/kit";
import { dev } from "$app/environment"; import { dev } from "$app/environment";
import { initLogger } from "$lib/logger"; import { initLogger } from "$lib/logger";
import { requestContext } from "$lib/server/context";
import { preCacheCollections } from "$lib/server/icons"; import { preCacheCollections } from "$lib/server/icons";
import { getLogger } from "@logtape/logtape"; import { getLogger } from "@logtape/logtape";
import { minify } from "html-minifier-terser"; import { minify } from "html-minifier-terser";
@@ -13,6 +14,18 @@ await preCacheCollections();
const logger = getLogger(["ssr", "error"]); const logger = getLogger(["ssr", "error"]);
export const handle: Handle = async ({ event, resolve }) => { export const handle: Handle = async ({ event, resolve }) => {
// Extract request ID from Rust proxy (should always be present in production)
const requestId = event.request.headers.get("x-request-id");
if (!requestId) {
const reqLogger = getLogger(["ssr", "request"]);
reqLogger.warn(
"Missing x-request-id header - request not routed through Rust proxy",
{
path: event.url.pathname,
},
);
}
if ( if (
dev && dev &&
event.url.pathname === "/.well-known/appspecific/com.chrome.devtools.json" event.url.pathname === "/.well-known/appspecific/com.chrome.devtools.json"
@@ -20,6 +33,7 @@ export const handle: Handle = async ({ event, resolve }) => {
return new Response(undefined, { status: 404 }); return new Response(undefined, { status: 404 });
} }
return requestContext.run({ requestId: requestId ?? undefined }, async () => {
const response = await resolve(event, { const response = await resolve(event, {
transformPageChunk: !dev transformPageChunk: !dev
? ({ html }) => ? ({ html }) =>
@@ -45,6 +59,7 @@ export const handle: Handle = async ({ event, resolve }) => {
}); });
return response; return response;
});
}; };
export const handleError: HandleServerError = async ({ export const handleError: HandleServerError = async ({
+10
View File
@@ -1,5 +1,6 @@
import { getLogger } from "@logtape/logtape"; import { getLogger } from "@logtape/logtape";
import { env } from "$env/dynamic/private"; import { env } from "$env/dynamic/private";
import { requestContext } from "$lib/server/context";
const logger = getLogger(["ssr", "lib", "api"]); const logger = getLogger(["ssr", "lib", "api"]);
@@ -39,6 +40,15 @@ function createSmartFetch(upstreamUrl: string) {
// Remove custom fetch property from options (not part of standard RequestInit) // Remove custom fetch property from options (not part of standard RequestInit)
delete (fetchOptions as Record<string, unknown>).fetch; delete (fetchOptions as Record<string, unknown>).fetch;
// Forward request ID to Rust API
const ctx = requestContext.getStore();
if (ctx?.requestId) {
fetchOptions.headers = {
...fetchOptions.headers,
"x-request-id": ctx.requestId,
};
}
// Add Unix socket path if needed // Add Unix socket path if needed
if (isUnixSocket) { if (isUnixSocket) {
fetchOptions.unix = upstreamUrl; fetchOptions.unix = upstreamUrl;
@@ -122,7 +122,10 @@
</h2> </h2>
<!-- USERNAME ROW: gap-1.5 controls spacing between elements --> <!-- USERNAME ROW: gap-1.5 controls spacing between elements -->
<div class="flex items-center gap-1.5 text-sm"> <div class="flex items-center gap-1.5 text-sm">
<span class="font-mono text-xs px-1.5 py-0.5 rounded border border-zinc-300 dark:border-zinc-700 bg-zinc-200/50 dark:bg-zinc-800/50 text-zinc-600 dark:text-zinc-400">{username}</span> <span
class="font-mono text-xs px-1.5 py-0.5 rounded border border-zinc-300 dark:border-zinc-700 bg-zinc-200/50 dark:bg-zinc-800/50 text-zinc-600 dark:text-zinc-400"
>{username}</span
>
<button <button
onclick={copyUsername} onclick={copyUsername}
class="p-0.5 rounded hover:bg-zinc-200 dark:hover:bg-zinc-800 transition-colors" class="p-0.5 rounded hover:bg-zinc-200 dark:hover:bg-zinc-800 transition-colors"
+3
View File
@@ -1,5 +1,6 @@
import { dev } from "$app/environment"; import { dev } from "$app/environment";
import { configure, getConsoleSink, type LogRecord } from "@logtape/logtape"; import { configure, getConsoleSink, type LogRecord } from "@logtape/logtape";
import { requestContext } from "$lib/server/context";
interface RailwayLogEntry { interface RailwayLogEntry {
timestamp: string; timestamp: string;
@@ -10,12 +11,14 @@ interface RailwayLogEntry {
} }
function railwayFormatter(record: LogRecord): string { function railwayFormatter(record: LogRecord): string {
const ctx = requestContext.getStore();
const categoryTarget = record.category.join(":"); const categoryTarget = record.category.join(":");
const entry: RailwayLogEntry = { const entry: RailwayLogEntry = {
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
level: record.level.toLowerCase(), level: record.level.toLowerCase(),
message: record.message.join(" "), message: record.message.join(" "),
target: categoryTarget ? `bun:${categoryTarget}` : "bun", target: categoryTarget ? `bun:${categoryTarget}` : "bun",
...(ctx?.requestId && { req_id: ctx.requestId }),
}; };
if (record.properties && Object.keys(record.properties).length > 0) { if (record.properties && Object.keys(record.properties).length > 0) {
+7
View File
@@ -0,0 +1,7 @@
import { AsyncLocalStorage } from "node:async_hooks";
export interface RequestContext {
requestId?: string;
}
export const requestContext = new AsyncLocalStorage<RequestContext>();
+3 -1
View File
@@ -29,6 +29,9 @@ function stripAnsi(str: string): string {
return str.replace(/\u001b\[[0-9;]*m/g, "").trim(); return str.replace(/\u001b\[[0-9;]*m/g, "").trim();
} }
// Module-level flag to prevent reconfiguration across plugin instantiations
let loggerConfigured = false;
export function jsonLogger(): Plugin { export function jsonLogger(): Plugin {
const useJsonLogs = const useJsonLogs =
process.env.LOG_JSON === "true" || process.env.LOG_JSON === "1"; process.env.LOG_JSON === "true" || process.env.LOG_JSON === "1";
@@ -39,7 +42,6 @@ export function jsonLogger(): Plugin {
}; };
} }
let loggerConfigured = false;
const configureLogger = async () => { const configureLogger = async () => {
if (loggerConfigured) return; if (loggerConfigured) return;
await configure({ await configure({