feat: normalize provider details into oauth_accounts table, auth linking intent, provider array in profile response

This commit is contained in:
Ryan Walters
2025-09-17 11:17:31 -05:00
parent 1cf3b901e8
commit c12dc11d8f
6 changed files with 316 additions and 40 deletions

View File

@@ -0,0 +1,18 @@
-- OAuth accounts linked to a single user
CREATE TABLE IF NOT EXISTS oauth_accounts (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
provider TEXT NOT NULL,
provider_user_id TEXT NOT NULL,
email TEXT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (provider, provider_user_id)
);
-- Ensure we can look up by email efficiently
CREATE INDEX IF NOT EXISTS idx_oauth_accounts_email ON oauth_accounts (email);
-- Optional: ensure users email uniqueness if desired; keep NULLs allowed
ALTER TABLE users
ADD CONSTRAINT users_email_unique UNIQUE (email);

View File

@@ -0,0 +1,15 @@
-- Move provider-specific profile fields from users to oauth_accounts
-- Add provider profile fields to oauth_accounts
ALTER TABLE oauth_accounts
ADD COLUMN IF NOT EXISTS username TEXT,
ADD COLUMN IF NOT EXISTS display_name TEXT NULL,
ADD COLUMN IF NOT EXISTS avatar_url TEXT NULL;
-- Drop provider-specific fields from users (keep email as canonical)
ALTER TABLE users
DROP COLUMN IF EXISTS provider,
DROP COLUMN IF EXISTS provider_user_id,
DROP COLUMN IF EXISTS username,
DROP COLUMN IF EXISTS display_name,
DROP COLUMN IF EXISTS avatar_url;

View File

