diff --git a/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json b/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json new file mode 100644 index 000000000..14efdbfd4 --- /dev/null +++ b/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET priority = GREATEST(priority, $1)\n WHERE\n name = $2\n AND version != $3\n AND attempt < $4\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Text", + "Text", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c" +} diff --git a/.sqlx/query-265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013.json b/.sqlx/query-265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013.json new file mode 100644 index 000000000..b31520820 --- /dev/null +++ b/.sqlx/query-265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id\n FROM crates\n WHERE name = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "265729219105c1d8021fe898dcbead19f890f2baf677654f404f51ba48e57013" +} diff --git a/.sqlx/query-8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9.json b/.sqlx/query-8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9.json new file mode 100644 index 000000000..fd8f3842a --- /dev/null +++ b/.sqlx/query-8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT yanked\n FROM releases\n WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "yanked", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + true + ] + }, + "hash": "8d1fc18ed39b0b5588dfa3958e6a9dfb847ca73426f9e5c20bf907d9236d10b9" +} diff --git a/.sqlx/query-de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a.json b/.sqlx/query-de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a.json new file mode 100644 index 000000000..4098d58a7 --- /dev/null +++ b/.sqlx/query-de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id\n FROM releases", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "de03b48baa8a4561687aab398857cbc597f224b532a5312983962aef8b187c2a" +} diff --git a/src/build_queue.rs b/src/build_queue.rs index e1d5945ea..f7a403ff0 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -1,3 +1,4 @@ +use crate::db::AsyncPoolClient; use crate::{ BuildPackageSummary, Config, Context, Index, RustwideBuilder, cdn::{self, CdnMetrics}, @@ -14,6 +15,7 @@ use crate::{ }; use anyhow::Context as _; use chrono::NaiveDate; +use crates_index_diff::{Change, CrateVersion}; use fn_error_context::context; use futures_util::{StreamExt, stream::TryStreamExt}; use opentelemetry::metrics::Counter; @@ -275,6 +277,33 @@ impl AsyncBuildQueue { Ok(()) } + + /// Decreases the priority of all releases currently present in the queue not matching the version passed to *at least* new_priority. + pub(crate) async fn deprioritize_other_releases( + &self, + name: &str, + latest_version: &Version, + new_priority: i32, + ) -> Result<()> { + let mut conn = self.db.get_async().await?; + sqlx::query!( + "UPDATE queue + SET priority = GREATEST(priority, $1) + WHERE + name = $2 + AND version != $3 + AND attempt < $4 + ", + new_priority, + name, + latest_version as _, + self.max_attempts, + ) + .execute(&mut *conn) + .await?; + + Ok(()) + } } /// Locking functions. @@ -348,116 +377,137 @@ impl AsyncBuildQueue { debug!("queueing changes from {last_seen_reference} to {new_reference}"); for change in &changes { - if let Some((ref krate, ..)) = change.crate_deleted() { - match delete_crate(&mut conn, &self.storage, &self.config, krate) - .await - .with_context(|| format!("failed to delete crate {krate}")) - { - Ok(_) => info!( - "crate {} was deleted from the index and the database", - krate - ), - Err(err) => report_error(&err), - } - - self.queue_crate_invalidation(krate).await; - self.remove_crate_from_queue(krate).await?; - continue; + if self.process_change(index, &mut conn, change).await? { + crates_added += 1; } + } - if let Some(release) = change.version_deleted() { - let version: Version = release - .version - .parse() - .context("couldn't parse release version as semver")?; - - match delete_version( - &mut conn, - &self.storage, - &self.config, - &release.name, - &version, - ) - .await - .with_context(|| { - format!( - "failed to delete version {}-{}", - release.name, release.version - ) - }) { - Ok(_) => info!( - "release {}-{} was deleted from the index and the database", - release.name, release.version - ), - Err(err) => report_error(&err), - } + // set the reference in the database + // so this survives recreating the registry watcher + // server. + self.set_last_seen_reference(new_reference).await?; - self.queue_crate_invalidation(&release.name).await; - self.remove_version_from_queue(&release.name, &version) + Ok(crates_added) + } + + /// Process a crate change, returning whether the change was a crate addition or not. + async fn process_change( + &self, + index: &Index, + conn: &mut AsyncPoolClient, + change: &Change, + ) -> Result { + match change { + Change::Added(release) => { + self.process_version_added(conn, release, index.repository_url()) + .await? + } + Change::AddedAndYanked(release) => { + self.process_version_added(conn, release, index.repository_url()) .await?; - continue; + self.process_version_yank_status(conn, release).await?; } - - if let Some(release) = change.added() { - let priority = get_crate_priority(&mut conn, &release.name).await?; - - match self - .add_crate( - &release.name, - &release - .version - .parse() - .context("couldn't parse release version as semver")?, - priority, - index.repository_url(), - ) - .await - .with_context(|| { - format!( - "failed adding {}-{} into build queue", - release.name, release.version - ) - }) { - Ok(()) => { - debug!( - "{}-{} added into build queue", - release.name, release.version - ); - self.queue_metrics.queued_builds.add(1, &[]); - crates_added += 1; - } - Err(err) => report_error(&err), - } + Change::Unyanked(release) | Change::Yanked(release) => { + self.process_version_yank_status(conn, release).await? } - - let yanked = change.yanked(); - let unyanked = change.unyanked(); - if let Some(release) = yanked.or(unyanked) { - // FIXME: delay yanks of crates that have not yet finished building - // https://github.com/rust-lang/docs.rs/issues/1934 - if let Ok(release_version) = Version::parse(&release.version) - && let Err(err) = self - .set_yanked_inner( - &mut conn, - release.name.as_str(), - &release_version, - yanked.is_some(), - ) - .await - { - report_error(&err); - } - - self.queue_crate_invalidation(&release.name).await; + Change::CrateDeleted { name, .. } => { + self.process_crate_deleted(conn, name.as_str()).await? } + Change::VersionDeleted(release) => self.process_version_deleted(conn, release).await?, + }; + Ok(change.added().is_some()) + } + + /// Processes crate changes, whether they got yanked or unyanked. + async fn process_version_yank_status( + &self, + conn: &mut AsyncPoolClient, + release: &CrateVersion, + ) -> Result<()> { + // FIXME: delay yanks of crates that have not yet finished building + // https://github.com/rust-lang/docs.rs/issues/1934 + if let Ok(release_version) = Version::parse(&release.version) { + self.set_yanked_inner( + conn, + release.name.as_str(), + &release_version, + release.yanked, + ) + .await?; } - // set the reference in the database - // so this survives recreating the registry watcher - // server. - self.set_last_seen_reference(new_reference).await?; + self.queue_crate_invalidation(&release.name).await; + Ok(()) + } - Ok(crates_added) + async fn process_version_added( + &self, + conn: &mut AsyncPoolClient, + release: &CrateVersion, + registry: Option<&str>, + ) -> Result<()> { + let priority = get_crate_priority(conn, &release.name).await?; + let version = &release + .version + .parse() + .context("couldn't parse release version as semver")?; + self.add_crate(&release.name, version, priority, registry) + .await + .with_context(|| { + format!( + "failed adding {}-{} into build queue", + release.name, release.version + ) + })?; + debug!( + "{}-{} added into build queue", + release.name, release.version + ); + self.queue_metrics.queued_builds.add(1, &[]); + self.deprioritize_other_releases(&release.name, version, PRIORITY_MANUAL_FROM_CRATES_IO) + .await + .unwrap_or_else(|err| report_error(&err)); + Ok(()) + } + + async fn process_version_deleted( + &self, + conn: &mut AsyncPoolClient, + release: &CrateVersion, + ) -> Result<()> { + let version: Version = release + .version + .parse() + .context("couldn't parse release version as semver")?; + + delete_version(conn, &self.storage, &self.config, &release.name, &version) + .await + .with_context(|| { + format!( + "failed to delete version {}-{}", + release.name, release.version + ) + })?; + info!( + "release {}-{} was deleted from the index and the database", + release.name, release.version + ); + self.queue_crate_invalidation(&release.name).await; + self.remove_version_from_queue(&release.name, &version) + .await?; + Ok(()) + } + + async fn process_crate_deleted(&self, conn: &mut AsyncPoolClient, krate: &str) -> Result<()> { + delete_crate(conn, &self.storage, &self.config, krate) + .await + .with_context(|| format!("failed to delete crate {krate}"))?; + info!( + "crate {} was deleted from the index and the database", + krate + ); + self.queue_crate_invalidation(krate).await; + self.remove_crate_from_queue(krate).await } pub async fn set_yanked(&self, name: &str, version: &Version, yanked: bool) -> Result<()> { @@ -880,6 +930,171 @@ mod tests { use chrono::Utc; use std::time::Duration; + #[tokio::test(flavor = "multi_thread")] + async fn test_process_version_added() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + let krate = CrateVersion { + name: "krate".parse()?, + version: V1.to_string().parse()?, + ..Default::default() + }; + build_queue + .process_version_added(&mut conn, &krate, None) + .await?; + let queue = build_queue.queued_crates().await?; + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].priority, PRIORITY_DEFAULT); + + let krate = CrateVersion { + name: "krate".parse()?, + version: V2.to_string().parse()?, + ..Default::default() + }; + build_queue + .process_version_added(&mut conn, &krate, None) + .await?; + let queue = build_queue.queued_crates().await?; + assert_eq!(queue.len(), 2); + // The other queued version should be deprioritized + assert_eq!(queue[0].version, V2); + assert_eq!(queue[0].priority, PRIORITY_DEFAULT); + assert_eq!(queue[1].version, V1); + assert_eq!(queue[1].priority, PRIORITY_MANUAL_FROM_CRATES_IO); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_version_yank_status() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + // Given a release that is yanked + let id = env + .fake_release() + .await + .name("krate") + .version(V1) + .create() + .await?; + // Simulate a yank change + let krate = CrateVersion { + name: "krate".parse()?, + version: V1.to_string().parse()?, + yanked: true, + ..Default::default() + }; + build_queue + .process_version_yank_status(&mut conn, &krate) + .await?; + + // And verify it's actually marked as yanked + let row = sqlx::query!( + "SELECT yanked + FROM releases + WHERE id = $1", + id.0 + ) + .fetch_one(&mut *conn) + .await?; + assert_eq!(row.yanked, Some(true)); + + // Verify whether we can unyank it too + let krate = CrateVersion { + name: "krate".parse()?, + version: V1.to_string().parse()?, + yanked: false, + ..Default::default() + }; + build_queue + .process_version_yank_status(&mut conn, &krate) + .await?; + + let row = sqlx::query!( + "SELECT yanked + FROM releases + WHERE id = $1", + id.0 + ) + .fetch_one(&mut *conn) + .await?; + assert_eq!(row.yanked, Some(false)); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_crate_deleted() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + env.fake_release() + .await + .name("krate") + .version(V1) + .create() + .await?; + build_queue + .process_crate_deleted(&mut conn, "krate") + .await?; + + let row = sqlx::query!( + "SELECT id + FROM crates + WHERE name = $1", + "krate" + ) + .fetch_optional(&mut *conn) + .await?; + assert!(row.is_none()); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_version_deleted() -> Result<()> { + let env = TestEnvironment::new().await?; + let build_queue = env.async_build_queue(); + let mut conn = env.async_db().async_conn().await; + + let rid_1 = env + .fake_release() + .await + .name("krate") + .version(V1) + .create() + .await?; + env.fake_release() + .await + .name("krate") + .version(V2) + .create() + .await?; + + let krate = CrateVersion { + name: "krate".parse()?, + version: V2.to_string().parse()?, + ..Default::default() + }; + build_queue + .process_version_deleted(&mut conn, &krate) + .await?; + + let row = sqlx::query!( + "SELECT id + FROM releases", + ) + .fetch_all(&mut *conn) + .await?; + assert_eq!(row.len(), 1); + assert_eq!(row[0].id, rid_1.0); + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_rebuild_when_old() -> Result<()> { let env = TestEnvironment::with_config(