feat: schedule & query jobs efficiently in batches

2025-09-12 23:41:27 -05:00
parent a6e7adcaef
commit 4ca55a1fd4

@@ -6,7 +6,7 @@ use sqlx::PgPool;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::time;
-use tracing::{error, info, debug, trace};
+use tracing::{debug, error, info, trace};
 
 /// Periodically analyzes data and enqueues prioritized scrape jobs.
 pub struct Scheduler {
@@ -40,42 +40,76 @@ impl Scheduler {
     async fn schedule_jobs(&self) -> Result<()> {
         // For now, we will implement a simple baseline scheduling strategy:
         // 1. Get a list of all subjects from the Banner API.
-        // 2. For each subject, check if an active (not locked, not completed) job already exists.
-        // 3. If no job exists, create a new, low-priority job to be executed in the near future.
+        // 2. Query existing jobs for all subjects in a single query.
+        // 3. Create new jobs only for subjects that don't have existing jobs.
         let term = Term::get_current().inner().to_string();
         debug!(term = term, "Enqueuing subject jobs");
         let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?;
-        debug!(subject_count = subjects.len(), "Retrieved subjects from API");
+        debug!(
+            subject_count = subjects.len(),
+            "Retrieved subjects from API"
+        );
-        for subject in subjects {
-            let payload = json!({ "subject": subject.code });
+        // Create payloads for all subjects
+        let subject_payloads: Vec<_> = subjects
+            .iter()
+            .map(|subject| json!({ "subject": subject.code }))
+            .collect();
-            let existing_job: Option<(i32,)> = sqlx::query_as(
-                "SELECT id FROM scrape_jobs WHERE target_type = $1 AND target_payload = $2 AND locked_at IS NULL"
-            )
-            .bind(TargetType::Subject)
-            .bind(&payload)
-            .fetch_optional(&self.db_pool)
-            .await?;
+        // Query existing jobs for all subjects in a single query
+        let existing_jobs: Vec<(serde_json::Value,)> = sqlx::query_as(
+            "SELECT target_payload FROM scrape_jobs
+             WHERE target_type = $1 AND target_payload = ANY($2) AND locked_at IS NULL",
+        )
+        .bind(TargetType::Subject)
+        .bind(&subject_payloads)
+        .fetch_all(&self.db_pool)
+        .await?;
-            if existing_job.is_some() {
-                trace!(subject = subject.code, "Job already exists, skipping");
-                continue;
+        // Convert to a HashSet for efficient lookup
+        let existing_payloads: std::collections::HashSet<String> = existing_jobs
+            .into_iter()
+            .map(|(payload,)| payload.to_string())
+            .collect();
+        // Filter out subjects that already have jobs and prepare new jobs
+        let new_jobs: Vec<_> = subjects
+            .into_iter()
+            .filter_map(|subject| {
+                let payload = json!({ "subject": subject.code });
+                let payload_str = payload.to_string();
+                if existing_payloads.contains(&payload_str) {
+                    trace!(subject = subject.code, "Job already exists, skipping");
+                    None
+                } else {
+                    Some((payload, subject.code))
+                }
+            })
+            .collect();
+        // Insert all new jobs in a single batch
+        if !new_jobs.is_empty() {
+            let now = chrono::Utc::now();
+            let mut tx = self.db_pool.begin().await?;
+            for (payload, subject_code) in new_jobs {
+                sqlx::query(
+                    "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) VALUES ($1, $2, $3, $4)"
+                )
+                .bind(TargetType::Subject)
+                .bind(&payload)
+                .bind(ScrapePriority::Low)
+                .bind(now)
+                .execute(&mut *tx)
+                .await?;
+                debug!(subject = subject_code, "New job enqueued for subject");
+            }
-            sqlx::query(
-                "INSERT INTO scrape_jobs (target_type, target_payload, priority, execute_at) VALUES ($1, $2, $3, $4)"
-            )
-            .bind(TargetType::Subject)
-            .bind(&payload)
-            .bind(ScrapePriority::Low)
-            .bind(chrono::Utc::now())
-            .execute(&self.db_pool)
-            .await?;
-            debug!(subject = subject.code, "New job enqueued for subject");
+            tx.commit().await?;
         }
         debug!("Job scheduling complete");