feat: implement r2 image upload for avatars

Author: Ryan Walters
Date: 2025-09-18 13:18:14 -05:00
Parent: 56e02e7253
Commit: 7f9d3e9158
8 changed files with 1932 additions and 114 deletions


@@ -3,7 +3,7 @@ name = "pacman-server"
version = "0.4.0"
authors.workspace = true
edition.workspace = true
rust-version = "1.86.0"
rust-version = "1.87.0"
description = "A leaderboard API for the Pac-Man game"
readme.workspace = true
homepage.workspace = true
@@ -35,6 +35,12 @@ async-trait = "0.1"
jsonwebtoken = { version = "9.3", default-features = false }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] }
tracing-futures = { version = "0.2.5", features = ["tokio"] }
time = { version = "0.3", features = ["macros", "formatting"] }
yansi = "1"
s3-tokio = { version = "0.39.6", default-features = false }
rustls = { version = "0.23", features = ["ring"] }
fast_image_resize = { version = "5.3", features = ["image"] }
image = { version = "0.25", features = ["png", "jpeg"] }
sha2 = "0.10"
# validator = { version = "0.16", features = ["derive"] }
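Pinning `rustls` 0.23 to the `ring` feature usually goes hand in hand with selecting a process-wide crypto provider before the first TLS connection is made. Whether this codebase does so is not shown in the diff; a minimal sketch of that startup step, as an assumption only, would look like:

// Assumed startup step (not part of this commit): make ring the process-wide
// rustls crypto provider before any TLS clients (S3, HTTP) are constructed.
fn init_crypto_provider() {
    // Ignore the error case: it only means a provider was already installed.
    let _ = rustls::crypto::ring::default_provider().install_default();
}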


@@ -4,7 +4,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use crate::data::pool::PgPool;
use crate::{auth::AuthRegistry, config::Config};
use crate::{auth::AuthRegistry, config::Config, image::ImageStorage};
#[derive(Debug, Clone)]
pub struct Health {
@@ -35,27 +35,37 @@ impl Health {
#[derive(Clone)]
pub struct AppState {
pub config: Arc<Config>,
pub auth: Arc<AuthRegistry>,
pub sessions: Arc<DashMap<String, crate::auth::provider::AuthUser>>,
pub jwt_encoding_key: Arc<EncodingKey>,
pub jwt_decoding_key: Arc<DecodingKey>,
pub db: Arc<PgPool>,
pub health: Arc<RwLock<Health>>,
pub image_storage: Arc<ImageStorage>,
}
impl AppState {
pub fn new(config: Config, auth: AuthRegistry, db: PgPool) -> Self {
let jwt_secret = config.jwt_secret.clone();
// Initialize image storage
let image_storage = match ImageStorage::from_config(&config) {
Ok(storage) => Arc::new(storage),
Err(e) => {
tracing::warn!(error = %e, "Failed to initialize image storage, avatar processing will be disabled");
// Fall back to a placeholder bucket so the server can still start; uploads against
// it will just fail gracefully at runtime.
Arc::new(ImageStorage::new(&config, "dummy").expect("Failed to create dummy image storage"))
}
};
Self {
config: Arc::new(config),
auth: Arc::new(auth),
sessions: Arc::new(DashMap::new()),
jwt_encoding_key: Arc::new(EncodingKey::from_secret(jwt_secret.as_bytes())),
jwt_decoding_key: Arc::new(DecodingKey::from_secret(jwt_secret.as_bytes())),
db: Arc::new(db),
health: Arc::new(RwLock::new(Health::new())),
image_storage,
}
}
}


@@ -16,8 +16,8 @@ pub struct Config {
// S3 Credentials
pub s3_access_key: String,
pub s3_secret_access_key: String,
pub s3_endpoint: String,
pub s3_region: String,
pub s3_bucket_name: String,
pub s3_public_base_url: String,
// Server Details
#[serde(default = "default_port")]
pub port: u16,

pacman-server/src/image.rs (new file, 183 lines)

@@ -0,0 +1,183 @@
use std::sync::Arc;
use image::codecs::png::PngEncoder;
use s3::Bucket;
use sha2::Digest;
use tracing::trace;
use crate::config::Config;
/// Minimal S3-backed image storage. This keeps things intentionally simple for now:
/// - construct from existing `Config`
/// - upload raw bytes under a key
/// - upload a local file by path (reads whole file into memory)
/// - generate a simple presigned GET URL
/// - process avatars with resizing and upload
///
/// Backed by `s3-tokio` (hyper 1 + rustls) and compatible with S3/R2/MinIO endpoints.
#[derive(Clone)]
pub struct ImageStorage {
bucket: Arc<s3::Bucket>,
public_base_url: String,
}
impl ImageStorage {
/// Create a new storage for a specific `bucket_name` using settings from `Config`.
///
/// For now this targets Cloudflare R2 directly (`s3::Region::R2` with a fixed account id and
/// path-style addressing); the `s3_endpoint` and `s3_region` config fields are not used here yet.
pub fn new(config: &Config, bucket_name: impl Into<String>) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let credentials = s3::creds::Credentials::new(
Some(&config.s3_access_key),
Some(&config.s3_secret_access_key),
None, // security token
None, // session token
None, // profile
)?;
let bucket = Bucket::new(
&bucket_name.into(),
s3::Region::R2 {
account_id: "f188bf93079278e7bbc58de9b3d80693".to_string(),
},
credentials,
)?
.with_path_style();
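// Path-style addressing keeps the bucket name in the URL path, which S3-compatible
// endpoints such as MinIO commonly expect.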
Ok(Self {
bucket: Arc::new(bucket),
public_base_url: config.s3_public_base_url.clone(),
})
}
/// Upload a byte slice to `key` with optional content type.
///
/// Currently returns `Ok(None)` on success; ETag extraction from the response is not implemented yet.
pub async fn upload_bytes(
&self,
key: &str,
bytes: impl AsRef<[u8]>,
content_type: Option<&str>,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let data = bytes.as_ref();
let content_type = content_type.unwrap_or("application/octet-stream");
// Prefer the content-type variant for correct metadata
let status = {
let response = self.bucket.put_object_with_content_type(key, data, content_type).await?;
response.status_code()
};
if (200..300).contains(&status) {
// ETag extraction from the response headers is left for later; some providers
// omit the ETag on PUT anyway, so for now success is reported with `None`.
Ok(None)
} else {
Err(format!("upload failed with status {}", status).into())
}
}
/// Generate a simple presigned GET URL valid for `expires_in_seconds`.
#[allow(dead_code)]
pub fn presign_get(&self, key: &str, expires_in_seconds: u32) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let url = self.bucket.presign_get(key, expires_in_seconds, None)?;
Ok(url)
}
/// Process and upload an avatar from a URL.
///
/// Downloads the image, center-crops it to a square, resizes to 512x512 (original) and 32x32 (mini),
/// then uploads both versions to S3. Returns the public URLs for both images.
pub async fn process_avatar(
&self,
user_public_id: &str,
avatar_url: &str,
) -> Result<AvatarUrls, Box<dyn std::error::Error + Send + Sync>> {
// Download the avatar image
let response = reqwest::get(avatar_url).await?;
if !response.status().is_success() {
return Err(format!("Failed to download avatar: {}", response.status()).into());
}
let image_bytes = response.bytes().await?;
trace!(bytes = image_bytes.len(), "Downloaded avatar");
// Decode the image
let img = image::load_from_memory(&image_bytes)?;
let img_rgba = img.to_rgba8();
// Generate a simple hash for the avatar (using the URL for now)
let avatar_hash = format!("{:x}", sha2::Sha256::digest(avatar_url.as_bytes()));
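// Deterministic per source URL, so re-processing the same avatar overwrites the
// existing objects rather than accumulating new ones.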
trace!(
width = img_rgba.width(),
height = img_rgba.height(),
hash = avatar_hash,
"Avatar image decoded"
);
// Process original (512x512, square)
let original_key = format!("avatars/{}/{}.original.png", user_public_id, avatar_hash);
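// e.g. "avatars/<public-id>/<sha256-of-url>.original.png"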
let original_png = self.resize_to_square_png(&img_rgba, 512)?;
self.upload_bytes(&original_key, &original_png, Some("image/png")).await?;
trace!(key = original_key, "Uploaded original avatar");
// Process mini (32x32)
let mini_key = format!("avatars/{}/{}.mini.png", user_public_id, avatar_hash);
let mini_png = self.resize_to_square_png(&img_rgba, 32)?;
self.upload_bytes(&mini_key, &mini_png, Some("image/png")).await?;
trace!(key = mini_key, "Uploaded mini avatar");
Ok(AvatarUrls {
original_url: format!("{}/{}", self.public_base_url, original_key),
mini_url: format!("{}/{}", self.public_base_url, mini_key),
})
}
/// Center-crop an RGBA image to a square, resize it to `target_size`, and encode it as PNG.
fn resize_to_square_png(
&self,
img: &image::RgbaImage,
target_size: u32,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
let (width, height) = img.dimensions();
// Calculate dimensions for square crop (center crop)
let size = width.min(height);
let start_x = (width - size) / 2;
let start_y = (height - size) / 2;
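// e.g. a 640x480 source: size = 480, start_x = (640 - 480) / 2 = 80, start_y = 0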
// Crop to square
let cropped = image::imageops::crop_imm(img, start_x, start_y, size, size).to_image();
// Resize to target size
let resized = image::imageops::resize(&cropped, target_size, target_size, image::imageops::FilterType::Lanczos3);
// Encode as PNG
let mut bytes: Vec<u8> = Vec::new();
let cursor = std::io::Cursor::new(&mut bytes);
// Write the resized image to the cursor
resized.write_with_encoder(PngEncoder::new(cursor))?;
Ok(bytes)
}
}
/// URLs for processed avatar images
#[derive(Debug, Clone)]
pub struct AvatarUrls {
pub original_url: String,
pub mini_url: String,
}
impl ImageStorage {
/// Create a new storage using the default bucket from `Config`.
pub fn from_config(config: &Config) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Self::new(config, &config.s3_bucket_name)
}
}
// References:
// - Example (R2): https://github.com/FemLolStudio/s3-tokio/blob/master/examples/r2-tokio.rs
// - Crate docs: https://lib.rs/crates/s3-tokio
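A minimal usage sketch for the new module, assuming a populated `Config` and a Tokio runtime; `avatar_example`, the object key, and the user id / avatar URL below are illustrative placeholders, not part of this commit:

use crate::{config::Config, image::ImageStorage};

/// Illustrative only: exercises the public operations of `ImageStorage`.
async fn avatar_example(config: &Config, png_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let storage = ImageStorage::from_config(config)?;

    // Upload raw bytes under a key with an explicit content type.
    let _etag = storage.upload_bytes("avatars/example/test.png", png_bytes, Some("image/png")).await?;

    // Presigned GET URL, valid for one hour.
    let url = storage.presign_get("avatars/example/test.png", 3600)?;
    tracing::trace!(%url, "presigned avatar URL");

    // Download, center-crop, resize (512px and 32px), and upload a user's avatar.
    let urls = storage.process_avatar("user-public-id", "https://example.com/avatar.png").await?;
    tracing::info!(original = %urls.original_url, mini = %urls.mini_url, "avatar processed");
    Ok(())
}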


@@ -1,23 +1,24 @@
use crate::{app::AppState, auth::AuthRegistry, config::Config};
use axum::{routing::get, Router};
use axum_cookie::CookieLayer;
use std::time::Instant;
use std::{sync::Arc, time::Duration};
use tracing::{info, trace, warn};
use crate::{app::AppState, auth::AuthRegistry, config::Config};
mod formatter;
mod logging;
mod routes;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{watch, Notify};
mod app;
mod auth;
mod config;
mod data;
mod errors;
mod formatter;
mod image;
mod logging;
mod routes;
mod session;
use std::time::Instant;
use std::{sync::Arc, time::Duration};
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{watch, Notify};
use tracing::{info, trace, warn};
// Constant value for the Server header: "<crate>/<version>"
const SERVER_HEADER_VALUE: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
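// e.g. "pacman-server/0.4.0" given the manifest above.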


@@ -5,7 +5,7 @@ use axum::{
};
use axum_cookie::CookieManager;
use serde::Serialize;
use tracing::{debug, info, instrument, trace, warn};
use tracing::{debug, info, instrument, span, trace, warn};
use crate::data::user as user_repo;
use crate::{app::AppState, errors::ErrorResponse, session};
@@ -51,7 +51,6 @@ pub async fn oauth_authorize_handler(
resp
}
#[instrument(skip_all, fields(provider = %provider))]
pub async fn oauth_callback_handler(
State(app_state): State<AppState>,
Path(provider): Path<String>,
@@ -78,6 +77,8 @@ pub async fn oauth_callback_handler(
return ErrorResponse::bad_request("invalid_request", Some("missing state".into())).into_response();
};
span!(tracing::Level::DEBUG, "oauth_callback_handler", provider = %provider, code = %code, state = %state);
// Handle callback from provider
let user = match prov.handle_callback(code, state).await {
Ok(u) => u,
@@ -96,6 +97,8 @@ pub async fn oauth_callback_handler(
// Determine linking intent with a valid session
let is_link = if link_cookie.as_deref() == Some("1") {
debug!("Link intent present");
match session::get_session_token(&cookie).and_then(|t| session::decode_jwt(&t, &app_state.jwt_decoding_key)) {
Some(c) => {
// Perform linking with current session user
@@ -149,14 +152,35 @@ pub async fn oauth_callback_handler(
// this may lock them out until the provider is reactivated or a manual admin link is performed.
match user_repo::get_oauth_account_count_for_user(&app_state.db, existing.id).await {
Ok(count) if count > 0 => {
return ErrorResponse::bad_request(
"account_exists",
Some(format!(
"An account already exists for {}. Sign in with your existing provider, then visit /auth/{}?link=true to add this provider.",
e, provider
)),
)
.into_response();
// Check if the "new" provider is already linked to the user
match user_repo::find_user_by_provider_id(&app_state.db, &provider, &user.id).await {
Ok(Some(_)) => {
debug!(
%provider,
%existing.id,
"Provider already linked to user, signing in normally");
}
Ok(None) => {
debug!(
%provider,
%existing.id,
"Provider not linked to user, failing"
);
return ErrorResponse::bad_request(
"account_exists",
Some(format!(
"An account already exists for {}. Sign in with your existing provider, then visit /auth/{}?link=true to add this provider.",
e, provider
)),
)
.into_response();
}
Err(e) => {
warn!(error = %e, %provider, "Failed to find user by provider ID");
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None)
.into_response();
}
}
}
Ok(_) => {
// No providers linked yet: safe to associate this provider
@@ -176,7 +200,6 @@ pub async fn oauth_callback_handler(
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None)
.into_response();
}
existing
}
Err(e) => {
warn!(error = %e, "Failed to count oauth accounts for user");
@@ -203,7 +226,7 @@ pub async fn oauth_callback_handler(
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None)
.into_response();
}
}
};
}
} else {
// No email available: disallow sign-in for safety
@@ -220,10 +243,38 @@ pub async fn oauth_callback_handler(
session::set_session_cookie(&cookie, &session_token);
info!(%provider, "Signed in successfully");
// Process avatar asynchronously (don't block the response)
if let Some(avatar_url) = user.avatar_url.as_deref() {
let image_storage = app_state.image_storage.clone();
let user_public_id = user.id.clone();
let avatar_url = avatar_url.to_string();
debug!(%user_public_id, %avatar_url, "Processing avatar");
tokio::spawn(async move {
match image_storage.process_avatar(&user_public_id, &avatar_url).await {
Ok(avatar_urls) => {
info!(
user_id = %user_public_id,
original_url = %avatar_urls.original_url,
mini_url = %avatar_urls.mini_url,
"Avatar processed successfully"
);
}
Err(e) => {
warn!(
user_id = %user_public_id,
avatar_url = %avatar_url,
error = %e,
"Failed to process avatar"
);
}
}
});
}
(StatusCode::FOUND, Redirect::to("/profile")).into_response()
}
#[instrument(skip_all)]
pub async fn profile_handler(State(app_state): State<AppState>, cookie: CookieManager) -> axum::response::Response {
let Some(token_str) = session::get_session_token(&cookie) else {
debug!("Missing session cookie");