feat: much better JSON logging, project-wide logging improvements, better use of debug/trace levels, field attributes

This commit is contained in:
2025-09-12 22:01:14 -05:00
parent 00cb209052
commit 14b02df8f4
19 changed files with 348 additions and 78 deletions
+8 -5
View File
@@ -35,14 +35,14 @@ impl ScraperService {
/// Starts the scheduler and a pool of workers.
pub fn start(&mut self) {
info!("ScraperService starting...");
info!("ScraperService starting");
let scheduler = Scheduler::new(self.db_pool.clone(), self.banner_api.clone());
let scheduler_handle = tokio::spawn(async move {
scheduler.run().await;
});
self.scheduler_handle = Some(scheduler_handle);
info!("Scheduler task spawned.");
info!("Scheduler task spawned");
let worker_count = 4; // This could be configurable
for i in 0..worker_count {
@@ -52,19 +52,22 @@ impl ScraperService {
});
self.worker_handles.push(worker_handle);
}
info!("Spawned {} worker tasks.", self.worker_handles.len());
info!(
worker_count = self.worker_handles.len(),
"Spawned worker tasks"
);
}
/// Stops the scheduler task and every worker task owned by this service.
///
/// Takes the scheduler handle out of `self.scheduler_handle` (if one is stored)
/// and drains `self.worker_handles`, calling `abort()` on each handle. Nothing is
/// awaited here, so the method returns without waiting for the tasks to finish.
pub async fn shutdown(&mut self) {
// NOTE(review): the earlier summary described this as a graceful shutdown, but the
// body only calls `abort()` on the handles; no shutdown signal is sent to the tasks
// in this function. Confirm whether the tasks need a chance to finish in-flight work.
// NOTE(review): this and the following log line look like the removed/added pair of a
// single statement in the diff view; presumably only the second exists in the real file.
info!("Shutting down scraper service...");
info!("Shutting down scraper service");
// `take()` leaves `None` behind, so a repeated call finds no scheduler handle to abort.
if let Some(handle) = self.scheduler_handle.take() {
handle.abort();
}
// `drain(..)` empties the vector while handing each worker handle over for aborting.
for handle in self.worker_handles.drain(..) {
handle.abort();
}
// NOTE(review): as above, these two log lines appear to be the old/new pair from the diff.
info!("Scraper service shutdown.");
info!("Scraper service shutdown");
}
}
+8 -9
View File
@@ -6,7 +6,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info};
use tracing::{error, info, debug, trace};
/// Periodically analyzes data and enqueues prioritized scrape jobs.
pub struct Scheduler {
@@ -24,12 +24,12 @@ impl Scheduler {
/// Runs the scheduler's main loop.
pub async fn run(&self) {
info!("Scheduler service started.");
info!("Scheduler service started");
let mut interval = time::interval(Duration::from_secs(60)); // Runs every minute
loop {
interval.tick().await;
info!("Scheduler waking up to analyze and schedule jobs...");
// Scheduler analyzing data...
if let Err(e) = self.schedule_jobs().await {
error!(error = ?e, "Failed to schedule jobs");
}
@@ -44,12 +44,10 @@ impl Scheduler {
// 3. If no job exists, create a new, low-priority job to be executed in the near future.
let term = Term::get_current().inner().to_string();
info!(
term = term,
"[Scheduler] Enqueuing baseline subject scrape jobs..."
);
debug!(term = term, "Enqueuing subject jobs");
let subjects = self.banner_api.get_subjects("", &term, 1, 500).await?;
debug!(subject_count = subjects.len(), "Retrieved subjects from API");
for subject in subjects {
let payload = json!({ "subject": subject.code });
@@ -63,6 +61,7 @@ impl Scheduler {
.await?;
if existing_job.is_some() {
trace!(subject = subject.code, "Job already exists, skipping");
continue;
}
@@ -76,10 +75,10 @@ impl Scheduler {
.execute(&self.db_pool)
.await?;
info!(subject = subject.code, "[Scheduler] Enqueued new job");
debug!(subject = subject.code, "New job enqueued for subject");
}
info!("[Scheduler] Job scheduling complete.");
debug!("Job scheduling complete");
Ok(())
}
}
+7 -7
View File
@@ -6,7 +6,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{error, info, warn};
use tracing::{debug, error, info, trace, warn};
/// A single worker instance.
///
@@ -34,7 +34,7 @@ impl Worker {
match self.fetch_and_lock_job().await {
Ok(Some(job)) => {
let job_id = job.id;
info!(worker_id = self.id, job_id = job.id, "Processing job");
debug!(worker_id = self.id, job_id = job.id, "Processing job");
if let Err(e) = self.process_job(job).await {
// Check if the error is due to an invalid session
if let Some(BannerApiError::InvalidSession(_)) =
@@ -58,7 +58,7 @@ impl Worker {
);
}
} else {
info!(worker_id = self.id, job_id, "Job processed successfully");
debug!(worker_id = self.id, job_id, "Job completed");
// If successful, delete the job.
if let Err(delete_err) = self.delete_job(job_id).await {
error!(
@@ -72,6 +72,7 @@ impl Worker {
}
Ok(None) => {
// No job found, wait for a bit before polling again.
trace!(worker_id = self.id, "No jobs available, waiting");
time::sleep(Duration::from_secs(5)).await;
}
Err(e) => {
@@ -127,7 +128,7 @@ impl Worker {
info!(
worker_id = self.id,
subject = subject_code,
"Processing subject job"
"Scraping subject"
);
let term = Term::get_current().inner().to_string();
@@ -143,7 +144,7 @@ impl Worker {
worker_id = self.id,
subject = subject_code,
count = courses_from_api.len(),
"Found courses to upsert"
"Found courses"
);
for course in courses_from_api {
self.upsert_course(&course).await?;
@@ -190,7 +191,6 @@ impl Worker {
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job deleted");
Ok(())
}
@@ -199,7 +199,7 @@ impl Worker {
.bind(job_id)
.execute(&self.db_pool)
.await?;
info!(worker_id = self.id, job_id, "Job unlocked after failure");
info!(worker_id = self.id, job_id, "Job unlocked for retry");
Ok(())
}
}