mirror of
https://github.com/Xevion/xevion.dev.git
synced 2026-01-31 10:26:52 -06:00
feat: add health checks, OG image generation, and R2 integration
- Implement health check system with caching and singleflight pattern - Add OG image generation via Satori with R2 storage backend - Configure Railway deployment with health check endpoint - Add connection pooling and Unix socket support for Bun SSR - Block external access to internal routes (/internal/*)
This commit is contained in:
+2
-2
@@ -1,8 +1,8 @@
|
||||
use axum::{
|
||||
http::{header, StatusCode, Uri},
|
||||
http::{StatusCode, Uri, header},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use include_dir::{include_dir, Dir};
|
||||
use include_dir::{Dir, include_dir};
|
||||
|
||||
static CLIENT_ASSETS: Dir = include_dir!("$CARGO_MANIFEST_DIR/web/build/client");
|
||||
|
||||
|
||||
+1
-1
@@ -3,7 +3,7 @@ use serde::Serialize;
|
||||
use serde_json::{Map, Value};
|
||||
use std::fmt;
|
||||
use time::macros::format_description;
|
||||
use time::{format_description::FormatItem, OffsetDateTime};
|
||||
use time::{OffsetDateTime, format_description::FormatItem};
|
||||
use tracing::field::{Field, Visit};
|
||||
use tracing::{Event, Level, Subscriber};
|
||||
use tracing_subscriber::fmt::format::Writer;
|
||||
|
||||
+119
@@ -0,0 +1,119 @@
|
||||
use futures::future::{BoxFuture, FutureExt, Shared};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// The state of the health check system
|
||||
enum HealthCheckState {
|
||||
/// No check has ever been performed
|
||||
Initial,
|
||||
|
||||
/// A check is currently in progress, all requests await this future
|
||||
Checking {
|
||||
future: Shared<BoxFuture<'static, bool>>,
|
||||
},
|
||||
|
||||
/// We have a cached result from a completed check
|
||||
Cached { healthy: bool, checked_at: Instant },
|
||||
}
|
||||
|
||||
/// Inner state that can be shared across futures
|
||||
struct HealthCheckerInner {
|
||||
state: Mutex<HealthCheckState>,
|
||||
had_success: AtomicBool,
|
||||
}
|
||||
|
||||
/// Manages health check state with caching and singleflight behavior
|
||||
pub struct HealthChecker {
|
||||
inner: Arc<HealthCheckerInner>,
|
||||
check_fn: Arc<dyn Fn() -> BoxFuture<'static, bool> + Send + Sync>,
|
||||
}
|
||||
|
||||
impl HealthChecker {
|
||||
/// Create a new health checker with the given check function
|
||||
pub fn new<F, Fut>(check_fn: F) -> Self
|
||||
where
|
||||
F: Fn() -> Fut + Send + Sync + 'static,
|
||||
Fut: std::future::Future<Output = bool> + Send + 'static,
|
||||
{
|
||||
Self {
|
||||
inner: Arc::new(HealthCheckerInner {
|
||||
state: Mutex::new(HealthCheckState::Initial),
|
||||
had_success: AtomicBool::new(false),
|
||||
}),
|
||||
check_fn: Arc::new(move || check_fn().boxed()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform a health check with caching and singleflight behavior
|
||||
pub async fn check(&self) -> bool {
|
||||
let mut state = self.inner.state.lock().await;
|
||||
|
||||
match &*state {
|
||||
HealthCheckState::Initial => {
|
||||
// Start first check, transition to Checking
|
||||
let future = self.create_check_future();
|
||||
*state = HealthCheckState::Checking {
|
||||
future: future.clone(),
|
||||
};
|
||||
drop(state);
|
||||
future.await
|
||||
}
|
||||
HealthCheckState::Checking { future } => {
|
||||
// Join existing check (singleflight)
|
||||
let future = future.clone();
|
||||
drop(state);
|
||||
future.await
|
||||
}
|
||||
HealthCheckState::Cached {
|
||||
healthy,
|
||||
checked_at,
|
||||
} => {
|
||||
// Determine cache window based on startup status
|
||||
let window = if self.inner.had_success.load(Ordering::Relaxed) {
|
||||
Duration::from_secs(15)
|
||||
} else {
|
||||
Duration::from_secs(1)
|
||||
};
|
||||
|
||||
if checked_at.elapsed() < window {
|
||||
// Serve from cache
|
||||
return *healthy;
|
||||
}
|
||||
|
||||
// Cache stale, start new check
|
||||
let future = self.create_check_future();
|
||||
*state = HealthCheckState::Checking {
|
||||
future: future.clone(),
|
||||
};
|
||||
drop(state);
|
||||
future.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a shared future that performs the check and updates state
|
||||
fn create_check_future(&self) -> Shared<BoxFuture<'static, bool>> {
|
||||
let inner = Arc::clone(&self.inner);
|
||||
let check_fn = Arc::clone(&self.check_fn);
|
||||
|
||||
async move {
|
||||
let result = (check_fn)().await;
|
||||
|
||||
// Transition: Checking → Cached
|
||||
*inner.state.lock().await = HealthCheckState::Cached {
|
||||
healthy: result,
|
||||
checked_at: Instant::now(),
|
||||
};
|
||||
|
||||
if result {
|
||||
inner.had_success.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
.boxed()
|
||||
.shared()
|
||||
}
|
||||
}
|
||||
+156
-32
@@ -9,16 +9,21 @@ use clap::Parser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tower_http::{cors::CorsLayer, limit::RequestBodyLimitLayer, trace::TraceLayer};
|
||||
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
mod assets;
|
||||
mod config;
|
||||
mod formatter;
|
||||
mod health;
|
||||
mod middleware;
|
||||
mod og;
|
||||
mod r2;
|
||||
use assets::serve_embedded_asset;
|
||||
use config::{Args, ListenAddr};
|
||||
use formatter::{CustomJsonFormatter, CustomPrettyFormatter};
|
||||
use health::HealthChecker;
|
||||
use middleware::RequestIdLayer;
|
||||
|
||||
fn init_tracing() {
|
||||
@@ -69,14 +74,68 @@ async fn main() {
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
// Create HTTP client for TCP connections with optimized pool settings
|
||||
let http_client = reqwest::Client::builder()
|
||||
.pool_max_idle_per_host(8)
|
||||
.pool_idle_timeout(Duration::from_secs(600)) // 10 minutes
|
||||
.tcp_keepalive(Some(Duration::from_secs(60)))
|
||||
.timeout(Duration::from_secs(5)) // Default timeout for SSR
|
||||
.connect_timeout(Duration::from_secs(3))
|
||||
.build()
|
||||
.expect("Failed to create HTTP client");
|
||||
|
||||
// Create Unix socket client if downstream is a Unix socket
|
||||
let unix_client = if args.downstream.starts_with('/') || args.downstream.starts_with("./") {
|
||||
let path = PathBuf::from(&args.downstream);
|
||||
Some(
|
||||
reqwest::Client::builder()
|
||||
.pool_max_idle_per_host(8)
|
||||
.pool_idle_timeout(Duration::from_secs(600)) // 10 minutes
|
||||
.timeout(Duration::from_secs(5)) // Default timeout for SSR
|
||||
.connect_timeout(Duration::from_secs(3))
|
||||
.unix_socket(path)
|
||||
.build()
|
||||
.expect("Failed to create Unix socket client"),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Create health checker
|
||||
let downstream_url_for_health = args.downstream.clone();
|
||||
let http_client_for_health = http_client.clone();
|
||||
let unix_client_for_health = unix_client.clone();
|
||||
|
||||
let health_checker = Arc::new(HealthChecker::new(move || {
|
||||
let downstream_url = downstream_url_for_health.clone();
|
||||
let http_client = http_client_for_health.clone();
|
||||
let unix_client = unix_client_for_health.clone();
|
||||
|
||||
async move { perform_health_check(downstream_url, http_client, unix_client).await }
|
||||
}));
|
||||
|
||||
let state = Arc::new(AppState {
|
||||
downstream_url: args.downstream.clone(),
|
||||
http_client,
|
||||
unix_client,
|
||||
health_checker,
|
||||
});
|
||||
|
||||
// Regenerate common OGP images on startup
|
||||
tokio::spawn({
|
||||
let state = state.clone();
|
||||
async move {
|
||||
og::regenerate_common_images(state).await;
|
||||
}
|
||||
});
|
||||
|
||||
let app = Router::new()
|
||||
.nest("/api", api_routes())
|
||||
.route("/api/", any(api_root_404_handler))
|
||||
.route("/_app/{*path}", axum::routing::get(serve_embedded_asset).head(serve_embedded_asset))
|
||||
.route(
|
||||
"/_app/{*path}",
|
||||
axum::routing::get(serve_embedded_asset).head(serve_embedded_asset),
|
||||
)
|
||||
.fallback(isr_handler)
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.layer(RequestIdLayer::new(args.trust_request_id.clone()))
|
||||
@@ -131,19 +190,24 @@ async fn main() {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppState {
|
||||
pub struct AppState {
|
||||
downstream_url: String,
|
||||
http_client: reqwest::Client,
|
||||
unix_client: Option<reqwest::Client>,
|
||||
health_checker: Arc<HealthChecker>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ProxyError {
|
||||
pub enum ProxyError {
|
||||
Network(reqwest::Error),
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ProxyError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ProxyError::Network(e) => write!(f, "Network error: {}", e),
|
||||
ProxyError::Other(s) => write!(f, "{}", s),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -178,8 +242,14 @@ fn is_page_route(path: &str) -> bool {
|
||||
fn api_routes() -> Router<Arc<AppState>> {
|
||||
Router::new()
|
||||
.route("/", any(api_root_404_handler))
|
||||
.route("/health", axum::routing::get(health_handler).head(health_handler))
|
||||
.route("/projects", axum::routing::get(projects_handler).head(projects_handler))
|
||||
.route(
|
||||
"/health",
|
||||
axum::routing::get(health_handler).head(health_handler),
|
||||
)
|
||||
.route(
|
||||
"/projects",
|
||||
axum::routing::get(projects_handler).head(projects_handler),
|
||||
)
|
||||
.fallback(api_404_and_method_handler)
|
||||
}
|
||||
|
||||
@@ -187,20 +257,30 @@ async fn api_root_404_handler(uri: axum::http::Uri) -> impl IntoResponse {
|
||||
api_404_handler(uri).await
|
||||
}
|
||||
|
||||
async fn health_handler() -> impl IntoResponse {
|
||||
(StatusCode::OK, "OK")
|
||||
async fn health_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
let healthy = state.health_checker.check().await;
|
||||
|
||||
if healthy {
|
||||
(StatusCode::OK, "OK")
|
||||
} else {
|
||||
(StatusCode::SERVICE_UNAVAILABLE, "Unhealthy")
|
||||
}
|
||||
}
|
||||
|
||||
async fn api_404_and_method_handler(req: Request) -> impl IntoResponse {
|
||||
let method = req.method();
|
||||
let uri = req.uri();
|
||||
let path = uri.path();
|
||||
|
||||
if method != axum::http::Method::GET && method != axum::http::Method::HEAD && method != axum::http::Method::OPTIONS {
|
||||
let content_type = req.headers()
|
||||
|
||||
if method != axum::http::Method::GET
|
||||
&& method != axum::http::Method::HEAD
|
||||
&& method != axum::http::Method::OPTIONS
|
||||
{
|
||||
let content_type = req
|
||||
.headers()
|
||||
.get(axum::http::header::CONTENT_TYPE)
|
||||
.and_then(|v| v.to_str().ok());
|
||||
|
||||
|
||||
if let Some(ct) = content_type {
|
||||
if !ct.starts_with("application/json") {
|
||||
return (
|
||||
@@ -209,9 +289,13 @@ async fn api_404_and_method_handler(req: Request) -> impl IntoResponse {
|
||||
"error": "Unsupported media type",
|
||||
"message": "API endpoints only accept application/json"
|
||||
})),
|
||||
).into_response();
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
} else if method == axum::http::Method::POST || method == axum::http::Method::PUT || method == axum::http::Method::PATCH {
|
||||
} else if method == axum::http::Method::POST
|
||||
|| method == axum::http::Method::PUT
|
||||
|| method == axum::http::Method::PATCH
|
||||
{
|
||||
// POST/PUT/PATCH require Content-Type header
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
@@ -219,10 +303,11 @@ async fn api_404_and_method_handler(req: Request) -> impl IntoResponse {
|
||||
"error": "Missing Content-Type header",
|
||||
"message": "Content-Type: application/json is required"
|
||||
})),
|
||||
).into_response();
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Route not found
|
||||
tracing::warn!(path = %path, method = %method, "API route not found");
|
||||
(
|
||||
@@ -231,7 +316,8 @@ async fn api_404_and_method_handler(req: Request) -> impl IntoResponse {
|
||||
"error": "Not found",
|
||||
"path": path
|
||||
})),
|
||||
).into_response()
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
async fn api_404_handler(uri: axum::http::Uri) -> impl IntoResponse {
|
||||
@@ -239,7 +325,7 @@ async fn api_404_handler(uri: axum::http::Uri) -> impl IntoResponse {
|
||||
.uri(uri)
|
||||
.body(axum::body::Body::empty())
|
||||
.unwrap();
|
||||
|
||||
|
||||
api_404_and_method_handler(req).await
|
||||
}
|
||||
|
||||
@@ -306,7 +392,7 @@ async fn isr_handler(State(state): State<Arc<AppState>>, req: Request) -> Respon
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
axum::http::header::ALLOW,
|
||||
axum::http::HeaderValue::from_static("GET, HEAD, OPTIONS")
|
||||
axum::http::HeaderValue::from_static("GET, HEAD, OPTIONS"),
|
||||
);
|
||||
return (
|
||||
StatusCode::METHOD_NOT_ALLOWED,
|
||||
@@ -315,19 +401,22 @@ async fn isr_handler(State(state): State<Arc<AppState>>, req: Request) -> Respon
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
|
||||
let is_head = method == axum::http::Method::HEAD;
|
||||
|
||||
if path.starts_with("/api/") {
|
||||
tracing::error!("API request reached ISR handler - routing bug!");
|
||||
return (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"Internal routing error",
|
||||
)
|
||||
.into_response();
|
||||
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal routing error").into_response();
|
||||
}
|
||||
|
||||
let bun_url = if state.downstream_url.starts_with('/') || state.downstream_url.starts_with("./") {
|
||||
// Block internal routes from external access
|
||||
if path.starts_with("/internal/") {
|
||||
tracing::warn!(path = %path, "Attempted access to internal route");
|
||||
return (StatusCode::NOT_FOUND, "Not found").into_response();
|
||||
}
|
||||
|
||||
let bun_url = if state.downstream_url.starts_with('/') || state.downstream_url.starts_with("./")
|
||||
{
|
||||
if query.is_empty() {
|
||||
format!("http://localhost{}", path)
|
||||
} else {
|
||||
@@ -415,14 +504,10 @@ async fn proxy_to_bun(
|
||||
url: &str,
|
||||
state: Arc<AppState>,
|
||||
) -> Result<(StatusCode, HeaderMap, String), ProxyError> {
|
||||
let client = if state.downstream_url.starts_with('/') || state.downstream_url.starts_with("./") {
|
||||
let path = PathBuf::from(&state.downstream_url);
|
||||
reqwest::Client::builder()
|
||||
.unix_socket(path)
|
||||
.build()
|
||||
.map_err(ProxyError::Network)?
|
||||
let client = if state.unix_client.is_some() {
|
||||
state.unix_client.as_ref().unwrap()
|
||||
} else {
|
||||
reqwest::Client::new()
|
||||
&state.http_client
|
||||
};
|
||||
|
||||
let response = client.get(url).send().await.map_err(ProxyError::Network)?;
|
||||
@@ -450,3 +535,42 @@ async fn proxy_to_bun(
|
||||
let body = response.text().await.map_err(ProxyError::Network)?;
|
||||
Ok((status, headers, body))
|
||||
}
|
||||
|
||||
async fn perform_health_check(
|
||||
downstream_url: String,
|
||||
http_client: reqwest::Client,
|
||||
unix_client: Option<reqwest::Client>,
|
||||
) -> bool {
|
||||
let url = if downstream_url.starts_with('/') || downstream_url.starts_with("./") {
|
||||
"http://localhost/internal/health".to_string()
|
||||
} else {
|
||||
format!("{}/internal/health", downstream_url)
|
||||
};
|
||||
|
||||
let client = if unix_client.is_some() {
|
||||
unix_client.as_ref().unwrap()
|
||||
} else {
|
||||
&http_client
|
||||
};
|
||||
|
||||
match tokio::time::timeout(Duration::from_secs(5), client.get(&url).send()).await {
|
||||
Ok(Ok(response)) => {
|
||||
let is_success = response.status().is_success();
|
||||
if !is_success {
|
||||
tracing::warn!(
|
||||
status = response.status().as_u16(),
|
||||
"Health check failed: Bun returned non-success status"
|
||||
);
|
||||
}
|
||||
is_success
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
tracing::error!(error = %err, "Health check failed: cannot reach Bun");
|
||||
false
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::error!("Health check failed: timeout after 5s");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+4
-7
@@ -1,9 +1,4 @@
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::Request,
|
||||
http::HeaderName,
|
||||
response::Response,
|
||||
};
|
||||
use axum::{body::Body, extract::Request, http::HeaderName, response::Response};
|
||||
use std::task::{Context, Poll};
|
||||
use tower::{Layer, Service};
|
||||
|
||||
@@ -44,7 +39,9 @@ where
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
type Future = std::pin::Pin<
|
||||
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{AppState, r2::R2Client};
|
||||
|
||||
/// Discriminated union matching TypeScript's OGImageSpec in web/src/lib/og-types.ts
|
||||
///
|
||||
/// IMPORTANT: Keep this in sync with the TypeScript definition.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "lowercase")]
|
||||
pub enum OGImageSpec {
|
||||
Index,
|
||||
Projects,
|
||||
Project { id: String },
|
||||
}
|
||||
|
||||
impl OGImageSpec {
|
||||
/// Get the R2 storage key for this spec
|
||||
pub fn r2_key(&self) -> String {
|
||||
match self {
|
||||
OGImageSpec::Index => "og/index.png".to_string(),
|
||||
OGImageSpec::Projects => "og/projects.png".to_string(),
|
||||
OGImageSpec::Project { id } => format!("og/project/{}.png", id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate an OG image by calling Bun's internal endpoint and upload to R2
|
||||
#[tracing::instrument(skip(state), fields(r2_key))]
|
||||
pub async fn generate_og_image(spec: &OGImageSpec, state: Arc<AppState>) -> Result<(), String> {
|
||||
let r2 = R2Client::get()
|
||||
.await
|
||||
.ok_or_else(|| "R2 client not available".to_string())?;
|
||||
|
||||
let r2_key = spec.r2_key();
|
||||
tracing::Span::current().record("r2_key", &r2_key);
|
||||
|
||||
// Call Bun's internal endpoint
|
||||
let bun_url = if state.downstream_url.starts_with('/') || state.downstream_url.starts_with("./")
|
||||
{
|
||||
"http://localhost/internal/ogp".to_string()
|
||||
} else {
|
||||
format!("{}/internal/ogp", state.downstream_url)
|
||||
};
|
||||
|
||||
let client = state.unix_client.as_ref().unwrap_or(&state.http_client);
|
||||
|
||||
let response = client
|
||||
.post(&bun_url)
|
||||
.json(spec)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to call Bun: {}", e))?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let error_text = response.text().await.unwrap_or_default();
|
||||
return Err(format!("Bun returned status {}: {}", status, error_text));
|
||||
}
|
||||
|
||||
let bytes = response
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to read response: {}", e))?
|
||||
.to_vec();
|
||||
|
||||
r2.put_object(&r2_key, bytes)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to upload to R2: {}", e))?;
|
||||
|
||||
tracing::info!(r2_key, "OG image generated and uploaded");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if an OG image exists in R2
|
||||
pub async fn og_image_exists(spec: &OGImageSpec) -> bool {
|
||||
if let Some(r2) = R2Client::get().await {
|
||||
r2.object_exists(&spec.r2_key()).await
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure an OG image exists, generating if necessary
|
||||
pub async fn ensure_og_image(spec: &OGImageSpec, state: Arc<AppState>) -> Result<(), String> {
|
||||
if og_image_exists(spec).await {
|
||||
tracing::debug!(r2_key = spec.r2_key(), "OG image already exists");
|
||||
return Ok(());
|
||||
}
|
||||
generate_og_image(spec, state).await
|
||||
}
|
||||
|
||||
/// Regenerate common OG images (index, projects) on server startup
|
||||
pub async fn regenerate_common_images(state: Arc<AppState>) {
|
||||
tracing::info!("Regenerating common OG images");
|
||||
|
||||
let specs = vec![OGImageSpec::Index, OGImageSpec::Projects];
|
||||
|
||||
for spec in specs {
|
||||
match generate_og_image(&spec, state.clone()).await {
|
||||
Ok(()) => {
|
||||
tracing::info!(r2_key = spec.r2_key(), "Successfully regenerated OG image");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(r2_key = spec.r2_key(), error = %e, "Failed to regenerate OG image");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Finished regenerating common OG images");
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
use aws_config::BehaviorVersion;
|
||||
use aws_sdk_s3::{
|
||||
Client,
|
||||
config::{Credentials, Region},
|
||||
primitives::ByteStream,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
static R2_CLIENT: OnceCell<Arc<R2Client>> = OnceCell::const_new();
|
||||
|
||||
pub struct R2Client {
|
||||
client: Client,
|
||||
bucket: String,
|
||||
}
|
||||
|
||||
impl R2Client {
|
||||
pub async fn new() -> Result<Self, String> {
|
||||
let account_id =
|
||||
std::env::var("R2_ACCOUNT_ID").map_err(|_| "R2_ACCOUNT_ID not set".to_string())?;
|
||||
let access_key_id = std::env::var("R2_ACCESS_KEY_ID")
|
||||
.map_err(|_| "R2_ACCESS_KEY_ID not set".to_string())?;
|
||||
let secret_access_key = std::env::var("R2_SECRET_ACCESS_KEY")
|
||||
.map_err(|_| "R2_SECRET_ACCESS_KEY not set".to_string())?;
|
||||
let bucket = std::env::var("R2_BUCKET").map_err(|_| "R2_BUCKET not set".to_string())?;
|
||||
|
||||
let endpoint = format!("https://{}.r2.cloudflarestorage.com", account_id);
|
||||
|
||||
let credentials_provider =
|
||||
Credentials::new(access_key_id, secret_access_key, None, None, "static");
|
||||
|
||||
let config = aws_config::defaults(BehaviorVersion::latest())
|
||||
.region(Region::new("auto"))
|
||||
.endpoint_url(endpoint)
|
||||
.credentials_provider(credentials_provider)
|
||||
.load()
|
||||
.await;
|
||||
|
||||
let client = Client::new(&config);
|
||||
|
||||
Ok(Self { client, bucket })
|
||||
}
|
||||
|
||||
pub async fn get() -> Option<Arc<R2Client>> {
|
||||
R2_CLIENT
|
||||
.get_or_try_init(|| async {
|
||||
match R2Client::new().await {
|
||||
Ok(client) => Ok(Arc::new(client)),
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "Failed to initialize R2 client, OG images will not be cached");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.ok()
|
||||
.cloned()
|
||||
}
|
||||
|
||||
pub async fn get_object(&self, key: &str) -> Result<Vec<u8>, String> {
|
||||
let result = self
|
||||
.client
|
||||
.get_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(key)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to get object from R2: {}", e))?;
|
||||
|
||||
let bytes = result
|
||||
.body
|
||||
.collect()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to read object body: {}", e))?
|
||||
.into_bytes()
|
||||
.to_vec();
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
pub async fn put_object(&self, key: &str, body: Vec<u8>) -> Result<(), String> {
|
||||
self.client
|
||||
.put_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(key)
|
||||
.body(ByteStream::from(body))
|
||||
.content_type("image/png")
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to put object to R2: {}", e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn object_exists(&self, key: &str) -> bool {
|
||||
self.client
|
||||
.head_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(key)
|
||||
.send()
|
||||
.await
|
||||
.is_ok()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user