feat: add confidence-based RMP matching with manual review workflow

Replace simple auto-matching with scored candidate generation that
considers department overlap, name uniqueness, and rating volume.
Candidates above 0.85 auto-accept; others require admin approval.
This commit is contained in:
2026-01-30 01:31:11 -06:00
parent 39ba131322
commit 203c337cf0
19 changed files with 2428 additions and 175 deletions
+59 -33
View File
@@ -392,11 +392,11 @@ pub async fn batch_upsert_courses(courses: &[Course], db_pool: &PgPool) -> Resul
insert_audits(&audits, &mut tx).await?;
insert_metrics(&metrics, &mut tx).await?;
// Step 5: Upsert instructors (deduplicated across batch)
upsert_instructors(courses, &mut tx).await?;
// Step 5: Upsert instructors (returns email -> id map)
let email_to_id = upsert_instructors(courses, &mut tx).await?;
// Step 6: Link courses to instructors via junction table
upsert_course_instructors(courses, &course_ids, &mut tx).await?;
upsert_course_instructors(courses, &course_ids, &email_to_id, &mut tx).await?;
tx.commit().await?;
@@ -596,62 +596,85 @@ async fn upsert_courses(courses: &[Course], conn: &mut PgConnection) -> Result<V
Ok(rows)
}
/// Deduplicate and upsert all instructors from the batch.
async fn upsert_instructors(courses: &[Course], conn: &mut PgConnection) -> Result<()> {
/// Deduplicate and upsert all instructors from the batch by email.
/// Returns a map of lowercased_email -> instructor id for junction linking.
async fn upsert_instructors(
courses: &[Course],
conn: &mut PgConnection,
) -> Result<HashMap<String, i32>> {
let mut seen = HashSet::new();
let mut banner_ids = Vec::new();
let mut display_names = Vec::new();
let mut emails: Vec<Option<&str>> = Vec::new();
let mut display_names: Vec<&str> = Vec::new();
let mut emails_lower: Vec<String> = Vec::new();
let mut skipped_no_email = 0u32;
for course in courses {
for faculty in &course.faculty {
if seen.insert(faculty.banner_id.as_str()) {
banner_ids.push(faculty.banner_id.as_str());
display_names.push(faculty.display_name.as_str());
emails.push(faculty.email_address.as_deref());
if let Some(email) = &faculty.email_address {
let email_lower = email.to_lowercase();
if seen.insert(email_lower.clone()) {
display_names.push(faculty.display_name.as_str());
emails_lower.push(email_lower);
}
} else {
skipped_no_email += 1;
}
}
}
if banner_ids.is_empty() {
return Ok(());
if skipped_no_email > 0 {
tracing::warn!(
count = skipped_no_email,
"Skipped instructors with no email address"
);
}
sqlx::query(
if display_names.is_empty() {
return Ok(HashMap::new());
}
let email_refs: Vec<&str> = emails_lower.iter().map(|s| s.as_str()).collect();
let rows: Vec<(i32, String)> = sqlx::query_as(
r#"
INSERT INTO instructors (banner_id, display_name, email)
SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[])
ON CONFLICT (banner_id)
DO UPDATE SET
display_name = EXCLUDED.display_name,
email = COALESCE(EXCLUDED.email, instructors.email)
INSERT INTO instructors (display_name, email)
SELECT * FROM UNNEST($1::text[], $2::text[])
ON CONFLICT (email)
DO UPDATE SET display_name = EXCLUDED.display_name
RETURNING id, email
"#,
)
.bind(&banner_ids)
.bind(&display_names)
.bind(&emails)
.execute(&mut *conn)
.bind(&email_refs)
.fetch_all(&mut *conn)
.await
.map_err(|e| anyhow::anyhow!("Failed to batch upsert instructors: {}", e))?;
Ok(())
Ok(rows.into_iter().map(|(id, email)| (email, id)).collect())
}
/// Link courses to their instructors via the junction table.
async fn upsert_course_instructors(
courses: &[Course],
course_ids: &[i32],
email_to_id: &HashMap<String, i32>,
conn: &mut PgConnection,
) -> Result<()> {
let mut cids = Vec::new();
let mut iids = Vec::new();
let mut instructor_ids: Vec<i32> = Vec::new();
let mut banner_ids: Vec<&str> = Vec::new();
let mut primaries = Vec::new();
for (course, &course_id) in courses.iter().zip(course_ids) {
for faculty in &course.faculty {
cids.push(course_id);
iids.push(faculty.banner_id.as_str());
primaries.push(faculty.primary_indicator);
if let Some(email) = &faculty.email_address {
let email_lower = email.to_lowercase();
if let Some(&instructor_id) = email_to_id.get(&email_lower) {
cids.push(course_id);
instructor_ids.push(instructor_id);
banner_ids.push(faculty.banner_id.as_str());
primaries.push(faculty.primary_indicator);
}
}
}
}
@@ -668,14 +691,17 @@ async fn upsert_course_instructors(
sqlx::query(
r#"
INSERT INTO course_instructors (course_id, instructor_id, is_primary)
SELECT * FROM UNNEST($1::int4[], $2::text[], $3::bool[])
INSERT INTO course_instructors (course_id, instructor_id, banner_id, is_primary)
SELECT * FROM UNNEST($1::int4[], $2::int4[], $3::text[], $4::bool[])
ON CONFLICT (course_id, instructor_id)
DO UPDATE SET is_primary = EXCLUDED.is_primary
DO UPDATE SET
banner_id = EXCLUDED.banner_id,
is_primary = EXCLUDED.is_primary
"#,
)
.bind(&cids)
.bind(&iids)
.bind(&instructor_ids)
.bind(&banner_ids)
.bind(&primaries)
.execute(&mut *conn)
.await