@@ -4,49 +4,115 @@ use sqlx::FromRow;
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct User {
pub id: i64,
pub email: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct OAuthAccount {
pub id: i64,
pub user_id: i64,
pub provider: String,
pub provider_user_id: String,
pub username: String,
pub display_name: Option<String>,
pub email: Option<String>,
pub username: Option<String>,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
pub async fn upsert_user(
pub async fn get_user_by_email(pool: &sqlx::PgPool, email: &str) -> Result<Option<User>, sqlx::Error> {
sqlx::query_as::<_, User>(
r#"
SELECT id, email, created_at, updated_at
FROM users WHERE email = $1
"#,
)
.bind(email)
.fetch_optional(pool)
.await
}
pub async fn link_oauth_account(
pool: &sqlx::PgPool,
user_id: i64,
provider: &str,
provider_user_id: &str,
email: Option<&str>,
username: Option<&str>,
display_name: Option<&str>,
avatar_url: Option<&str>,
) -> Result<OAuthAccount, sqlx::Error> {
sqlx::query_as::<_, OAuthAccount>(
r#"
INSERT INTO oauth_accounts (user_id, provider, provider_user_id, email, username, display_name, avatar_url)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (provider, provider_user_id)
DO UPDATE SET email = EXCLUDED.email, username = EXCLUDED.username, display_name = EXCLUDED.display_name, avatar_url = EXCLUDED.avatar_url, user_id = EXCLUDED.user_id, updated_at = NOW()
RETURNING id, user_id, provider, provider_user_id, email, username, display_name, avatar_url, created_at, updated_at
"#,
)
.bind(user_id)
.bind(provider)
.bind(provider_user_id)
.bind(email)
.bind(username)
.bind(display_name)
.bind(avatar_url)
.fetch_one(pool)
.await
}
pub async fn create_user(
pool: &sqlx::PgPool,
username: &str,
display_name: Option<&str>,
email: Option<&str>,
avatar_url: Option<&str>,
provider: &str,
provider_user_id: &str,
) -> Result<User, sqlx::Error> {
let rec = sqlx::query_as::<_, User>(
let user = sqlx::query_as::<_, User>(
r#"
INSERT INTO users (provider, provider_user_id, username, display_name, email, avatar_url)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (provider, provider_user_id)
DO UPDATE SET
username = EXCLUDED.username,
display_name = EXCLUDED.display_name,
email = EXCLUDED.email,
avatar_url = EXCLUDED.avatar_url,
updated_at = NOW()
RETURNING id, provider, provider_user_id, username, display_name, email, avatar_url, created_at, updated_at
INSERT INTO users (email)
VALUES ($1)
RETURNING id, email, created_at, updated_at
"#,
)
.bind(provider)
.bind(provider_user_id)
.bind(username)
.bind(display_name)
.bind(email)
.bind(avatar_url)
.fetch_one(pool)
.await?;
Ok(rec)
// Create oauth link
let _ = link_oauth_account(
pool,
user.id,
provider,
provider_user_id,
email,
Some(username),
display_name,
avatar_url,
)
.await?;
Ok(user)
}
pub async fn get_oauth_account_count_for_user(pool: &sqlx::PgPool, user_id: i64) -> Result<i64, sqlx::Error> {
let rec: (i64,) = sqlx::query_as(
r#"
SELECT COUNT(*)::BIGINT AS count
FROM oauth_accounts
WHERE user_id = $1
"#,
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(rec.0)
}
pub async fn get_user_by_provider_id(
@@ -56,9 +122,10 @@ pub async fn get_user_by_provider_id(
) -> Result<Option<User>, sqlx::Error> {
let rec = sqlx::query_as::<_, User>(
r#"
SELECT id, provider, provider_user_id, username, display_name, email, avatar_url, created_at, updated_at
FROM users
WHERE provider = $1 AND provider_user_id = $2
SELECT u.id, u.email, u.created_at, u.updated_at
FROM users u
JOIN oauth_accounts oa ON oa.user_id = u.id
WHERE oa.provider = $1 AND oa.provider_user_id = $2
"#,
)
.bind(provider)
@@ -67,3 +134,28 @@ pub async fn get_user_by_provider_id(
.await?;
Ok(rec)
}
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct ProviderPublic {
pub provider: String,
pub provider_user_id: String,
pub email: Option<String>,
pub username: Option<String>,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
}
pub async fn list_user_providers(pool: &sqlx::PgPool, user_id: i64) -> Result<Vec<ProviderPublic>, sqlx::Error> {
let recs = sqlx::query_as::<_, ProviderPublic>(
r#"
SELECT provider, provider_user_id, email, username, display_name, avatar_url
FROM oauth_accounts
WHERE user_id = $1
ORDER BY provider
"#,
)
.bind(user_id)
.fetch_all(pool)
.await?;
Ok(recs)
}

View File

@@ -38,6 +38,7 @@ async fn main() {
let shutdown_timeout = std::time::Duration::from_secs(config.shutdown_timeout_seconds as u64);
let auth = AuthRegistry::new(&config).expect("auth initializer");
let db = data::pool::create_pool(&config.database_url, 10).await;
// Run database migrations at startup
if let Err(e) = sqlx::migrate!("./migrations").run(&db).await {
panic!("failed to run database migrations: {}", e);

View File

@@ -18,16 +18,33 @@ pub struct AuthQuery {
pub error_description: Option<String>,
}
#[derive(Debug, serde::Deserialize)]
pub struct AuthorizeQuery {
pub link: Option<bool>,
}
#[instrument(skip_all, fields(provider = %provider))]
pub async fn oauth_authorize_handler(
State(app_state): State<AppState>,
Path(provider): Path<String>,
Query(aq): Query<AuthorizeQuery>,
cookie: CookieManager,
) -> axum::response::Response {
let Some(prov) = app_state.auth.get(&provider) else {
warn!(%provider, "Unknown OAuth provider");
return ErrorResponse::bad_request("invalid_provider", Some(provider)).into_response();
};
trace!("Starting OAuth authorization");
// Persist link intent using a short-lived cookie; callbacks won't carry our query params.
if aq.link == Some(true) {
cookie.add(
axum_cookie::cookie::Cookie::builder("link", "1")
.http_only(true)
.same_site(axum_cookie::prelude::SameSite::Lax)
.path("/")
.build(),
);
}
let resp = prov.authorize().await;
trace!("Redirecting to provider authorization page");
resp
@@ -62,23 +79,125 @@ pub async fn oauth_callback_handler(
return e.into_response();
}
};
// Persist or update in database
match user_repo::upsert_user(
// Linking or sign-in flow. Determine link intent from cookie (set at authorize time)
let link_cookie = cookie.get("link").map(|c| c.value().to_string());
if link_cookie.is_some() {
cookie.remove("link");
}
let email = user.email.as_deref();
let _db_user = if link_cookie.as_deref() == Some("1") {
// Must be logged in already to link
let Some(session_token) = session::get_session_token(&cookie) else {
return ErrorResponse::bad_request("invalid_request", Some("must be signed in to link provider".into()))
.into_response();
};
let Some(claims) = session::decode_jwt(&session_token, &app_state.jwt_decoding_key) else {
return ErrorResponse::bad_request("invalid_request", Some("invalid session token".into())).into_response();
};
// Resolve current user from session
let (cur_prov, cur_id) = claims.sub.split_once(':').unwrap_or(("", ""));
let current_user = match user_repo::get_user_by_provider_id(&app_state.db, cur_prov, cur_id).await {
Ok(Some(u)) => u,
Ok(None) => {
return ErrorResponse::bad_request("invalid_request", Some("current session user not found".into()))
.into_response();
}
Err(_) => {
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None).into_response();
}
};
// Link provider to current user
if let Err(e) = user_repo::link_oauth_account(
&app_state.db,
current_user.id,
&provider,
&user.id,
&user.username,
email,
Some(&user.username),
user.name.as_deref(),
user.email.as_deref(),
user.avatar_url.as_deref(),
)
.await
{
Ok(_db_user) => {}
warn!(error = %e, %provider, "Failed to link OAuth account");
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None).into_response();
}
current_user
} else {
// Normal sign-in: do NOT auto-link by email (security). If email exists, require linking flow.
if let Some(e) = email {
if let Ok(Some(existing)) = user_repo::get_user_by_email(&app_state.db, e).await {
// Only block if the user already has at least one linked provider.
// NOTE: We do not check whether providers are currently active. If a user has exactly one provider and it is inactive,
// this may lock them out until the provider is reactivated or a manual admin link is performed.
match user_repo::get_oauth_account_count_for_user(&app_state.db, existing.id).await {
Ok(count) if count > 0 => {
return ErrorResponse::bad_request(
"account_exists",
Some(format!(
"An account already exists for {}. Sign in with your existing provider, then visit /auth/{}?link=true to add this provider.",
e, provider
)),
)
.into_response();
}
Ok(_) => {
// No providers linked yet: safe to associate this provider
if let Err(e) = user_repo::link_oauth_account(
&app_state.db,
existing.id,
&provider,
&user.id,
email,
Some(&user.username),
user.name.as_deref(),
user.avatar_url.as_deref(),
)
.await
{
warn!(error = %e, %provider, "Failed to link OAuth account to existing user with no providers");
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None)
.into_response();
}
existing
}
Err(e) => {
warn!(error = %e, provider = %provider, "Failed to upsert user in database");
warn!(error = %e, "Failed to count oauth accounts for user");
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None)
.into_response();
}
}
} else {
// Create new user with email
match user_repo::create_user(
&app_state.db,
&user.username,
user.name.as_deref(),
email,
user.avatar_url.as_deref(),
&provider,
&user.id,
)
.await
{
Ok(u) => u,
Err(e) => {
warn!(error = %e, %provider, "Failed to create user");
return ErrorResponse::with_status(StatusCode::INTERNAL_SERVER_ERROR, "database_error", None)
.into_response();
}
}
}
} else {
// No email available: disallow sign-in for safety
return ErrorResponse::bad_request(
"invalid_request",
Some("account has no email; sign in with a different provider".into()),
)
.into_response();
}
};
let session_token = session::create_jwt_for_user(&provider, &user, &app_state.jwt_encoding_key);
session::set_session_cookie(&cookie, &session_token);
info!(%provider, "Signed in successfully");
@@ -104,7 +223,38 @@ pub async fn profile_handler(State(app_state): State<AppState>, cookie: CookieMa
}
};
match user_repo::get_user_by_provider_id(&app_state.db, prov, prov_user_id).await {
Ok(Some(db_user)) => axum::Json(db_user).into_response(),
Ok(Some(db_user)) => {
// Include linked providers in the profile payload
match user_repo::list_user_providers(&app_state.db, db_user.id).await {
Ok(providers) => {
#[derive(Serialize)]
struct ProfilePayload<T> {
id: i64,
email: Option<String>,
providers: Vec<T>,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
}
let body = ProfilePayload {
id: db_user.id,
email: db_user.email.clone(),
providers,
created_at: db_user.created_at,
updated_at: db_user.updated_at,
};
axum::Json(body).into_response()
}
Err(e) => {
warn!(error = %e, "Failed to list user providers");
ErrorResponse::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
"database_error",
Some("could not fetch providers".into()),
)
.into_response()
}
}
}
Ok(None) => {
debug!("User not found for session");
ErrorResponse::unauthorized("session not found").into_response()