diff --git a/.sqlx/query-e35380efe500836a7a7bbcfcd54023b64bb20bbf58ef18acc37046b34996cefd.json b/.sqlx/query-a0ef356b59f222775d062f6ae929f1566c0a50501c4b5e894caa3d3eb5b739f0.json similarity index 90% rename from .sqlx/query-e35380efe500836a7a7bbcfcd54023b64bb20bbf58ef18acc37046b34996cefd.json rename to .sqlx/query-a0ef356b59f222775d062f6ae929f1566c0a50501c4b5e894caa3d3eb5b739f0.json index 8fb6678..4397ac2 100644 --- a/.sqlx/query-e35380efe500836a7a7bbcfcd54023b64bb20bbf58ef18acc37046b34996cefd.json +++ b/.sqlx/query-a0ef356b59f222775d062f6ae929f1566c0a50501c4b5e894caa3d3eb5b739f0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n id,\n slug,\n name,\n short_description,\n description,\n status as \"status: ProjectStatus\",\n github_repo,\n demo_url,\n last_github_activity,\n created_at\n FROM projects\n WHERE github_repo IS NOT NULL\n ORDER BY updated_at DESC\n ", + "query": "\n SELECT\n id,\n slug,\n name,\n short_description,\n description,\n status as \"status: ProjectStatus\",\n github_repo,\n demo_url,\n last_github_activity,\n created_at\n FROM projects\n WHERE github_repo IS NOT NULL\n ORDER BY last_github_activity DESC NULLS LAST\n ", "describe": { "columns": [ { @@ -82,5 +82,5 @@ false ] }, - "hash": "e35380efe500836a7a7bbcfcd54023b64bb20bbf58ef18acc37046b34996cefd" + "hash": "a0ef356b59f222775d062f6ae929f1566c0a50501c4b5e894caa3d3eb5b739f0" } diff --git a/Cargo.lock b/Cargo.lock index 853b152..955b31a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,6 +116,7 @@ dependencies = [ "mime_guess", "moka", "nu-ansi-term", + "parking_lot", "rand 0.9.2", "reqwest", "serde", diff --git a/Cargo.toml b/Cargo.toml index eaa5f00..43ffa82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ include_dir = "0.7.4" mime_guess = "2.0.5" moka = { version = "0.12.12", features = ["future"] } nu-ansi-term = "0.50.3" +parking_lot = "0.12.5" rand = "0.9.2" reqwest = { version = "0.13.1", default-features = false, features = ["rustls", "charset", "json", "stream"] } serde = { version = "1.0.228", features = ["derive"] } diff --git a/src/cli/serve.rs b/src/cli/serve.rs index 2154129..9bed405 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -152,62 +152,14 @@ pub async fn run( } }); - // Spawn GitHub activity sync background task (if GITHUB_TOKEN is set) + // Spawn GitHub activity sync scheduler (if GITHUB_TOKEN is set) + // Uses per-project dynamic intervals based on activity recency tokio::spawn({ let pool = pool.clone(); async move { - // Check if GitHub client is available (GITHUB_TOKEN set) - if github::GitHubClient::get().await.is_none() { - return; - } - - let interval_secs: u64 = std::env::var("GITHUB_SYNC_INTERVAL_SEC") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(900); // Default: 15 minutes - - // Brief delay to let server finish initializing, then run initial sync + // Brief delay to let server finish initializing tokio::time::sleep(Duration::from_secs(2)).await; - - tracing::info!( - interval_sec = interval_secs, - "Running initial GitHub activity sync" - ); - - match github::sync_github_activity(&pool).await { - Ok(stats) => { - tracing::info!( - synced = stats.synced, - skipped = stats.skipped, - errors = stats.errors, - "Initial GitHub activity sync completed" - ); - } - Err(e) => { - tracing::error!(error = %e, "Initial GitHub sync failed"); - } - } - - // Regular interval sync - let mut interval = tokio::time::interval(Duration::from_secs(interval_secs)); - interval.tick().await; // Skip the first tick (already ran initial sync) - - loop { - interval.tick().await; - match github::sync_github_activity(&pool).await { - Ok(stats) => { - tracing::info!( - synced = stats.synced, - skipped = stats.skipped, - errors = stats.errors, - "GitHub activity sync completed" - ); - } - Err(e) => { - tracing::error!(error = %e, "GitHub sync failed"); - } - } - } + github::run_scheduler(pool).await; } }); diff --git a/src/db/projects.rs b/src/db/projects.rs index f393f99..355f242 100644 --- a/src/db/projects.rs +++ b/src/db/projects.rs @@ -452,7 +452,10 @@ pub async fn get_admin_stats(pool: &PgPool) -> Result { }) } -/// Get all projects that have a github_repo set (for GitHub sync) +/// Get all projects that have a github_repo set (for GitHub sync). +/// +/// Orders by most recent activity first (NULLS LAST) so that projects with +/// the shortest check intervals are processed first by the scheduler. pub async fn get_projects_with_github_repo(pool: &PgPool) -> Result, sqlx::Error> { query_as!( DbProject, @@ -470,7 +473,7 @@ pub async fn get_projects_with_github_repo(pool: &PgPool) -> Result>> = OnceCell::const_new(); -/// Statistics from a sync run +// Interval bounds (configurable via environment variables) +fn min_interval() -> Duration { + let secs: u64 = std::env::var("GITHUB_SYNC_MIN_INTERVAL_SEC") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(15 * 60); // 15 minutes default + Duration::from_secs(secs) +} + +fn max_interval() -> Duration { + let secs: u64 = std::env::var("GITHUB_SYNC_MAX_INTERVAL_SEC") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(24 * 60 * 60); // 24 hours default + Duration::from_secs(secs) +} + +/// Days of inactivity after which the maximum interval is used +const DAYS_TO_MAX: f64 = 90.0; + +/// How often the scheduler checks for due items (hardcoded) +pub const SCHEDULER_TICK_INTERVAL: Duration = Duration::from_secs(30); + +/// Calculate the check interval based on how recently the project had activity. +/// +/// Uses a logarithmic curve that starts at MIN_INTERVAL for today's activity +/// and approaches MAX_INTERVAL as activity ages toward DAYS_TO_MAX. +/// +/// Examples (with default 15min/24hr bounds): +/// - 0 days (today): 15 min +/// - 1 day: ~24 min +/// - 7 days: ~49 min +/// - 30 days: ~1.7 hr +/// - 90+ days: 24 hr +pub fn calculate_check_interval(last_activity: Option) -> Duration { + let min = min_interval(); + let max = max_interval(); + + let days_since = last_activity + .map(|t| (OffsetDateTime::now_utc() - t).whole_days().max(0) as f64) + .unwrap_or(DAYS_TO_MAX); // Default to max interval if no activity recorded + + // Logarithmic scaling: ln(1+days) / ln(1+90) gives 0..1 range + let scale = (1.0 + days_since).ln() / (1.0 + DAYS_TO_MAX).ln(); + let scale = scale.clamp(0.0, 1.0); + + let interval_secs = min.as_secs_f64() + scale * (max.as_secs_f64() - min.as_secs_f64()); + + Duration::from_secs(interval_secs as u64) +} + +/// Statistics from scheduler operations #[derive(Debug, Default)] pub struct SyncStats { pub synced: u32, @@ -23,6 +87,267 @@ pub struct SyncStats { pub errors: u32, } +/// A project scheduled for GitHub activity checking +#[derive(Debug, Clone)] +pub struct ScheduledProject { + /// When to next check this project + pub next_check: Instant, + /// Project database ID + pub project_id: Uuid, + /// GitHub repo in "owner/repo" format + pub github_repo: String, + /// Last known activity timestamp + pub last_activity: Option, + /// Consecutive error count (for exponential backoff) + pub error_count: u32, +} + +impl ScheduledProject { + fn new(project: &DbProject, next_check: Instant) -> Option { + Some(Self { + next_check, + project_id: project.id, + github_repo: project.github_repo.clone()?, + last_activity: project.last_github_activity, + error_count: 0, + }) + } + + /// Create a scheduled project for immediate checking + pub fn immediate(project_id: Uuid, github_repo: String) -> Self { + Self { + next_check: Instant::now(), + project_id, + github_repo, + last_activity: None, + error_count: 0, + } + } +} + +// Ordering for BinaryHeap (we use Reverse<> for min-heap behavior) +impl Eq for ScheduledProject {} + +impl PartialEq for ScheduledProject { + fn eq(&self, other: &Self) -> bool { + self.project_id == other.project_id + } +} + +impl Ord for ScheduledProject { + fn cmp(&self, other: &Self) -> Ordering { + // Order by next_check time (earliest first when wrapped in Reverse) + self.next_check.cmp(&other.next_check) + } +} + +impl PartialOrd for ScheduledProject { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// GitHub activity sync scheduler with priority queue +pub struct GitHubScheduler { + /// Min-heap of scheduled projects (earliest check first) + queue: Mutex>>, + /// Running statistics + pub total_synced: AtomicU32, + pub total_skipped: AtomicU32, + pub total_errors: AtomicU32, +} + +impl GitHubScheduler { + /// Create a new scheduler with projects from the database. + /// + /// Projects are sorted by activity recency and their initial checks are + /// staggered across their calculated intervals to prevent clumping. + pub fn new(mut projects: Vec) -> Self { + // Sort by activity recency (most recent first = shortest intervals first) + projects.sort_by(|a, b| { + let a_time = a.last_github_activity.unwrap_or(a.created_at); + let b_time = b.last_github_activity.unwrap_or(b.created_at); + b_time.cmp(&a_time) // Descending (most recent first) + }); + + let now = Instant::now(); + let total = projects.len().max(1) as f64; + + let scheduled: Vec<_> = projects + .into_iter() + .enumerate() + .filter_map(|(i, project)| { + let interval = calculate_check_interval(project.last_github_activity); + + // Stagger initial checks: position i of n gets (i/n * interval) offset + // This spreads checks evenly across each project's interval + let offset_secs = (i as f64 / total) * interval.as_secs_f64(); + let next_check = now + Duration::from_secs_f64(offset_secs); + + ScheduledProject::new(&project, next_check) + }) + .map(Reverse) + .collect(); + + let queue = BinaryHeap::from(scheduled); + + Self { + queue: Mutex::new(queue), + total_synced: AtomicU32::new(0), + total_skipped: AtomicU32::new(0), + total_errors: AtomicU32::new(0), + } + } + + /// Number of projects in the scheduler + pub fn len(&self) -> usize { + self.queue.lock().len() + } + + /// Check if the scheduler has no projects + pub fn is_empty(&self) -> bool { + self.queue.lock().is_empty() + } + + /// Pop the next due project, if any. + /// + /// Returns None if the queue is empty or the next project isn't due yet. + pub fn pop_if_due(&self) -> Option { + let mut queue = self.queue.lock(); + let now = Instant::now(); + + // Peek to check if the next item is due + if queue.peek().is_some_and(|p| p.0.next_check <= now) { + queue.pop().map(|r| r.0) + } else { + None + } + } + + /// Reschedule a project after a successful sync. + pub fn reschedule(&self, mut project: ScheduledProject, new_activity: Option) { + project.last_activity = new_activity.or(project.last_activity); + project.error_count = 0; // Reset error count on success + + let interval = calculate_check_interval(project.last_activity); + project.next_check = Instant::now() + interval; + + self.queue.lock().push(Reverse(project)); + self.total_synced.fetch_add(1, AtomicOrdering::Relaxed); + } + + /// Reschedule a project after an error with exponential backoff. + /// + /// Each consecutive error doubles the wait time, capped at max_interval. + pub fn reschedule_with_error(&self, mut project: ScheduledProject) { + project.error_count += 1; + + // Exponential backoff: base_interval * 2^error_count, capped at max + let base = calculate_check_interval(project.last_activity); + let multiplier = 2u32.saturating_pow(project.error_count.min(6)); // Cap at 2^6 = 64x + let backoff = base.saturating_mul(multiplier); + let interval = backoff.min(max_interval()); + + project.next_check = Instant::now() + interval; + + tracing::debug!( + repo = %project.github_repo, + error_count = project.error_count, + backoff_mins = interval.as_secs() / 60, + "Rescheduled with error backoff" + ); + + self.queue.lock().push(Reverse(project)); + self.total_errors.fetch_add(1, AtomicOrdering::Relaxed); + } + + /// Mark a project as skipped (no update needed). + pub fn reschedule_skipped(&self, mut project: ScheduledProject) { + project.error_count = 0; // Reset on successful check (even if skipped) + + let interval = calculate_check_interval(project.last_activity); + project.next_check = Instant::now() + interval; + + self.queue.lock().push(Reverse(project)); + self.total_skipped.fetch_add(1, AtomicOrdering::Relaxed); + } + + /// Back off all projects by a fixed duration (e.g., after rate limiting). + pub fn backoff_all(&self, backoff: Duration) { + let mut queue = self.queue.lock(); + let now = Instant::now(); + + // Drain and re-add with updated times + let projects: Vec<_> = queue.drain().collect(); + for mut p in projects { + // Only push back projects that aren't already further out + if p.0.next_check < now + backoff { + p.0.next_check = now + backoff; + } + queue.push(p); + } + } + + /// Add a new project to the scheduler for immediate checking. + pub fn add_project(&self, project_id: Uuid, github_repo: String) { + let project = ScheduledProject::immediate(project_id, github_repo); + self.queue.lock().push(Reverse(project)); + } + + /// Remove a project from the scheduler. + pub fn remove_project(&self, project_id: Uuid) { + let mut queue = self.queue.lock(); + let projects: Vec<_> = queue + .drain() + .filter(|p| p.0.project_id != project_id) + .collect(); + queue.extend(projects); + } + + /// Get current statistics snapshot + pub fn stats(&self) -> SyncStats { + SyncStats { + synced: self.total_synced.load(AtomicOrdering::Relaxed), + skipped: self.total_skipped.load(AtomicOrdering::Relaxed), + errors: self.total_errors.load(AtomicOrdering::Relaxed), + } + } + + /// Get info about the next scheduled check (for logging/debugging) + pub fn next_check_info(&self) -> Option<(String, Duration)> { + let queue = self.queue.lock(); + queue.peek().map(|p| { + let until = p.0.next_check.saturating_duration_since(Instant::now()); + (p.0.github_repo.clone(), until) + }) + } +} + +/// Global scheduler instance (initialized during server startup) +static GITHUB_SCHEDULER: OnceCell> = OnceCell::const_new(); + +/// Get the global scheduler instance +pub fn get_scheduler() -> Option> { + GITHUB_SCHEDULER.get().cloned() +} + +/// Initialize the global scheduler with projects from the database +pub async fn init_scheduler(pool: &PgPool) -> Option> { + // Only init if client is available + GitHubClient::get().await.as_ref()?; + + let projects = crate::db::projects::get_projects_with_github_repo(pool) + .await + .ok()?; + + let scheduler = Arc::new(GitHubScheduler::new(projects)); + + // Store globally + let _ = GITHUB_SCHEDULER.set(scheduler.clone()); + + Some(scheduler) +} + /// GitHub API client with caching for default branches pub struct GitHubClient { client: reqwest::Client, @@ -314,103 +639,148 @@ fn parse_github_repo(github_repo: &str) -> Option<(&str, &str)> { } } -/// Sync GitHub activity for all projects that have github_repo set. -/// Updates the last_github_activity field with the most recent activity timestamp. -pub async fn sync_github_activity(pool: &PgPool) -> Result> { - let client = GitHubClient::get() - .await - .ok_or("GitHub client not initialized")?; +/// Sync a single project's GitHub activity. +/// +/// Returns the new activity timestamp if found, or None if no activity. +pub async fn sync_single_project( + client: &GitHubClient, + pool: &PgPool, + project: &ScheduledProject, +) -> Result, GitHubError> { + let (owner, repo) = parse_github_repo(&project.github_repo) + .ok_or_else(|| GitHubError::Api(400, "Invalid github_repo format".to_string()))?; - let projects = crate::db::projects::get_projects_with_github_repo(pool).await?; - let mut stats = SyncStats::default(); + let activity_time = client.get_latest_activity(owner, repo).await?; - tracing::debug!(count = projects.len(), "Starting GitHub activity sync"); + if let Some(new_time) = activity_time { + // Only update if newer than current value + let should_update = project + .last_activity + .is_none_or(|current| new_time > current); - for project in projects { - let github_repo = match &project.github_repo { - Some(repo) => repo, - None => continue, - }; + if should_update { + crate::db::projects::update_last_github_activity(pool, project.project_id, new_time) + .await + .map_err(|e| GitHubError::Api(500, e.to_string()))?; - let (owner, repo) = match parse_github_repo(github_repo) { - Some(parsed) => parsed, - None => { - tracing::warn!( - project_id = %project.id, - github_repo = %github_repo, - "Invalid github_repo format, expected 'owner/repo'" - ); - stats.skipped += 1; - continue; - } - }; - - match client.get_latest_activity(owner, repo).await { - Ok(Some(activity_time)) => { - // Only update if newer than current value - let should_update = project - .last_github_activity - .is_none_or(|current| activity_time > current); - - if should_update { - if let Err(e) = crate::db::projects::update_last_github_activity( - pool, - project.id, - activity_time, - ) - .await - { - tracing::error!( - project_id = %project.id, - error = %e, - "Failed to update last_github_activity" - ); - stats.errors += 1; - } else { - tracing::debug!( - project_id = %project.id, - github_repo = %github_repo, - activity_time = %activity_time, - "Updated last_github_activity" - ); - stats.synced += 1; - } - } else { - stats.skipped += 1; - } - } - Ok(None) => { - tracing::debug!( - project_id = %project.id, - github_repo = %github_repo, - "No activity found for repository" - ); - stats.skipped += 1; - } - Err(GitHubError::NotFound(repo)) => { - tracing::warn!( - project_id = %project.id, - github_repo = %repo, - "Repository not found or inaccessible, skipping" - ); - stats.skipped += 1; - } - Err(GitHubError::RateLimited) => { - tracing::warn!("GitHub API rate limit hit, stopping sync early"); - stats.errors += 1; - break; - } - Err(e) => { - tracing::error!( - project_id = %project.id, - github_repo = %github_repo, - error = %e, - "Failed to fetch GitHub activity" - ); - stats.errors += 1; - } + tracing::debug!( + repo = %project.github_repo, + activity_time = %new_time, + "Updated last_github_activity" + ); } } - Ok(stats) + Ok(activity_time) +} + +/// Run the GitHub activity sync scheduler loop. +/// +/// This is the main entry point for the background sync task. +/// It checks for due projects every 30 seconds and syncs them individually. +pub async fn run_scheduler(pool: PgPool) { + let client = match GitHubClient::get().await { + Some(c) => c, + None => return, // GitHub sync disabled + }; + + let scheduler = match init_scheduler(&pool).await { + Some(s) => s, + None => { + tracing::warn!("Failed to initialize GitHub scheduler"); + return; + } + }; + + if scheduler.is_empty() { + tracing::info!("GitHub scheduler: no projects with github_repo configured"); + return; + } + + // Log initial state + if let Some((next_repo, until)) = scheduler.next_check_info() { + tracing::info!( + projects = scheduler.len(), + next_repo = %next_repo, + next_check_secs = until.as_secs(), + "GitHub scheduler started with staggered checks" + ); + } else { + tracing::info!(projects = scheduler.len(), "GitHub scheduler started"); + } + + // Check for due projects every 30 seconds + let mut tick = tokio::time::interval(SCHEDULER_TICK_INTERVAL); + + loop { + tick.tick().await; + + // Process all due projects + let mut processed = 0u32; + while let Some(project) = scheduler.pop_if_due() { + processed += 1; + + match sync_single_project(&client, &pool, &project).await { + Ok(new_activity) => { + let interval = calculate_check_interval(new_activity); + tracing::debug!( + repo = %project.github_repo, + next_check_mins = interval.as_secs() / 60, + "GitHub sync complete" + ); + + // Check if activity changed + if new_activity != project.last_activity { + scheduler.reschedule(project, new_activity); + } else { + scheduler.reschedule_skipped(project); + } + } + Err(GitHubError::RateLimited) => { + // Back off ALL projects by 5 minutes + scheduler.backoff_all(Duration::from_secs(5 * 60)); + tracing::warn!( + processed, + "GitHub rate limit hit, backing off all projects 5 minutes" + ); + // Re-add the current project too + scheduler.reschedule_with_error(project); + break; + } + Err(GitHubError::NotFound(repo)) => { + // Repo doesn't exist or is private - use long interval + tracing::warn!( + repo = %repo, + "GitHub repo not found or inaccessible, using max interval" + ); + scheduler.reschedule_skipped(project); + } + Err(e) => { + tracing::warn!( + repo = %project.github_repo, + error = %e, + "GitHub sync error, scheduling retry with backoff" + ); + scheduler.reschedule_with_error(project); + } + } + } + + // Log periodic stats (every ~5 minutes worth of ticks = 10 ticks) + static TICK_COUNT: AtomicU32 = AtomicU32::new(0); + let ticks = TICK_COUNT.fetch_add(1, AtomicOrdering::Relaxed); + if ticks > 0 && ticks.is_multiple_of(10) { + let stats = scheduler.stats(); + if let Some((next_repo, until)) = scheduler.next_check_info() { + tracing::info!( + synced = stats.synced, + skipped = stats.skipped, + errors = stats.errors, + next_repo = %next_repo, + next_check_secs = until.as_secs(), + "GitHub scheduler stats" + ); + } + } + } } diff --git a/src/handlers/projects.rs b/src/handlers/projects.rs index 3fd7267..e0ebd80 100644 --- a/src/handlers/projects.rs +++ b/src/handlers/projects.rs @@ -1,7 +1,7 @@ use axum::{Json, extract::State, http::StatusCode, response::IntoResponse}; use std::sync::Arc; -use crate::{auth, db, handlers::AddProjectTagRequest, state::AppState}; +use crate::{auth, db, github, handlers::AddProjectTagRequest, state::AppState}; /// List all projects - returns filtered data based on auth status pub async fn projects_handler( @@ -231,6 +231,14 @@ pub async fn create_project_handler( tracing::info!(project_id = %project.id, project_name = %project.name, "Project created"); + // If project has a github_repo, add to scheduler for immediate checking + if let Some(ref repo) = project.github_repo + && let Some(scheduler) = github::get_scheduler() + { + scheduler.add_project(project.id, repo.clone()); + tracing::debug!(project_id = %project.id, repo = %repo, "Added project to GitHub scheduler"); + } + // Invalidate cached pages that display projects state.isr_cache.invalidate("/").await; @@ -399,6 +407,34 @@ pub async fn update_project_handler( tracing::info!(project_id = %project.id, project_name = %project.name, "Project updated"); + // Update GitHub scheduler if github_repo changed + if let Some(scheduler) = github::get_scheduler() { + let old_repo = existing_project.github_repo.as_ref(); + let new_repo = project.github_repo.as_ref(); + + match (old_repo, new_repo) { + (None, Some(repo)) => { + // Added github_repo - schedule for immediate check + scheduler.add_project(project.id, repo.clone()); + tracing::debug!(project_id = %project.id, repo = %repo, "Added project to GitHub scheduler"); + } + (Some(_), None) => { + // Removed github_repo - remove from scheduler + scheduler.remove_project(project.id); + tracing::debug!(project_id = %project.id, "Removed project from GitHub scheduler"); + } + (Some(old), Some(new)) if old != new => { + // Changed github_repo - update scheduler + scheduler.remove_project(project.id); + scheduler.add_project(project.id, new.clone()); + tracing::debug!(project_id = %project.id, old_repo = %old, new_repo = %new, "Updated project in GitHub scheduler"); + } + _ => { + // No change to github_repo + } + } + } + // Invalidate cached pages that display projects state.isr_cache.invalidate("/").await; @@ -448,6 +484,13 @@ pub async fn delete_project_handler( Ok(()) => { tracing::info!(project_id = %project.id, project_name = %project.name, "Project deleted"); + // Remove from GitHub scheduler if it had a github_repo + if project.github_repo.is_some() + && let Some(scheduler) = github::get_scheduler() + { + scheduler.remove_project(project.id); + } + // Invalidate cached pages that display projects state.isr_cache.invalidate("/").await;