mirror of
https://github.com/Xevion/xevion.dev.git
synced 2026-01-31 04:26:43 -06:00
feat: implement per-project GitHub sync scheduler with dynamic intervals
Replaces fixed-interval polling with a priority queue scheduler that adjusts check frequency based on activity recency (15min for active projects, up to 24hr for stale ones). Includes exponential backoff on errors and staggered initial checks to prevent API rate limit issues.
This commit is contained in:
+2
-2
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"db_name": "PostgreSQL",
|
"db_name": "PostgreSQL",
|
||||||
"query": "\n SELECT\n id,\n slug,\n name,\n short_description,\n description,\n status as \"status: ProjectStatus\",\n github_repo,\n demo_url,\n last_github_activity,\n created_at\n FROM projects\n WHERE github_repo IS NOT NULL\n ORDER BY updated_at DESC\n ",
|
"query": "\n SELECT\n id,\n slug,\n name,\n short_description,\n description,\n status as \"status: ProjectStatus\",\n github_repo,\n demo_url,\n last_github_activity,\n created_at\n FROM projects\n WHERE github_repo IS NOT NULL\n ORDER BY last_github_activity DESC NULLS LAST\n ",
|
||||||
"describe": {
|
"describe": {
|
||||||
"columns": [
|
"columns": [
|
||||||
{
|
{
|
||||||
@@ -82,5 +82,5 @@
|
|||||||
false
|
false
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"hash": "e35380efe500836a7a7bbcfcd54023b64bb20bbf58ef18acc37046b34996cefd"
|
"hash": "a0ef356b59f222775d062f6ae929f1566c0a50501c4b5e894caa3d3eb5b739f0"
|
||||||
}
|
}
|
||||||
Generated
+1
@@ -116,6 +116,7 @@ dependencies = [
|
|||||||
"mime_guess",
|
"mime_guess",
|
||||||
"moka",
|
"moka",
|
||||||
"nu-ansi-term",
|
"nu-ansi-term",
|
||||||
|
"parking_lot",
|
||||||
"rand 0.9.2",
|
"rand 0.9.2",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ include_dir = "0.7.4"
|
|||||||
mime_guess = "2.0.5"
|
mime_guess = "2.0.5"
|
||||||
moka = { version = "0.12.12", features = ["future"] }
|
moka = { version = "0.12.12", features = ["future"] }
|
||||||
nu-ansi-term = "0.50.3"
|
nu-ansi-term = "0.50.3"
|
||||||
|
parking_lot = "0.12.5"
|
||||||
rand = "0.9.2"
|
rand = "0.9.2"
|
||||||
reqwest = { version = "0.13.1", default-features = false, features = ["rustls", "charset", "json", "stream"] }
|
reqwest = { version = "0.13.1", default-features = false, features = ["rustls", "charset", "json", "stream"] }
|
||||||
serde = { version = "1.0.228", features = ["derive"] }
|
serde = { version = "1.0.228", features = ["derive"] }
|
||||||
|
|||||||
+4
-52
@@ -152,62 +152,14 @@ pub async fn run(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Spawn GitHub activity sync background task (if GITHUB_TOKEN is set)
|
// Spawn GitHub activity sync scheduler (if GITHUB_TOKEN is set)
|
||||||
|
// Uses per-project dynamic intervals based on activity recency
|
||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
async move {
|
async move {
|
||||||
// Check if GitHub client is available (GITHUB_TOKEN set)
|
// Brief delay to let server finish initializing
|
||||||
if github::GitHubClient::get().await.is_none() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let interval_secs: u64 = std::env::var("GITHUB_SYNC_INTERVAL_SEC")
|
|
||||||
.ok()
|
|
||||||
.and_then(|s| s.parse().ok())
|
|
||||||
.unwrap_or(900); // Default: 15 minutes
|
|
||||||
|
|
||||||
// Brief delay to let server finish initializing, then run initial sync
|
|
||||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
github::run_scheduler(pool).await;
|
||||||
tracing::info!(
|
|
||||||
interval_sec = interval_secs,
|
|
||||||
"Running initial GitHub activity sync"
|
|
||||||
);
|
|
||||||
|
|
||||||
match github::sync_github_activity(&pool).await {
|
|
||||||
Ok(stats) => {
|
|
||||||
tracing::info!(
|
|
||||||
synced = stats.synced,
|
|
||||||
skipped = stats.skipped,
|
|
||||||
errors = stats.errors,
|
|
||||||
"Initial GitHub activity sync completed"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, "Initial GitHub sync failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Regular interval sync
|
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
|
|
||||||
interval.tick().await; // Skip the first tick (already ran initial sync)
|
|
||||||
|
|
||||||
loop {
|
|
||||||
interval.tick().await;
|
|
||||||
match github::sync_github_activity(&pool).await {
|
|
||||||
Ok(stats) => {
|
|
||||||
tracing::info!(
|
|
||||||
synced = stats.synced,
|
|
||||||
skipped = stats.skipped,
|
|
||||||
errors = stats.errors,
|
|
||||||
"GitHub activity sync completed"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, "GitHub sync failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
+5
-2
@@ -452,7 +452,10 @@ pub async fn get_admin_stats(pool: &PgPool) -> Result<AdminStats, sqlx::Error> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get all projects that have a github_repo set (for GitHub sync)
|
/// Get all projects that have a github_repo set (for GitHub sync).
|
||||||
|
///
|
||||||
|
/// Orders by most recent activity first (NULLS LAST) so that projects with
|
||||||
|
/// the shortest check intervals are processed first by the scheduler.
|
||||||
pub async fn get_projects_with_github_repo(pool: &PgPool) -> Result<Vec<DbProject>, sqlx::Error> {
|
pub async fn get_projects_with_github_repo(pool: &PgPool) -> Result<Vec<DbProject>, sqlx::Error> {
|
||||||
query_as!(
|
query_as!(
|
||||||
DbProject,
|
DbProject,
|
||||||
@@ -470,7 +473,7 @@ pub async fn get_projects_with_github_repo(pool: &PgPool) -> Result<Vec<DbProjec
|
|||||||
created_at
|
created_at
|
||||||
FROM projects
|
FROM projects
|
||||||
WHERE github_repo IS NOT NULL
|
WHERE github_repo IS NOT NULL
|
||||||
ORDER BY updated_at DESC
|
ORDER BY last_github_activity DESC NULLS LAST
|
||||||
"#
|
"#
|
||||||
)
|
)
|
||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
|
|||||||
+446
-76
@@ -1,4 +1,9 @@
|
|||||||
//! GitHub API client for syncing repository activity.
|
//! GitHub API client and activity sync scheduler.
|
||||||
|
//!
|
||||||
|
//! Implements a per-project scheduler that dynamically adjusts check intervals
|
||||||
|
//! based on activity recency. Projects with recent activity are checked more
|
||||||
|
//! frequently (down to 15 minutes), while stale projects are checked less often
|
||||||
|
//! (up to 24 hours).
|
||||||
//!
|
//!
|
||||||
//! Fetches the latest activity from GitHub for projects that have `github_repo` set.
|
//! Fetches the latest activity from GitHub for projects that have `github_repo` set.
|
||||||
//! Only considers:
|
//! Only considers:
|
||||||
@@ -6,16 +11,75 @@
|
|||||||
//! - Main branch activity: Commits/pushes to the default branch only
|
//! - Main branch activity: Commits/pushes to the default branch only
|
||||||
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
|
use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
|
use std::cmp::{Ordering, Reverse};
|
||||||
|
use std::collections::BinaryHeap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use tokio::sync::OnceCell;
|
use tokio::sync::OnceCell;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::db::projects::DbProject;
|
||||||
|
|
||||||
static GITHUB_CLIENT: OnceCell<Option<Arc<GitHubClient>>> = OnceCell::const_new();
|
static GITHUB_CLIENT: OnceCell<Option<Arc<GitHubClient>>> = OnceCell::const_new();
|
||||||
|
|
||||||
/// Statistics from a sync run
|
// Interval bounds (configurable via environment variables)
|
||||||
|
fn min_interval() -> Duration {
|
||||||
|
let secs: u64 = std::env::var("GITHUB_SYNC_MIN_INTERVAL_SEC")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(15 * 60); // 15 minutes default
|
||||||
|
Duration::from_secs(secs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn max_interval() -> Duration {
|
||||||
|
let secs: u64 = std::env::var("GITHUB_SYNC_MAX_INTERVAL_SEC")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(24 * 60 * 60); // 24 hours default
|
||||||
|
Duration::from_secs(secs)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Days of inactivity after which the maximum interval is used
|
||||||
|
const DAYS_TO_MAX: f64 = 90.0;
|
||||||
|
|
||||||
|
/// How often the scheduler checks for due items (hardcoded)
|
||||||
|
pub const SCHEDULER_TICK_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
|
/// Calculate the check interval based on how recently the project had activity.
|
||||||
|
///
|
||||||
|
/// Uses a logarithmic curve that starts at MIN_INTERVAL for today's activity
|
||||||
|
/// and approaches MAX_INTERVAL as activity ages toward DAYS_TO_MAX.
|
||||||
|
///
|
||||||
|
/// Examples (with default 15min/24hr bounds):
|
||||||
|
/// - 0 days (today): 15 min
|
||||||
|
/// - 1 day: ~24 min
|
||||||
|
/// - 7 days: ~49 min
|
||||||
|
/// - 30 days: ~1.7 hr
|
||||||
|
/// - 90+ days: 24 hr
|
||||||
|
pub fn calculate_check_interval(last_activity: Option<OffsetDateTime>) -> Duration {
|
||||||
|
let min = min_interval();
|
||||||
|
let max = max_interval();
|
||||||
|
|
||||||
|
let days_since = last_activity
|
||||||
|
.map(|t| (OffsetDateTime::now_utc() - t).whole_days().max(0) as f64)
|
||||||
|
.unwrap_or(DAYS_TO_MAX); // Default to max interval if no activity recorded
|
||||||
|
|
||||||
|
// Logarithmic scaling: ln(1+days) / ln(1+90) gives 0..1 range
|
||||||
|
let scale = (1.0 + days_since).ln() / (1.0 + DAYS_TO_MAX).ln();
|
||||||
|
let scale = scale.clamp(0.0, 1.0);
|
||||||
|
|
||||||
|
let interval_secs = min.as_secs_f64() + scale * (max.as_secs_f64() - min.as_secs_f64());
|
||||||
|
|
||||||
|
Duration::from_secs(interval_secs as u64)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Statistics from scheduler operations
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct SyncStats {
|
pub struct SyncStats {
|
||||||
pub synced: u32,
|
pub synced: u32,
|
||||||
@@ -23,6 +87,267 @@ pub struct SyncStats {
|
|||||||
pub errors: u32,
|
pub errors: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A project scheduled for GitHub activity checking
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ScheduledProject {
|
||||||
|
/// When to next check this project
|
||||||
|
pub next_check: Instant,
|
||||||
|
/// Project database ID
|
||||||
|
pub project_id: Uuid,
|
||||||
|
/// GitHub repo in "owner/repo" format
|
||||||
|
pub github_repo: String,
|
||||||
|
/// Last known activity timestamp
|
||||||
|
pub last_activity: Option<OffsetDateTime>,
|
||||||
|
/// Consecutive error count (for exponential backoff)
|
||||||
|
pub error_count: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ScheduledProject {
|
||||||
|
fn new(project: &DbProject, next_check: Instant) -> Option<Self> {
|
||||||
|
Some(Self {
|
||||||
|
next_check,
|
||||||
|
project_id: project.id,
|
||||||
|
github_repo: project.github_repo.clone()?,
|
||||||
|
last_activity: project.last_github_activity,
|
||||||
|
error_count: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a scheduled project for immediate checking
|
||||||
|
pub fn immediate(project_id: Uuid, github_repo: String) -> Self {
|
||||||
|
Self {
|
||||||
|
next_check: Instant::now(),
|
||||||
|
project_id,
|
||||||
|
github_repo,
|
||||||
|
last_activity: None,
|
||||||
|
error_count: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ordering for BinaryHeap (we use Reverse<> for min-heap behavior)
|
||||||
|
impl Eq for ScheduledProject {}
|
||||||
|
|
||||||
|
impl PartialEq for ScheduledProject {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.project_id == other.project_id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ord for ScheduledProject {
|
||||||
|
fn cmp(&self, other: &Self) -> Ordering {
|
||||||
|
// Order by next_check time (earliest first when wrapped in Reverse)
|
||||||
|
self.next_check.cmp(&other.next_check)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialOrd for ScheduledProject {
|
||||||
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||||
|
Some(self.cmp(other))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GitHub activity sync scheduler with priority queue
|
||||||
|
pub struct GitHubScheduler {
|
||||||
|
/// Min-heap of scheduled projects (earliest check first)
|
||||||
|
queue: Mutex<BinaryHeap<Reverse<ScheduledProject>>>,
|
||||||
|
/// Running statistics
|
||||||
|
pub total_synced: AtomicU32,
|
||||||
|
pub total_skipped: AtomicU32,
|
||||||
|
pub total_errors: AtomicU32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GitHubScheduler {
|
||||||
|
/// Create a new scheduler with projects from the database.
|
||||||
|
///
|
||||||
|
/// Projects are sorted by activity recency and their initial checks are
|
||||||
|
/// staggered across their calculated intervals to prevent clumping.
|
||||||
|
pub fn new(mut projects: Vec<DbProject>) -> Self {
|
||||||
|
// Sort by activity recency (most recent first = shortest intervals first)
|
||||||
|
projects.sort_by(|a, b| {
|
||||||
|
let a_time = a.last_github_activity.unwrap_or(a.created_at);
|
||||||
|
let b_time = b.last_github_activity.unwrap_or(b.created_at);
|
||||||
|
b_time.cmp(&a_time) // Descending (most recent first)
|
||||||
|
});
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let total = projects.len().max(1) as f64;
|
||||||
|
|
||||||
|
let scheduled: Vec<_> = projects
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter_map(|(i, project)| {
|
||||||
|
let interval = calculate_check_interval(project.last_github_activity);
|
||||||
|
|
||||||
|
// Stagger initial checks: position i of n gets (i/n * interval) offset
|
||||||
|
// This spreads checks evenly across each project's interval
|
||||||
|
let offset_secs = (i as f64 / total) * interval.as_secs_f64();
|
||||||
|
let next_check = now + Duration::from_secs_f64(offset_secs);
|
||||||
|
|
||||||
|
ScheduledProject::new(&project, next_check)
|
||||||
|
})
|
||||||
|
.map(Reverse)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let queue = BinaryHeap::from(scheduled);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
queue: Mutex::new(queue),
|
||||||
|
total_synced: AtomicU32::new(0),
|
||||||
|
total_skipped: AtomicU32::new(0),
|
||||||
|
total_errors: AtomicU32::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Number of projects in the scheduler
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.queue.lock().len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if the scheduler has no projects
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.queue.lock().is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pop the next due project, if any.
|
||||||
|
///
|
||||||
|
/// Returns None if the queue is empty or the next project isn't due yet.
|
||||||
|
pub fn pop_if_due(&self) -> Option<ScheduledProject> {
|
||||||
|
let mut queue = self.queue.lock();
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
// Peek to check if the next item is due
|
||||||
|
if queue.peek().is_some_and(|p| p.0.next_check <= now) {
|
||||||
|
queue.pop().map(|r| r.0)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reschedule a project after a successful sync.
|
||||||
|
pub fn reschedule(&self, mut project: ScheduledProject, new_activity: Option<OffsetDateTime>) {
|
||||||
|
project.last_activity = new_activity.or(project.last_activity);
|
||||||
|
project.error_count = 0; // Reset error count on success
|
||||||
|
|
||||||
|
let interval = calculate_check_interval(project.last_activity);
|
||||||
|
project.next_check = Instant::now() + interval;
|
||||||
|
|
||||||
|
self.queue.lock().push(Reverse(project));
|
||||||
|
self.total_synced.fetch_add(1, AtomicOrdering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reschedule a project after an error with exponential backoff.
|
||||||
|
///
|
||||||
|
/// Each consecutive error doubles the wait time, capped at max_interval.
|
||||||
|
pub fn reschedule_with_error(&self, mut project: ScheduledProject) {
|
||||||
|
project.error_count += 1;
|
||||||
|
|
||||||
|
// Exponential backoff: base_interval * 2^error_count, capped at max
|
||||||
|
let base = calculate_check_interval(project.last_activity);
|
||||||
|
let multiplier = 2u32.saturating_pow(project.error_count.min(6)); // Cap at 2^6 = 64x
|
||||||
|
let backoff = base.saturating_mul(multiplier);
|
||||||
|
let interval = backoff.min(max_interval());
|
||||||
|
|
||||||
|
project.next_check = Instant::now() + interval;
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
repo = %project.github_repo,
|
||||||
|
error_count = project.error_count,
|
||||||
|
backoff_mins = interval.as_secs() / 60,
|
||||||
|
"Rescheduled with error backoff"
|
||||||
|
);
|
||||||
|
|
||||||
|
self.queue.lock().push(Reverse(project));
|
||||||
|
self.total_errors.fetch_add(1, AtomicOrdering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a project as skipped (no update needed).
|
||||||
|
pub fn reschedule_skipped(&self, mut project: ScheduledProject) {
|
||||||
|
project.error_count = 0; // Reset on successful check (even if skipped)
|
||||||
|
|
||||||
|
let interval = calculate_check_interval(project.last_activity);
|
||||||
|
project.next_check = Instant::now() + interval;
|
||||||
|
|
||||||
|
self.queue.lock().push(Reverse(project));
|
||||||
|
self.total_skipped.fetch_add(1, AtomicOrdering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Back off all projects by a fixed duration (e.g., after rate limiting).
|
||||||
|
pub fn backoff_all(&self, backoff: Duration) {
|
||||||
|
let mut queue = self.queue.lock();
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
// Drain and re-add with updated times
|
||||||
|
let projects: Vec<_> = queue.drain().collect();
|
||||||
|
for mut p in projects {
|
||||||
|
// Only push back projects that aren't already further out
|
||||||
|
if p.0.next_check < now + backoff {
|
||||||
|
p.0.next_check = now + backoff;
|
||||||
|
}
|
||||||
|
queue.push(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a new project to the scheduler for immediate checking.
|
||||||
|
pub fn add_project(&self, project_id: Uuid, github_repo: String) {
|
||||||
|
let project = ScheduledProject::immediate(project_id, github_repo);
|
||||||
|
self.queue.lock().push(Reverse(project));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a project from the scheduler.
|
||||||
|
pub fn remove_project(&self, project_id: Uuid) {
|
||||||
|
let mut queue = self.queue.lock();
|
||||||
|
let projects: Vec<_> = queue
|
||||||
|
.drain()
|
||||||
|
.filter(|p| p.0.project_id != project_id)
|
||||||
|
.collect();
|
||||||
|
queue.extend(projects);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get current statistics snapshot
|
||||||
|
pub fn stats(&self) -> SyncStats {
|
||||||
|
SyncStats {
|
||||||
|
synced: self.total_synced.load(AtomicOrdering::Relaxed),
|
||||||
|
skipped: self.total_skipped.load(AtomicOrdering::Relaxed),
|
||||||
|
errors: self.total_errors.load(AtomicOrdering::Relaxed),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get info about the next scheduled check (for logging/debugging)
|
||||||
|
pub fn next_check_info(&self) -> Option<(String, Duration)> {
|
||||||
|
let queue = self.queue.lock();
|
||||||
|
queue.peek().map(|p| {
|
||||||
|
let until = p.0.next_check.saturating_duration_since(Instant::now());
|
||||||
|
(p.0.github_repo.clone(), until)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Global scheduler instance (initialized during server startup)
|
||||||
|
static GITHUB_SCHEDULER: OnceCell<Arc<GitHubScheduler>> = OnceCell::const_new();
|
||||||
|
|
||||||
|
/// Get the global scheduler instance
|
||||||
|
pub fn get_scheduler() -> Option<Arc<GitHubScheduler>> {
|
||||||
|
GITHUB_SCHEDULER.get().cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize the global scheduler with projects from the database
|
||||||
|
pub async fn init_scheduler(pool: &PgPool) -> Option<Arc<GitHubScheduler>> {
|
||||||
|
// Only init if client is available
|
||||||
|
GitHubClient::get().await.as_ref()?;
|
||||||
|
|
||||||
|
let projects = crate::db::projects::get_projects_with_github_repo(pool)
|
||||||
|
.await
|
||||||
|
.ok()?;
|
||||||
|
|
||||||
|
let scheduler = Arc::new(GitHubScheduler::new(projects));
|
||||||
|
|
||||||
|
// Store globally
|
||||||
|
let _ = GITHUB_SCHEDULER.set(scheduler.clone());
|
||||||
|
|
||||||
|
Some(scheduler)
|
||||||
|
}
|
||||||
|
|
||||||
/// GitHub API client with caching for default branches
|
/// GitHub API client with caching for default branches
|
||||||
pub struct GitHubClient {
|
pub struct GitHubClient {
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
@@ -314,103 +639,148 @@ fn parse_github_repo(github_repo: &str) -> Option<(&str, &str)> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sync GitHub activity for all projects that have github_repo set.
|
/// Sync a single project's GitHub activity.
|
||||||
/// Updates the last_github_activity field with the most recent activity timestamp.
|
///
|
||||||
pub async fn sync_github_activity(pool: &PgPool) -> Result<SyncStats, Box<dyn std::error::Error>> {
|
/// Returns the new activity timestamp if found, or None if no activity.
|
||||||
let client = GitHubClient::get()
|
pub async fn sync_single_project(
|
||||||
.await
|
client: &GitHubClient,
|
||||||
.ok_or("GitHub client not initialized")?;
|
pool: &PgPool,
|
||||||
|
project: &ScheduledProject,
|
||||||
|
) -> Result<Option<OffsetDateTime>, GitHubError> {
|
||||||
|
let (owner, repo) = parse_github_repo(&project.github_repo)
|
||||||
|
.ok_or_else(|| GitHubError::Api(400, "Invalid github_repo format".to_string()))?;
|
||||||
|
|
||||||
let projects = crate::db::projects::get_projects_with_github_repo(pool).await?;
|
let activity_time = client.get_latest_activity(owner, repo).await?;
|
||||||
let mut stats = SyncStats::default();
|
|
||||||
|
|
||||||
tracing::debug!(count = projects.len(), "Starting GitHub activity sync");
|
if let Some(new_time) = activity_time {
|
||||||
|
|
||||||
for project in projects {
|
|
||||||
let github_repo = match &project.github_repo {
|
|
||||||
Some(repo) => repo,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let (owner, repo) = match parse_github_repo(github_repo) {
|
|
||||||
Some(parsed) => parsed,
|
|
||||||
None => {
|
|
||||||
tracing::warn!(
|
|
||||||
project_id = %project.id,
|
|
||||||
github_repo = %github_repo,
|
|
||||||
"Invalid github_repo format, expected 'owner/repo'"
|
|
||||||
);
|
|
||||||
stats.skipped += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match client.get_latest_activity(owner, repo).await {
|
|
||||||
Ok(Some(activity_time)) => {
|
|
||||||
// Only update if newer than current value
|
// Only update if newer than current value
|
||||||
let should_update = project
|
let should_update = project
|
||||||
.last_github_activity
|
.last_activity
|
||||||
.is_none_or(|current| activity_time > current);
|
.is_none_or(|current| new_time > current);
|
||||||
|
|
||||||
if should_update {
|
if should_update {
|
||||||
if let Err(e) = crate::db::projects::update_last_github_activity(
|
crate::db::projects::update_last_github_activity(pool, project.project_id, new_time)
|
||||||
pool,
|
|
||||||
project.id,
|
|
||||||
activity_time,
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
{
|
.map_err(|e| GitHubError::Api(500, e.to_string()))?;
|
||||||
tracing::error!(
|
|
||||||
project_id = %project.id,
|
|
||||||
error = %e,
|
|
||||||
"Failed to update last_github_activity"
|
|
||||||
);
|
|
||||||
stats.errors += 1;
|
|
||||||
} else {
|
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
project_id = %project.id,
|
repo = %project.github_repo,
|
||||||
github_repo = %github_repo,
|
activity_time = %new_time,
|
||||||
activity_time = %activity_time,
|
|
||||||
"Updated last_github_activity"
|
"Updated last_github_activity"
|
||||||
);
|
);
|
||||||
stats.synced += 1;
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(activity_time)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run the GitHub activity sync scheduler loop.
|
||||||
|
///
|
||||||
|
/// This is the main entry point for the background sync task.
|
||||||
|
/// It checks for due projects every 30 seconds and syncs them individually.
|
||||||
|
pub async fn run_scheduler(pool: PgPool) {
|
||||||
|
let client = match GitHubClient::get().await {
|
||||||
|
Some(c) => c,
|
||||||
|
None => return, // GitHub sync disabled
|
||||||
|
};
|
||||||
|
|
||||||
|
let scheduler = match init_scheduler(&pool).await {
|
||||||
|
Some(s) => s,
|
||||||
|
None => {
|
||||||
|
tracing::warn!("Failed to initialize GitHub scheduler");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if scheduler.is_empty() {
|
||||||
|
tracing::info!("GitHub scheduler: no projects with github_repo configured");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log initial state
|
||||||
|
if let Some((next_repo, until)) = scheduler.next_check_info() {
|
||||||
|
tracing::info!(
|
||||||
|
projects = scheduler.len(),
|
||||||
|
next_repo = %next_repo,
|
||||||
|
next_check_secs = until.as_secs(),
|
||||||
|
"GitHub scheduler started with staggered checks"
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
stats.skipped += 1;
|
tracing::info!(projects = scheduler.len(), "GitHub scheduler started");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Ok(None) => {
|
// Check for due projects every 30 seconds
|
||||||
|
let mut tick = tokio::time::interval(SCHEDULER_TICK_INTERVAL);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tick.tick().await;
|
||||||
|
|
||||||
|
// Process all due projects
|
||||||
|
let mut processed = 0u32;
|
||||||
|
while let Some(project) = scheduler.pop_if_due() {
|
||||||
|
processed += 1;
|
||||||
|
|
||||||
|
match sync_single_project(&client, &pool, &project).await {
|
||||||
|
Ok(new_activity) => {
|
||||||
|
let interval = calculate_check_interval(new_activity);
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
project_id = %project.id,
|
repo = %project.github_repo,
|
||||||
github_repo = %github_repo,
|
next_check_mins = interval.as_secs() / 60,
|
||||||
"No activity found for repository"
|
"GitHub sync complete"
|
||||||
);
|
);
|
||||||
stats.skipped += 1;
|
|
||||||
|
// Check if activity changed
|
||||||
|
if new_activity != project.last_activity {
|
||||||
|
scheduler.reschedule(project, new_activity);
|
||||||
|
} else {
|
||||||
|
scheduler.reschedule_skipped(project);
|
||||||
}
|
}
|
||||||
Err(GitHubError::NotFound(repo)) => {
|
|
||||||
tracing::warn!(
|
|
||||||
project_id = %project.id,
|
|
||||||
github_repo = %repo,
|
|
||||||
"Repository not found or inaccessible, skipping"
|
|
||||||
);
|
|
||||||
stats.skipped += 1;
|
|
||||||
}
|
}
|
||||||
Err(GitHubError::RateLimited) => {
|
Err(GitHubError::RateLimited) => {
|
||||||
tracing::warn!("GitHub API rate limit hit, stopping sync early");
|
// Back off ALL projects by 5 minutes
|
||||||
stats.errors += 1;
|
scheduler.backoff_all(Duration::from_secs(5 * 60));
|
||||||
|
tracing::warn!(
|
||||||
|
processed,
|
||||||
|
"GitHub rate limit hit, backing off all projects 5 minutes"
|
||||||
|
);
|
||||||
|
// Re-add the current project too
|
||||||
|
scheduler.reschedule_with_error(project);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(GitHubError::NotFound(repo)) => {
|
||||||
tracing::error!(
|
// Repo doesn't exist or is private - use long interval
|
||||||
project_id = %project.id,
|
tracing::warn!(
|
||||||
github_repo = %github_repo,
|
repo = %repo,
|
||||||
error = %e,
|
"GitHub repo not found or inaccessible, using max interval"
|
||||||
"Failed to fetch GitHub activity"
|
|
||||||
);
|
);
|
||||||
stats.errors += 1;
|
scheduler.reschedule_skipped(project);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
repo = %project.github_repo,
|
||||||
|
error = %e,
|
||||||
|
"GitHub sync error, scheduling retry with backoff"
|
||||||
|
);
|
||||||
|
scheduler.reschedule_with_error(project);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(stats)
|
// Log periodic stats (every ~5 minutes worth of ticks = 10 ticks)
|
||||||
|
static TICK_COUNT: AtomicU32 = AtomicU32::new(0);
|
||||||
|
let ticks = TICK_COUNT.fetch_add(1, AtomicOrdering::Relaxed);
|
||||||
|
if ticks > 0 && ticks.is_multiple_of(10) {
|
||||||
|
let stats = scheduler.stats();
|
||||||
|
if let Some((next_repo, until)) = scheduler.next_check_info() {
|
||||||
|
tracing::info!(
|
||||||
|
synced = stats.synced,
|
||||||
|
skipped = stats.skipped,
|
||||||
|
errors = stats.errors,
|
||||||
|
next_repo = %next_repo,
|
||||||
|
next_check_secs = until.as_secs(),
|
||||||
|
"GitHub scheduler stats"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
|
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::{auth, db, handlers::AddProjectTagRequest, state::AppState};
|
use crate::{auth, db, github, handlers::AddProjectTagRequest, state::AppState};
|
||||||
|
|
||||||
/// List all projects - returns filtered data based on auth status
|
/// List all projects - returns filtered data based on auth status
|
||||||
pub async fn projects_handler(
|
pub async fn projects_handler(
|
||||||
@@ -231,6 +231,14 @@ pub async fn create_project_handler(
|
|||||||
|
|
||||||
tracing::info!(project_id = %project.id, project_name = %project.name, "Project created");
|
tracing::info!(project_id = %project.id, project_name = %project.name, "Project created");
|
||||||
|
|
||||||
|
// If project has a github_repo, add to scheduler for immediate checking
|
||||||
|
if let Some(ref repo) = project.github_repo
|
||||||
|
&& let Some(scheduler) = github::get_scheduler()
|
||||||
|
{
|
||||||
|
scheduler.add_project(project.id, repo.clone());
|
||||||
|
tracing::debug!(project_id = %project.id, repo = %repo, "Added project to GitHub scheduler");
|
||||||
|
}
|
||||||
|
|
||||||
// Invalidate cached pages that display projects
|
// Invalidate cached pages that display projects
|
||||||
state.isr_cache.invalidate("/").await;
|
state.isr_cache.invalidate("/").await;
|
||||||
|
|
||||||
@@ -399,6 +407,34 @@ pub async fn update_project_handler(
|
|||||||
|
|
||||||
tracing::info!(project_id = %project.id, project_name = %project.name, "Project updated");
|
tracing::info!(project_id = %project.id, project_name = %project.name, "Project updated");
|
||||||
|
|
||||||
|
// Update GitHub scheduler if github_repo changed
|
||||||
|
if let Some(scheduler) = github::get_scheduler() {
|
||||||
|
let old_repo = existing_project.github_repo.as_ref();
|
||||||
|
let new_repo = project.github_repo.as_ref();
|
||||||
|
|
||||||
|
match (old_repo, new_repo) {
|
||||||
|
(None, Some(repo)) => {
|
||||||
|
// Added github_repo - schedule for immediate check
|
||||||
|
scheduler.add_project(project.id, repo.clone());
|
||||||
|
tracing::debug!(project_id = %project.id, repo = %repo, "Added project to GitHub scheduler");
|
||||||
|
}
|
||||||
|
(Some(_), None) => {
|
||||||
|
// Removed github_repo - remove from scheduler
|
||||||
|
scheduler.remove_project(project.id);
|
||||||
|
tracing::debug!(project_id = %project.id, "Removed project from GitHub scheduler");
|
||||||
|
}
|
||||||
|
(Some(old), Some(new)) if old != new => {
|
||||||
|
// Changed github_repo - update scheduler
|
||||||
|
scheduler.remove_project(project.id);
|
||||||
|
scheduler.add_project(project.id, new.clone());
|
||||||
|
tracing::debug!(project_id = %project.id, old_repo = %old, new_repo = %new, "Updated project in GitHub scheduler");
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// No change to github_repo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Invalidate cached pages that display projects
|
// Invalidate cached pages that display projects
|
||||||
state.isr_cache.invalidate("/").await;
|
state.isr_cache.invalidate("/").await;
|
||||||
|
|
||||||
@@ -448,6 +484,13 @@ pub async fn delete_project_handler(
|
|||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
tracing::info!(project_id = %project.id, project_name = %project.name, "Project deleted");
|
tracing::info!(project_id = %project.id, project_name = %project.name, "Project deleted");
|
||||||
|
|
||||||
|
// Remove from GitHub scheduler if it had a github_repo
|
||||||
|
if project.github_repo.is_some()
|
||||||
|
&& let Some(scheduler) = github::get_scheduler()
|
||||||
|
{
|
||||||
|
scheduler.remove_project(project.id);
|
||||||
|
}
|
||||||
|
|
||||||
// Invalidate cached pages that display projects
|
// Invalidate cached pages that display projects
|
||||||
state.isr_cache.invalidate("/").await;
|
state.isr_cache.invalidate("/").await;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user