From 308726c2739b76b150b9ccfc1c11aa94f3ebb8a7 Mon Sep 17 00:00:00 2001 From: Carbonhell Date: Thu, 4 Dec 2025 23:11:34 +0100 Subject: [PATCH 1/2] tweak(queue): deprioritize older releases when new ones are added refactor(queue): break up get_new_crates() in smaller methods to facilitate testing --- ...ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json | 17 + ...ead19f890f2baf677654f404f51ba48e57013.json | 22 + ...a9dfb847ca73426f9e5c20bf907d9236d10b9.json | 22 + ...7cbc597f224b532a5312983962aef8b187c2a.json | 20 + src/build_queue.rs | 415 +++++++++++++----- 5 files changed, 396 insertions(+), 100 deletions(-) create mode 100644 .sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json create mode 100644 .sqlx/query-265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013.json create mode 100644 .sqlx/query-8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9.json create mode 100644 .sqlx/query-de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a.json diff --git a/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json b/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json new file mode 100644 index 000000000..14efdbfd4 --- /dev/null +++ b/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET priority = GREATEST(priority, $1)\n WHERE\n name = $2\n AND version != $3\n AND attempt < $4\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Text", + "Text", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c" +} diff --git a/.sqlx/query-265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013.json b/.sqlx/query-265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013.json new file mode 100644 index 000000000..b31520820 --- /dev/null +++ 
b/.sqlx/query-265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id\n FROM crates\n WHERE name = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013" +} diff --git a/.sqlx/query-8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9.json b/.sqlx/query-8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9.json new file mode 100644 index 000000000..fd8f3842a --- /dev/null +++ b/.sqlx/query-8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT yanked\n FROM releases\n WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "yanked", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + true + ] + }, + "hash": "8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9" +} diff --git a/.sqlx/query-de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a.json b/.sqlx/query-de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a.json new file mode 100644 index 000000000..4098d58a7 --- /dev/null +++ b/.sqlx/query-de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id\n FROM releases", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a" +} diff --git a/src/build_queue.rs b/src/build_queue.rs index e1d5945ea..f7a403ff0 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -1,3 +1,4 @@ +use crate::db::AsyncPoolClient; use 
crate::{ BuildPackageSummary, Config, Context, Index, RustwideBuilder, cdn::{self, CdnMetrics}, @@ -14,6 +15,7 @@ use crate::{ }; use anyhow::Context as _; use chrono::NaiveDate; +use crates_index_diff::{Change, CrateVersion}; use fn_error_context::context; use futures_util::{StreamExt, stream::TryStreamExt}; use opentelemetry::metrics::Counter; @@ -275,6 +277,33 @@ impl AsyncBuildQueue { Ok(()) } + + /// Deprioritizes all releases of crate `name` currently in the queue whose version differs from `latest_version`: their priority value is raised to *at least* `new_priority` via `GREATEST`, and entries with `attempt >= max_attempts` are left untouched. + pub(crate) async fn deprioritize_other_releases( + &self, + name: &str, + latest_version: &Version, + new_priority: i32, + ) -> Result<()> { + let mut conn = self.db.get_async().await?; + sqlx::query!( + "UPDATE queue + SET priority = GREATEST(priority, $1) + WHERE + name = $2 + AND version != $3 + AND attempt < $4 + ", + new_priority, + name, + latest_version as _, + self.max_attempts, + ) + .execute(&mut *conn) + .await?; + + Ok(()) + } } /// Locking functions. @@ -348,116 +377,137 @@ impl AsyncBuildQueue { debug!("queueing changes from {last_seen_reference} to {new_reference}"); for change in &changes { - if let Some((ref krate, ..)) = change.crate_deleted() { - match delete_crate(&mut conn, &self.storage, &self.config, krate) - .await - .with_context(|| format!("failed to delete crate {krate}")) - { - Ok(_) => info!( - "crate {} was deleted from the index and the database", - krate - ), - Err(err) => report_error(&err), - } - - self.queue_crate_invalidation(krate).await; - self.remove_crate_from_queue(krate).await?; - continue; + if self.process_change(index, &mut conn, change).await? 
{ + crates_added += 1; } + } - if let Some(release) = change.version_deleted() { - let version: Version = release - .version - .parse() - .context("couldn't parse release version as semver")?; - - match delete_version( - &mut conn, - &self.storage, - &self.config, - &release.name, - &version, - ) - .await - .with_context(|| { - format!( - "failed to delete version {}-{}", - release.name, release.version - ) - }) { - Ok(_) => info!( - "release {}-{} was deleted from the index and the database", - release.name, release.version - ), - Err(err) => report_error(&err), - } + // set the reference in the database + // so this survives recreating the registry watcher + // server. + self.set_last_seen_reference(new_reference).await?; - self.queue_crate_invalidation(&release.name).await; - self.remove_version_from_queue(&release.name, &version) + Ok(crates_added) + } + + /// Process a crate change, returning whether the change was a crate addition or not. + async fn process_change( + &self, + index: &Index, + conn: &mut AsyncPoolClient, + change: &Change, + ) -> Result { + match change { + Change::Added(release) => { + self.process_version_added(conn, release, index.repository_url()) + .await? 
+ } + Change::AddedAndYanked(release) => { + self.process_version_added(conn, release, index.repository_url()) .await?; - continue; + self.process_version_yank_status(conn, release).await?; } - - if let Some(release) = change.added() { - let priority = get_crate_priority(&mut conn, &release.name).await?; - - match self - .add_crate( - &release.name, - &release - .version - .parse() - .context("couldn't parse release version as semver")?, - priority, - index.repository_url(), - ) - .await - .with_context(|| { - format!( - "failed adding {}-{} into build queue", - release.name, release.version - ) - }) { - Ok(()) => { - debug!( - "{}-{} added into build queue", - release.name, release.version - ); - self.queue_metrics.queued_builds.add(1, &[]); - crates_added += 1; - } - Err(err) => report_error(&err), - } + Change::Unyanked(release) | Change::Yanked(release) => { + self.process_version_yank_status(conn, release).await? } - - let yanked = change.yanked(); - let unyanked = change.unyanked(); - if let Some(release) = yanked.or(unyanked) { - // FIXME: delay yanks of crates that have not yet finished building - // https://github.com/rust-lang/docs.rs/issues/1934 - if let Ok(release_version) = Version::parse(&release.version) - && let Err(err) = self - .set_yanked_inner( - &mut conn, - release.name.as_str(), - &release_version, - yanked.is_some(), - ) - .await - { - report_error(&err); - } - - self.queue_crate_invalidation(&release.name).await; + Change::CrateDeleted { name, .. } => { + self.process_crate_deleted(conn, name.as_str()).await? } + Change::VersionDeleted(release) => self.process_version_deleted(conn, release).await?, + }; + Ok(change.added().is_some()) + } + + /// Applies the yank status of a changed release (`release.yanked`) and queues an invalidation for its crate; versions that fail semver parsing only get the invalidation. 
+ async fn process_version_yank_status( + &self, + conn: &mut AsyncPoolClient, + release: &CrateVersion, + ) -> Result<()> { + // FIXME: delay yanks of crates that have not yet finished building + // https://github.com/rust-lang/docs.rs/issues/1934 + if let Ok(release_version) = Version::parse(&release.version) { + self.set_yanked_inner( + conn, + release.name.as_str(), + &release_version, + release.yanked, + ) + .await?; } - // set the reference in the database - // so this survives recreating the registry watcher - // server. - self.set_last_seen_reference(new_reference).await?; + self.queue_crate_invalidation(&release.name).await; + Ok(()) + } - Ok(crates_added) + async fn process_version_added( + &self, + conn: &mut AsyncPoolClient, + release: &CrateVersion, + registry: Option<&str>, + ) -> Result<()> { + let priority = get_crate_priority(conn, &release.name).await?; + let version = &release + .version + .parse() + .context("couldn't parse release version as semver")?; + self.add_crate(&release.name, version, priority, registry) + .await + .with_context(|| { + format!( + "failed adding {}-{} into build queue", + release.name, release.version + ) + })?; + debug!( + "{}-{} added into build queue", + release.name, release.version + ); + self.queue_metrics.queued_builds.add(1, &[]); + self.deprioritize_other_releases(&release.name, version, PRIORITY_MANUAL_FROM_CRATES_IO) + .await + .unwrap_or_else(|err| report_error(&err)); + Ok(()) + } + + async fn process_version_deleted( + &self, + conn: &mut AsyncPoolClient, + release: &CrateVersion, + ) -> Result<()> { + let version: Version = release + .version + .parse() + .context("couldn't parse release version as semver")?; + + delete_version(conn, &self.storage, &self.config, &release.name, &version) + .await + .with_context(|| { + format!( + "failed to delete version {}-{}", + release.name, release.version + ) + })?; + info!( + "release {}-{} was deleted from the index and the database", + release.name, release.version 
+ ); + self.queue_crate_invalidation(&release.name).await; + self.remove_version_from_queue(&release.name, &version) + .await?; + Ok(()) + } + + async fn process_crate_deleted(&self, conn: &mut AsyncPoolClient, krate: &str) -> Result<()> { + delete_crate(conn, &self.storage, &self.config, krate) + .await + .with_context(|| format!("failed to delete crate {krate}"))?; + info!( + "crate {} was deleted from the index and the database", + krate + ); + self.queue_crate_invalidation(krate).await; + self.remove_crate_from_queue(krate).await } pub async fn set_yanked(&self, name: &str, version: &Version, yanked: bool) -> Result<()> { @@ -880,6 +930,171 @@ mod tests { use chrono::Utc; use std::time::Duration; + #[tokio::test(flavor = "multi_thread")] + async fn test_process_version_added() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + let krate = CrateVersion { + name: "krate".parse()?, + version: V1.to_string().parse()?, + ..Default::default() + }; + build_queue + .process_version_added(&mut conn, &krate, None) + .await?; + let queue = build_queue.queued_crates().await?; + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].priority, PRIORITY_DEFAULT); + + let krate = CrateVersion { + name: "krate".parse()?, + version: V2.to_string().parse()?, + ..Default::default() + }; + build_queue + .process_version_added(&mut conn, &krate, None) + .await?; + let queue = build_queue.queued_crates().await?; + assert_eq!(queue.len(), 2); + // The other queued version should be deprioritized + assert_eq!(queue[0].version, V2); + assert_eq!(queue[0].priority, PRIORITY_DEFAULT); + assert_eq!(queue[1].version, V1); + assert_eq!(queue[1].priority, PRIORITY_MANUAL_FROM_CRATES_IO); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_version_yank_status() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = 
env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + // Given a release that is yanked + let id = env + .fake_release() + .await + .name("krate") + .version(V1) + .create() + .await?; + // Simulate a yank change + let krate = CrateVersion { + name: "krate".parse()?, + version: V1.to_string().parse()?, + yanked: true, + ..Default::default() + }; + build_queue + .process_version_yank_status(&mut conn, &krate) + .await?; + + // And verify it's actually marked as yanked + let row = sqlx::query!( + "SELECT yanked + FROM releases + WHERE id = $1", + id.0 + ) + .fetch_one(&mut *conn) + .await?; + assert_eq!(row.yanked, Some(true)); + + // Verify whether we can unyank it too + let krate = CrateVersion { + name: "krate".parse()?, + version: V1.to_string().parse()?, + yanked: false, + ..Default::default() + }; + build_queue + .process_version_yank_status(&mut conn, &krate) + .await?; + + let row = sqlx::query!( + "SELECT yanked + FROM releases + WHERE id = $1", + id.0 + ) + .fetch_one(&mut *conn) + .await?; + assert_eq!(row.yanked, Some(false)); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_crate_deleted() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + env.fake_release() + .await + .name("krate") + .version(V1) + .create() + .await?; + build_queue + .process_crate_deleted(&mut conn, "krate") + .await?; + + let row = sqlx::query!( + "SELECT id + FROM crates + WHERE name = $1", + "krate" + ) + .fetch_optional(&mut *conn) + .await?; + assert!(row.is_none()); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_version_deleted() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + let rid_1 = env + .fake_release() + .await + .name("krate") + .version(V1) + .create() + 
.await?; + env.fake_release() + .await + .name("krate") + .version(V2) + .create() + .await?; + + let krate = CrateVersion { + name: "krate".parse()?, + version: V2.to_string().parse()?, + ..Default::default() + }; + build_queue + .process_version_deleted(&mut conn, &krate) + .await?; + + let row = sqlx::query!( + "SELECT id + FROM releases", + ) + .fetch_all(&mut *conn) + .await?; + assert_eq!(row.len(), 1); + assert_eq!(row[0].id, rid_1.0); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_rebuild_when_old() -> Result<()> { let env = TestEnvironment::with_config( From 7607323a0b50a1aee24b0685fa06c6e0fcaf57c6 Mon Sep 17 00:00:00 2001 From: Carbonhell Date: Sat, 6 Dec 2025 00:29:16 +0100 Subject: [PATCH 2/2] fix(queue): handle errors while processing changes gracefully chore(tracing): update tracing macro usages to use structured fields instead of string interpolation --- src/build_queue.rs | 119 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 96 insertions(+), 23 deletions(-) diff --git a/src/build_queue.rs b/src/build_queue.rs index f7a403ff0..cb7274ffc 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -372,15 +372,12 @@ impl AsyncBuildQueue { let (changes, new_reference) = index.peek_changes_ordered().await?; let mut conn = self.db.get_async().await?; - let mut crates_added = 0; - debug!("queueing changes from {last_seen_reference} to {new_reference}"); + debug!(last_seen_reference=%last_seen_reference, new_reference=%new_reference, "queueing changes"); - for change in &changes { - if self.process_change(index, &mut conn, change).await? 
{ - crates_added += 1; - } - } + let crates_added = self + .process_changes(&mut conn, &changes, index.repository_url()) + .await; // set the reference in the database // so this survives recreating the registry watcher @@ -390,21 +387,38 @@ impl AsyncBuildQueue { Ok(crates_added) } + async fn process_changes( + &self, + conn: &mut AsyncPoolClient, + changes: &Vec, + registry: Option<&str>, + ) -> usize { + let mut crates_added = 0; + + for change in changes { + match self.process_change(conn, change, registry).await { + Ok(added) => { + if added { + crates_added += 1; + } + } + Err(err) => report_error(&err), + } + } + crates_added + } + /// Process a crate change, returning whether the change was a crate addition or not. async fn process_change( &self, - index: &Index, conn: &mut AsyncPoolClient, change: &Change, + registry: Option<&str>, ) -> Result { match change { - Change::Added(release) => { - self.process_version_added(conn, release, index.repository_url()) - .await? - } + Change::Added(release) => self.process_version_added(conn, release, registry).await?, Change::AddedAndYanked(release) => { - self.process_version_added(conn, release, index.repository_url()) - .await?; + self.process_version_added(conn, release, registry).await?; self.process_version_yank_status(conn, release).await?; } Change::Unyanked(release) | Change::Yanked(release) => { @@ -460,8 +474,9 @@ impl AsyncBuildQueue { ) })?; debug!( - "{}-{} added into build queue", - release.name, release.version + name=%release.name, + version=%release.version, + "added into build queue", ); self.queue_metrics.queued_builds.add(1, &[]); self.deprioritize_other_releases(&release.name, version, PRIORITY_MANUAL_FROM_CRATES_IO) @@ -489,8 +504,9 @@ impl AsyncBuildQueue { ) })?; info!( - "release {}-{} was deleted from the index and the database", - release.name, release.version + name=%release.name, + version=%release.version, + "release was deleted from the index and the database", ); 
self.queue_crate_invalidation(&release.name).await; self.remove_version_from_queue(&release.name, &version) @@ -503,8 +519,8 @@ impl AsyncBuildQueue { .await .with_context(|| format!("failed to delete crate {krate}"))?; info!( - "crate {} was deleted from the index and the database", - krate + name=%krate, + "crate deleted from the index and the database", ); self.queue_crate_invalidation(krate).await; self.remove_crate_from_queue(krate).await @@ -542,7 +558,12 @@ impl AsyncBuildQueue { .fetch_optional(&mut *conn) .await? { - debug!("{}-{} {}", name, version, activity); + debug!( + %name, + %version, + %activity, + "updating latest version id" + ); update_latest_version_id(&mut *conn, crate_id).await?; } else { match self @@ -552,8 +573,9 @@ impl AsyncBuildQueue { { Ok(false) => { error!( - "tried to yank or unyank non-existing release: {} {}", - name, version + %name, + %version, + "tried to yank or unyank non-existing release", ); } Ok(true) => { @@ -1095,6 +1117,57 @@ mod tests { Ok(()) } + /// Ensure changes can be processed with graceful error handling and proper tracking of added versions + #[tokio::test(flavor = "multi_thread")] + async fn test_process_changes() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + env.fake_release() + .await + .name("krate_already_present") + .version(V1) + .create() + .await?; + + let krate1 = CrateVersion { + name: "krate1".parse()?, + version: V1.to_string().parse()?, + ..Default::default() + }; + let krate2 = CrateVersion { + name: "krate2".parse()?, + version: V1.to_string().parse()?, + ..Default::default() + }; + let krate_already_present = CrateVersion { + name: "krate_already_present".parse()?, + version: V1.to_string().parse()?, + ..Default::default() + }; + let non_existing_krate = CrateVersion { + name: "krate_already_present".parse()?, + version: V2.to_string().parse()?, + ..Default::default() + }; + let 
added = build_queue + .process_changes( + &mut conn, + &vec![ + Change::Added(krate1), // Should be added correctly + Change::Added(krate2), // Should be added correctly + Change::VersionDeleted(krate_already_present), // Should be deleted correctly, without affecting the returned counter + Change::VersionDeleted(non_existing_krate), // Should error out, but the error should be handled gracefully + ], + None, + ) + .await; + + assert_eq!(added, 2); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_rebuild_when_old() -> Result<()> { let env = TestEnvironment::with_config(