diff --git a/Cargo.lock b/Cargo.lock index cc4b87327..1774ad119 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -536,28 +536,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "aws-sdk-cloudfront" -version = "1.105.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "274d73c589ac39d97a42e553a5b9b8a79cd51b04e1078e7499f81f4bd9e85d74" -dependencies = [ - "aws-credential-types", - "aws-runtime", - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-json", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-smithy-xml", - "aws-types", - "fastrand", - "http 0.2.12", - "regex-lite", - "tracing", -] - [[package]] name = "aws-sdk-s3" version = "1.115.0" @@ -2019,7 +1997,6 @@ dependencies = [ "async-stream", "async-trait", "aws-config", - "aws-sdk-cloudfront", "aws-sdk-s3", "aws-smithy-runtime", "aws-smithy-types", diff --git a/Cargo.toml b/Cargo.toml index 8be28cb18..4c737db90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,6 @@ futures-util = "0.3.5" async-stream = "0.3.5" aws-config = "1.0.0" aws-sdk-s3 = "1.3.0" -aws-sdk-cloudfront = "1.3.0" aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] } http = "1.0.0" uuid = { version = "1.1.2", features = ["v4"]} diff --git a/migrations/20251202040858_remove-cdn-invalidation-queue.down.sql b/migrations/20251202040858_remove-cdn-invalidation-queue.down.sql new file mode 100644 index 000000000..3de7ef61e --- /dev/null +++ b/migrations/20251202040858_remove-cdn-invalidation-queue.down.sql @@ -0,0 +1,28 @@ +-- content copy/pasted from `.._initial.up.sql` + +CREATE TABLE cdn_invalidation_queue ( + id bigint NOT NULL, + crate character varying(255) NOT NULL, + cdn_distribution_id character varying(255) NOT NULL, + path_pattern text NOT NULL, + queued timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, + created_in_cdn timestamp with time zone, + cdn_reference character varying(255) +); + + +CREATE SEQUENCE cdn_invalidation_queue_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + + +ALTER SEQUENCE cdn_invalidation_queue_id_seq OWNED BY cdn_invalidation_queue.id; +ALTER TABLE ONLY cdn_invalidation_queue ALTER COLUMN id SET DEFAULT nextval('cdn_invalidation_queue_id_seq'::regclass); + +CREATE INDEX cdn_invalidation_queue_cdn_reference_idx ON cdn_invalidation_queue USING btree (cdn_reference); +CREATE INDEX cdn_invalidation_queue_crate_idx ON cdn_invalidation_queue USING btree (crate); +CREATE INDEX cdn_invalidation_queue_created_in_cdn_idx ON cdn_invalidation_queue USING btree (created_in_cdn); diff --git a/migrations/20251202040858_remove-cdn-invalidation-queue.up.sql b/migrations/20251202040858_remove-cdn-invalidation-queue.up.sql new file mode 100644 index 000000000..97d4b1b88 --- /dev/null +++ b/migrations/20251202040858_remove-cdn-invalidation-queue.up.sql @@ -0,0 +1 @@ +DROP TABLE cdn_invalidation_queue; diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index b08c9975d..69cc2a3cc 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -144,8 +144,6 @@ enum CommandLine { value_enum )] repository_stats_updater: Toggle, - #[arg(long = "cdn-invalidator", default_value = "enabled", value_enum)] - cdn_invalidator: Toggle, #[arg(long = "queue-rebuilds", default_value = "enabled", value_enum)] queue_rebuilds: Toggle, }, @@ -182,15 +180,11 @@ impl CommandLine { Self::Build { subcommand } => subcommand.handle_args(ctx)?, Self::StartRegistryWatcher { repository_stats_updater, - cdn_invalidator, queue_rebuilds, } => { if repository_stats_updater == Toggle::Enabled { docs_rs::utils::daemon::start_background_repository_stats_updater(&ctx)?; } - if cdn_invalidator == Toggle::Enabled { - docs_rs::utils::daemon::start_background_cdn_invalidator(&ctx)?; - } if queue_rebuilds == Toggle::Enabled { docs_rs::utils::daemon::start_background_queue_rebuild(&ctx)?; } diff --git a/src/build_queue.rs b/src/build_queue.rs index 112d6d438..e1d5945ea 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -303,7 +303,7 @@ impl AsyncBuildQueue { /// Index methods. impl AsyncBuildQueue { - async fn queue_crate_invalidation(&self, conn: &mut sqlx::PgConnection, krate: &str) { + async fn queue_crate_invalidation(&self, krate: &str) { let krate = match krate .parse::() .with_context(|| format!("can't parse crate name '{}'", krate)) @@ -316,7 +316,7 @@ impl AsyncBuildQueue { }; if let Err(err) = - cdn::queue_crate_invalidation(conn, &self.config, &self.cdn_metrics, &krate).await + cdn::queue_crate_invalidation(&self.config, &self.cdn_metrics, &krate).await { report_error(&err); } @@ -360,7 +360,7 @@ impl AsyncBuildQueue { Err(err) => report_error(&err), } - self.queue_crate_invalidation(&mut conn, krate).await; + self.queue_crate_invalidation(krate).await; self.remove_crate_from_queue(krate).await?; continue; } @@ -392,8 +392,7 @@ impl AsyncBuildQueue { Err(err) => report_error(&err), } - self.queue_crate_invalidation(&mut conn, &release.name) - .await; + self.queue_crate_invalidation(&release.name).await; self.remove_version_from_queue(&release.name, &version) .await?; continue; @@ -449,8 +448,7 @@ impl AsyncBuildQueue { report_error(&err); } - self.queue_crate_invalidation(&mut conn, &release.name) - .await; + self.queue_crate_invalidation(&release.name).await; } } @@ -639,10 +637,8 @@ impl BuildQueue { self.inner.builder_metrics.total_builds.add(1, &[]); - self.runtime.block_on( - self.inner - .queue_crate_invalidation(&mut transaction, &to_process.name), - ); + self.runtime + .block_on(self.inner.queue_crate_invalidation(&to_process.name)); let mut increase_attempt_count = || -> Result<()> { let attempt: i32 = self.runtime.block_on( @@ -1405,72 +1401,66 @@ mod tests { 9 ); - // no invalidations were run since we don't have a distribution id configured - assert!( - env.runtime() - .block_on(async { - cdn::cloudfront::queued_or_active_crate_invalidations( - &mut *env.async_db().async_conn().await, - ) - .await - })? - .is_empty() - ); - Ok(()) } #[test] - fn test_invalidate_cdn_after_build_and_error() -> Result<()> { + fn test_invalidate_cdn_after_error() -> Result<()> { + let mut fastly_api = mockito::Server::new(); + let env = TestEnvironment::with_config_and_runtime( TestEnvironment::base_config() - .cloudfront_distribution_id_web(Some("distribution_id_web".into())) - .cloudfront_distribution_id_static(Some("distribution_id_static".into())) + .fastly_api_host(fastly_api.url().parse().unwrap()) + .fastly_api_token(Some("test-token".into())) + .fastly_service_sid(Some("test-sid-1".into())) .build()?, )?; let queue = env.build_queue(); - queue.add_crate("will_succeed", &V1, -1, None)?; - queue.add_crate("will_fail", &V1, 0, None)?; + let m = fastly_api + .mock("POST", "/service/test-sid-1/purge") + .with_status(200) + .create(); - let fetch_invalidations = || { - env.runtime() - .block_on(async { - let mut conn = env.async_db().async_conn().await; - cdn::cloudfront::queued_or_active_crate_invalidations(&mut conn).await - }) - .unwrap() - }; - - assert!(fetch_invalidations().is_empty()); + queue.add_crate("will_fail", &V1, 0, None)?; queue.process_next_crate(|krate| { - assert_eq!("will_succeed", krate.name); - Ok(BuildPackageSummary::default()) + assert_eq!("will_fail", krate.name); + anyhow::bail!("simulate a failure"); })?; - let queued_invalidations = fetch_invalidations(); - assert_eq!(queued_invalidations.len(), 3); - assert!( - queued_invalidations - .iter() - .all(|i| i.krate == "will_succeed") - ); + m.expect(1).assert(); + + Ok(()) + } + #[test] + fn test_invalidate_cdn_after_build() -> Result<()> { + let mut fastly_api = mockito::Server::new(); + + let env = TestEnvironment::with_config_and_runtime( + TestEnvironment::base_config() + .fastly_api_host(fastly_api.url().parse().unwrap()) + .fastly_api_token(Some("test-token".into())) + .fastly_service_sid(Some("test-sid-1".into())) + .build()?, + )?; + + let queue = env.build_queue(); + + let m = fastly_api + .mock("POST", "/service/test-sid-1/purge") + .with_status(200) + .create(); + + queue.add_crate("will_succeed", &V1, -1, None)?; queue.process_next_crate(|krate| { - assert_eq!("will_fail", krate.name); - anyhow::bail!("simulate a failure"); + assert_eq!("will_succeed", krate.name); + Ok(BuildPackageSummary::default()) })?; - let queued_invalidations = fetch_invalidations(); - assert_eq!(queued_invalidations.len(), 6); - assert!( - queued_invalidations - .iter() - .skip(3) - .all(|i| i.krate == "will_fail") - ); + m.expect(1).assert(); Ok(()) } diff --git a/src/cdn/cloudfront.rs b/src/cdn/cloudfront.rs deleted file mode 100644 index 4ee4c6f36..000000000 --- a/src/cdn/cloudfront.rs +++ /dev/null @@ -1,1276 +0,0 @@ -use super::CdnMetrics; -use crate::{Config, utils::report_error}; -use anyhow::{Context, Error, Result, anyhow, bail}; -use aws_config::BehaviorVersion; -use aws_sdk_cloudfront::{ - Client, - config::{Region, retry::RetryConfig}, - error::SdkError, - types::{InvalidationBatch, Paths}, -}; -use chrono::{DateTime, Utc}; -use futures_util::stream::TryStreamExt; -use opentelemetry::KeyValue; -use serde::Serialize; -use sqlx::Connection as _; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; -use strum::EnumString; -use tracing::{debug, info, instrument, warn}; -use uuid::Uuid; - -/// maximum amount of parallel in-progress wildcard invalidations -/// The actual limit is 15, but we want to keep some room for manually -/// triggered invalidations -const MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS: i32 = 13; - -#[derive(Debug, EnumString)] -pub enum CdnKind { - #[strum(ascii_case_insensitive)] - Dummy, - - #[strum(ascii_case_insensitive)] - CloudFront, -} - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct CdnInvalidation { - pub(crate) distribution_id: String, - pub(crate) invalidation_id: String, - pub(crate) path_patterns: Vec, - pub(crate) completed: bool, -} - -pub enum CdnBackend { - Dummy { - invalidation_requests: Arc>>, - }, - CloudFront { - client: Client, - }, -} - -impl CdnBackend { - pub async fn new(config: &Config) -> CdnBackend { - match config.cdn_backend { - CdnKind::CloudFront => { - let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await; - let config_builder = aws_sdk_cloudfront::config::Builder::from(&shared_config) - .retry_config( - RetryConfig::standard().with_max_attempts(config.aws_sdk_max_retries), - ) - .region(Region::new(config.s3_region.clone())); - - Self::CloudFront { - client: Client::from_conf(config_builder.build()), - } - } - CdnKind::Dummy => Self::Dummy { - invalidation_requests: Arc::new(Mutex::new(Vec::new())), - }, - } - } - /// create a Front invalidation request for a list of path patterns. - /// patterns can be - /// * `/filename.ext` (a specific path) - /// * `/directory-path/file-name.*` (delete these files, all extensions) - /// * `/directory-path/*` (invalidate all of the files in a directory, without subdirectories) - /// * `/directory-path*` (recursive directory delete, including subdirectories) - /// see https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/Invalidation.html#invalidation-specifying-objects - /// - /// Returns the caller reference that can be used to query the status of this - /// invalidation request. - #[instrument(skip(self))] - async fn create_invalidation( - &self, - distribution_id: &str, - path_patterns: &[&str], - ) -> Result { - let caller_reference = Uuid::new_v4(); - - match *self { - CdnBackend::CloudFront { ref client, .. } => { - let id = CdnBackend::create_cloudfront_invalidation( - client, - distribution_id, - &caller_reference.to_string(), - path_patterns, - ) - .await?; - Ok(CdnInvalidation { - distribution_id: distribution_id.to_owned(), - invalidation_id: id, - path_patterns: path_patterns.iter().cloned().map(str::to_owned).collect(), - completed: false, - }) - } - CdnBackend::Dummy { - ref invalidation_requests, - .. - } => { - let mut invalidation_requests = invalidation_requests - .lock() - .expect("could not lock mutex on dummy CDN"); - - let invalidation = CdnInvalidation { - distribution_id: distribution_id.to_owned(), - invalidation_id: caller_reference.to_string(), - path_patterns: path_patterns.iter().cloned().map(str::to_owned).collect(), - completed: false, - }; - - invalidation_requests.push(invalidation.clone()); - Ok(invalidation) - } - } - } - - #[cfg(test)] - fn insert_completed_invalidation( - &self, - distribution_id: &str, - invalidation_id: &str, - path_patterns: &[&str], - ) { - let CdnBackend::Dummy { - invalidation_requests, - .. - } = self - else { - panic!("invalid CDN backend"); - }; - - let mut invalidation_requests = invalidation_requests - .lock() - .expect("could not lock mutex on dummy CDN"); - - invalidation_requests.push(CdnInvalidation { - distribution_id: distribution_id.to_owned(), - invalidation_id: invalidation_id.to_owned(), - path_patterns: path_patterns.iter().cloned().map(str::to_owned).collect(), - completed: true, - }); - } - - #[cfg(test)] - fn clear_active_invalidations(&self) { - match self { - CdnBackend::Dummy { - invalidation_requests, - .. - } => { - invalidation_requests - .lock() - .expect("could not lock mutex on dummy CDN") - .clear(); - } - CdnBackend::CloudFront { .. } => unreachable!(), - } - } - - async fn invalidation_status( - &self, - distribution_id: &str, - invalidation_id: &str, - ) -> Result, Error> { - match self { - CdnBackend::Dummy { - invalidation_requests, - .. - } => { - let invalidation_requests = invalidation_requests - .lock() - .expect("could not lock mutex on dummy CDN"); - - Ok(invalidation_requests - .iter() - .find(|i| { - i.distribution_id == distribution_id && i.invalidation_id == invalidation_id - }) - .cloned()) - } - CdnBackend::CloudFront { client, .. } => { - Ok(CdnBackend::get_cloudfront_invalidation_status( - client, - distribution_id, - invalidation_id, - ) - .await?) - } - } - } - - #[instrument] - async fn get_cloudfront_invalidation_status( - client: &Client, - distribution_id: &str, - invalidation_id: &str, - ) -> Result, Error> { - let response = match client - .get_invalidation() - .distribution_id(distribution_id) - .id(invalidation_id.to_owned()) - .send() - .await - { - Ok(response) => response, - Err(SdkError::ServiceError(err)) => { - if err.raw().status().as_u16() == http::StatusCode::NOT_FOUND.as_u16() { - return Ok(None); - } else { - return Err(err.into_err().into()); - } - } - Err(err) => return Err(err.into()), - }; - - let Some(invalidation) = response.invalidation() else { - bail!("missing invalidation in response"); - }; - - let patterns = invalidation - .invalidation_batch() - .and_then(|batch| batch.paths()) - .map(|paths| paths.items()) - .unwrap_or_default() - .to_vec(); - - if patterns.is_empty() { - warn!( - invalidation_id, - ?invalidation, - "got invalidation detail response without paths" - ); - } - Ok(Some(CdnInvalidation { - distribution_id: distribution_id.to_owned(), - invalidation_id: invalidation_id.to_owned(), - path_patterns: patterns, - completed: match invalidation.status() { - "InProgress" => false, - "Completed" => true, - _ => { - report_error(&anyhow!( - "got unknown cloudfront invalidation status: {} in {:?}", - invalidation.status(), - invalidation - )); - true - } - }, - })) - } - - #[instrument] - async fn create_cloudfront_invalidation( - client: &Client, - distribution_id: &str, - caller_reference: &str, - path_patterns: &[&str], - ) -> Result { - let path_patterns: Vec<_> = path_patterns.iter().cloned().map(String::from).collect(); - - Ok(client - .create_invalidation() - .distribution_id(distribution_id) - .invalidation_batch( - InvalidationBatch::builder() - .paths( - Paths::builder() - .quantity(path_patterns.len().try_into().unwrap()) - .set_items(Some(path_patterns)) - .build() - .context("could not build path items")?, - ) - .caller_reference(caller_reference) - .build() - .context("could not build invalidation batch")?, - ) - .send() - .await? - .invalidation() - .ok_or_else(|| { - anyhow!("missing invalidation information in create-invalidation result") - })? - .id() - .to_owned()) - } -} - -/// fully invalidate the CDN distribution, also emptying the queue. -#[instrument(skip_all, fields(distribution_id))] -pub(crate) async fn full_invalidation( - cdn: &CdnBackend, - otel_metrics: &CdnMetrics, - conn: &mut sqlx::PgConnection, - distribution_id: &str, -) -> Result<()> { - let mut transaction = conn.begin().await?; - - let now = Utc::now(); - for row in sqlx::query!( - "SELECT queued - FROM cdn_invalidation_queue - WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL - FOR UPDATE", - distribution_id, - ) - .fetch_all(&mut *transaction) - .await? - { - if let Ok(duration) = (now - row.queued).to_std() { - let duration = duration.as_secs_f64(); - otel_metrics.queue_time.record( - duration, - &[KeyValue::new("distribution", distribution_id.to_string())], - ); - } - } - - match cdn - .create_invalidation(distribution_id, &["/*"]) - .await - .context("error creating new invalidation") - { - Ok(invalidation) => { - sqlx::query!( - "UPDATE cdn_invalidation_queue - SET - created_in_cdn = CURRENT_TIMESTAMP, - cdn_reference = $1 - WHERE - cdn_distribution_id = $2 AND created_in_cdn IS NULL", - invalidation.invalidation_id, - distribution_id, - ) - .execute(&mut *transaction) - .await?; - - transaction.commit().await?; - } - Err(err) => return Err(err), - }; - - Ok(()) -} - -#[instrument(skip_all, fields(distribution_id))] -pub(crate) async fn handle_queued_invalidation_requests( - config: &Config, - cdn: &CdnBackend, - otel_metrics: &CdnMetrics, - conn: &mut sqlx::PgConnection, - distribution_id: &str, -) -> Result<()> { - info!("handling queued CDN invalidations"); - - let mut active_invalidations = Vec::new(); - for row in sqlx::query!( - r#"SELECT - DISTINCT cdn_reference as "cdn_reference!" - FROM cdn_invalidation_queue - WHERE - cdn_reference IS NOT NULL AND - cdn_distribution_id = $1 - "#, - distribution_id, - ) - .fetch_all(&mut *conn) - .await? - { - if let Some(status) = cdn - .invalidation_status(distribution_id, &row.cdn_reference) - .await? - && !status.completed - { - active_invalidations.push(status); - } - } - - // for now we assume all invalidation paths are wildcard invalidations, - // so we apply the wildcard limit. - let active_path_invalidations: usize = active_invalidations - .iter() - .map(|i| i.path_patterns.len()) - .sum(); - - debug!( - active_invalidations = active_invalidations.len(), - active_path_invalidations, "found active invalidations", - ); - - // remove the invalidation from the queue when they are completed. - // We're only looking at InProgress invalidations, - // we don't differentiate between `Completed` ones, and invalidations - // missing in the CloudFront `ListInvalidations` response. - let now = Utc::now(); - for row in sqlx::query!( - "DELETE FROM cdn_invalidation_queue - WHERE - cdn_distribution_id = $1 AND - created_in_cdn IS NOT NULL AND - NOT (cdn_reference = ANY($2)) - RETURNING created_in_cdn - ", - &distribution_id, - &active_invalidations - .iter() - .map(|i| i.invalidation_id.clone()) - .collect::>(), - ) - .fetch_all(&mut *conn) - .await? - { - if let Ok(duration) = (now - row.created_in_cdn.expect("this is always Some")).to_std() { - let duration = duration.as_secs_f64(); - otel_metrics.invalidation_time.record( - duration, - &[KeyValue::new("distribution", distribution_id.to_string())], - ); - } - } - let possible_path_invalidations: i32 = - MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS - active_path_invalidations as i32; - - if possible_path_invalidations <= 0 { - info!( - active_path_invalidations, - "too many active cloudfront wildcard invalidations \ - will not create a new one." - ); - return Ok(()); - } - - if let Some(min_queued) = sqlx::query_scalar!( - "SELECT - min(queued) - FROM cdn_invalidation_queue - WHERE - cdn_distribution_id = $1 AND - created_in_cdn IS NULL", - distribution_id - ) - .fetch_one(&mut *conn) - .await? - && (now - min_queued).to_std().unwrap_or_default() >= config.cdn_max_queued_age - { - full_invalidation(cdn, otel_metrics, conn, distribution_id).await?; - return Ok(()); - } - - // create new an invalidation for the queued path patterns - let mut transaction = conn.begin().await?; - let mut path_patterns: Vec = Vec::new(); - let mut queued_entry_ids: Vec = Vec::new(); - - for row in sqlx::query!( - "SELECT id, path_pattern, queued - FROM cdn_invalidation_queue - WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL - ORDER BY queued, id - LIMIT $2 - FOR UPDATE", - distribution_id, - &(possible_path_invalidations as i64) - ) - .fetch_all(&mut *transaction) - .await? - { - queued_entry_ids.push(row.id); - path_patterns.push(row.path_pattern); - - if let Ok(duration) = (now - row.queued).to_std() { - let duration = duration.as_secs_f64(); - otel_metrics.queue_time.record( - duration, - &[KeyValue::new("distribution", distribution_id.to_string())], - ); - } - } - - if path_patterns.is_empty() { - info!("no queued path patterns to invalidate, going back to sleep"); - return Ok(()); - } - - match cdn - .create_invalidation( - distribution_id, - &path_patterns.iter().map(String::as_str).collect::>(), - ) - .await - .context("error creating new invalidation") - { - Ok(invalidation) => { - sqlx::query!( - "UPDATE cdn_invalidation_queue - SET - created_in_cdn = CURRENT_TIMESTAMP, - cdn_reference = $1 - WHERE - id = ANY($2)", - invalidation.invalidation_id, - &queued_entry_ids, - ) - .execute(&mut *transaction) - .await?; - - transaction.commit().await?; - } - Err(err) => return Err(err), - } - - Ok(()) -} - -#[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)] -pub(crate) struct QueuedInvalidation { - pub krate: String, - pub cdn_distribution_id: String, - pub path_pattern: String, - pub queued: DateTime, - pub created_in_cdn: Option>, - pub cdn_reference: Option, -} - -/// Return which crates have queued or active cloudfront invalidations. -pub(crate) async fn queued_or_active_crate_invalidations( - conn: &mut sqlx::PgConnection, -) -> Result> { - Ok(sqlx::query_as!( - QueuedInvalidation, - r#" - SELECT - crate as "krate", - cdn_distribution_id, - path_pattern, - queued, - created_in_cdn, - cdn_reference - FROM cdn_invalidation_queue - ORDER BY queued, id"#, - ) - .fetch_all(&mut *conn) - .await?) -} - -/// Return the count of queued or active invalidations, per distribution id -pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution( - conn: &mut sqlx::PgConnection, - config: &Config, -) -> Result> { - let mut result: HashMap = HashMap::from_iter( - config - .cloudfront_distribution_id_web - .iter() - .chain(config.cloudfront_distribution_id_static.iter()) - .cloned() - .map(|id| (id, 0)), - ); - - result.extend( - sqlx::query!( - r#" - SELECT - cdn_distribution_id, - count(*) as "count!" - FROM cdn_invalidation_queue - GROUP BY cdn_distribution_id"#, - ) - .fetch(&mut *conn) - .map_ok(|row| (row.cdn_distribution_id, row.count)) - .try_collect::>() - .await?, - ); - - Ok(result) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{cdn::queue_crate_invalidation, test::TestEnvironment}; - use aws_sdk_cloudfront::{Config, config::Credentials}; - use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient}; - use aws_smithy_types::body::SdkBody; - use std::time::Duration; - - const DISTRIBUTION_ID_WEB: &str = "distribution_id_web"; - const DISTRIBUTION_ID_STATIC: &str = "distribution_id_static"; - - fn config_with_cdn() -> crate::config::ConfigBuilder { - TestEnvironment::base_config() - .cloudfront_distribution_id_web(Some(DISTRIBUTION_ID_WEB.into())) - .cloudfront_distribution_id_static(Some(DISTRIBUTION_ID_STATIC.into())) - } - - fn active_invalidations(cdn: &CdnBackend, distribution_id: &str) -> Vec { - let CdnBackend::Dummy { - invalidation_requests, - .. - } = cdn - else { - panic!("invalid CDN backend"); - }; - - let invalidation_requests = invalidation_requests - .lock() - .expect("could not lock mutex on dummy CDN"); - - invalidation_requests - .iter() - .filter(|i| !i.completed && i.distribution_id == distribution_id) - .cloned() - .collect() - } - - async fn insert_running_invalidation( - conn: &mut sqlx::PgConnection, - distribution_id: &str, - invalidation_id: &str, - ) -> Result<()> { - sqlx::query!( - "INSERT INTO cdn_invalidation_queue ( - crate, cdn_distribution_id, path_pattern, queued, created_in_cdn, cdn_reference - ) VALUES ( - 'dummy', - $1, - '/doesnt_matter', - CURRENT_TIMESTAMP, - CURRENT_TIMESTAMP, - $2 - )", - distribution_id, - invalidation_id - ) - .execute(&mut *conn) - .await?; - Ok(()) - } - - fn otel_metrics(env: &TestEnvironment) -> CdnMetrics { - CdnMetrics::new(env.meter_provider()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn create_cloudfront() -> Result<()> { - let env = TestEnvironment::with_config( - TestEnvironment::base_config() - .cdn_backend(CdnKind::CloudFront) - .build()?, - ) - .await?; - - assert!(matches!(*env.cdn(), CdnBackend::CloudFront { .. })); - assert!(matches!( - CdnBackend::new(env.config()).await, - CdnBackend::CloudFront { .. } - )); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn create_dummy() -> Result<()> { - let env = TestEnvironment::new().await?; - - assert!(matches!(*env.cdn(), CdnBackend::Dummy { .. })); - assert!(matches!( - CdnBackend::new(env.config()).await, - CdnBackend::Dummy { .. } - )); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn invalidation_counts_are_zero_with_empty_queue() -> Result<()> { - let env = TestEnvironment::with_config(config_with_cdn().build()?).await?; - - let config = env.config(); - let mut conn = env.async_db().async_conn().await; - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .is_empty() - ); - - let counts = dbg!( - queued_or_active_crate_invalidation_count_by_distribution(&mut conn, config).await? - ); - assert_eq!(counts.len(), 2); - assert_eq!(*counts.get(DISTRIBUTION_ID_WEB).unwrap(), 0); - assert_eq!(*counts.get(DISTRIBUTION_ID_STATIC).unwrap(), 0); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn escalate_to_full_invalidation() -> Result<()> { - let env = TestEnvironment::with_config( - config_with_cdn() - .cdn_max_queued_age(Duration::from_secs(0)) - .build()?, - ) - .await?; - - let cdn = env.cdn(); - let config = env.config(); - let mut conn = env.async_db().async_conn().await; - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .is_empty() - ); - - let metrics = otel_metrics(&env); - queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) - .await?; - - // invalidation paths are queued. - assert_eq!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .into_iter() - .map(|i| ( - i.cdn_distribution_id, - i.krate, - i.path_pattern, - i.cdn_reference - )) - .collect::>(), - vec![ - ( - DISTRIBUTION_ID_WEB.into(), - "krate".into(), - "/krate*".into(), - None - ), - ( - DISTRIBUTION_ID_WEB.into(), - "krate".into(), - "/crate/krate*".into(), - None - ), - ( - DISTRIBUTION_ID_STATIC.into(), - "krate".into(), - "/rustdoc/krate*".into(), - None - ), - ] - ); - - let counts = - queued_or_active_crate_invalidation_count_by_distribution(&mut conn, config).await?; - assert_eq!(counts.len(), 2); - assert_eq!(*counts.get(DISTRIBUTION_ID_WEB).unwrap(), 2); - assert_eq!(*counts.get(DISTRIBUTION_ID_STATIC).unwrap(), 1); - - // queueing the invalidation doesn't create it in the CDN - assert!(active_invalidations(cdn, DISTRIBUTION_ID_WEB).is_empty()); - assert!(active_invalidations(cdn, DISTRIBUTION_ID_STATIC).is_empty()); - - let cdn = env.cdn(); - let config = env.config(); - let metrics = otel_metrics(&env); - - // now handle the queued invalidations - handle_queued_invalidation_requests(config, cdn, &metrics, &mut conn, DISTRIBUTION_ID_WEB) - .await?; - handle_queued_invalidation_requests( - config, - cdn, - &metrics, - &mut conn, - DISTRIBUTION_ID_STATIC, - ) - .await?; - - // which creates them in the CDN - { - let ir_web = active_invalidations(cdn, DISTRIBUTION_ID_WEB); - assert_eq!(ir_web.len(), 1); - assert_eq!(ir_web[0].path_patterns, vec!["/*"]); - - let ir_static = active_invalidations(cdn, DISTRIBUTION_ID_STATIC); - assert_eq!(ir_web.len(), 1); - assert_eq!(ir_static[0].path_patterns, vec!["/*"]); - } - - // the queued entries got a CDN reference attached - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .iter() - .all(|i| i.cdn_reference.is_some() && i.created_in_cdn.is_some()) - ); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn invalidate_a_crate() -> Result<()> { - let env = TestEnvironment::with_config(config_with_cdn().build()?).await?; - - let cdn = env.cdn(); - let config = env.config(); - let mut conn = env.async_db().async_conn().await; - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .is_empty() - ); - - let metrics = otel_metrics(&env); - queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) - .await?; - - // invalidation paths are queued. - assert_eq!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .into_iter() - .map(|i| ( - i.cdn_distribution_id, - i.krate, - i.path_pattern, - i.cdn_reference - )) - .collect::>(), - vec![ - ( - DISTRIBUTION_ID_WEB.into(), - "krate".into(), - "/krate*".into(), - None - ), - ( - DISTRIBUTION_ID_WEB.into(), - "krate".into(), - "/crate/krate*".into(), - None - ), - ( - DISTRIBUTION_ID_STATIC.into(), - "krate".into(), - "/rustdoc/krate*".into(), - None - ), - ] - ); - - let counts = - queued_or_active_crate_invalidation_count_by_distribution(&mut conn, config).await?; - assert_eq!(counts.len(), 2); - assert_eq!(*counts.get(DISTRIBUTION_ID_WEB).unwrap(), 2); - assert_eq!(*counts.get(DISTRIBUTION_ID_STATIC).unwrap(), 1); - - // queueing the invalidation doesn't create it in the CDN - assert!(active_invalidations(cdn, DISTRIBUTION_ID_WEB).is_empty()); - assert!(active_invalidations(cdn, DISTRIBUTION_ID_STATIC).is_empty()); - - let cdn = env.cdn(); - let config = env.config(); - let metrics = otel_metrics(&env); - - // now handle the queued invalidations - handle_queued_invalidation_requests(config, cdn, &metrics, &mut conn, DISTRIBUTION_ID_WEB) - .await?; - handle_queued_invalidation_requests( - config, - cdn, - &metrics, - &mut conn, - DISTRIBUTION_ID_STATIC, - ) - .await?; - - // which creates them in the CDN - { - let ir_web = active_invalidations(cdn, DISTRIBUTION_ID_WEB); - assert_eq!(ir_web.len(), 1); - assert_eq!(ir_web[0].path_patterns, vec!["/krate*", "/crate/krate*"]); - - let ir_static = active_invalidations(cdn, DISTRIBUTION_ID_STATIC); - assert_eq!(ir_web.len(), 1); - assert_eq!(ir_static[0].path_patterns, vec!["/rustdoc/krate*"]); - } - - // the queued entries got a CDN reference attached - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .iter() - .all(|i| i.cdn_reference.is_some() && i.created_in_cdn.is_some()) - ); - - // clear the active invalidations in the CDN to _fake_ them - // being completed on the CDN side. - cdn.clear_active_invalidations(); - - // now handle again - handle_queued_invalidation_requests(config, cdn, &metrics, &mut conn, DISTRIBUTION_ID_WEB) - .await?; - handle_queued_invalidation_requests( - config, - cdn, - &metrics, - &mut conn, - DISTRIBUTION_ID_STATIC, - ) - .await?; - - // which removes them from the queue table - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .is_empty() - ); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn only_add_some_invalidations_when_too_many_are_active() -> Result<()> { - let env = TestEnvironment::with_config(config_with_cdn().build()?).await?; - - let cdn = env.cdn(); - - // create an invalidation with 15 paths, so we're over the limit - let already_running_invalidation = cdn - .create_invalidation( - DISTRIBUTION_ID_WEB, - &(0..(MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS - 1)) - .map(|_| "/something*") - .collect::>(), - ) - .await?; - - let mut conn = env.async_db().async_conn().await; - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .is_empty() - ); - - // insert some completed invalidations into the queue & the CDN, these will be ignored - for i in 0..10 { - insert_running_invalidation(&mut conn, DISTRIBUTION_ID_WEB, &format!("some_id_{i}")) - .await?; - cdn.insert_completed_invalidation( - DISTRIBUTION_ID_WEB, - &format!("some_id_{i}"), - &["/*"], - ); - } - - // insert the CDN representation of the already running invalidation - insert_running_invalidation( - &mut conn, - DISTRIBUTION_ID_WEB, - &already_running_invalidation.invalidation_id, - ) - .await?; - - // queue an invalidation - let metrics = otel_metrics(&env); - queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) - .await?; - - let metrics = otel_metrics(&env); - - // handle the queued invalidations - handle_queued_invalidation_requests( - env.config(), - env.cdn(), - &metrics, - &mut conn, - DISTRIBUTION_ID_WEB, - ) - .await?; - - // only one path was added to the CDN - let q = queued_or_active_crate_invalidations(&mut conn).await?; - assert_eq!( - q.iter() - .filter_map(|i| i.cdn_reference.as_ref()) - .filter(|&reference| reference != &already_running_invalidation.invalidation_id) - .count(), - 1 - ); - - // old invalidation is still active, new one is added - let ir_web = active_invalidations(cdn, DISTRIBUTION_ID_WEB); - assert_eq!(ir_web.len(), 2); - assert_eq!(ir_web[0].path_patterns.len(), 12); - assert_eq!(ir_web[1].path_patterns.len(), 1); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn dont_create_invalidations_when_too_many_are_active() -> Result<()> { - let env = TestEnvironment::with_config(config_with_cdn().build()?).await?; - - let cdn = env.cdn(); - - // create an invalidation with 15 paths, so we're over the limit - let already_running_invalidation = cdn - .create_invalidation( - DISTRIBUTION_ID_WEB, - &(0..15).map(|_| "/something*").collect::>(), - ) - .await?; - - let mut conn = env.async_db().async_conn().await; - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .is_empty() - ); - insert_running_invalidation( - &mut conn, - DISTRIBUTION_ID_WEB, - &already_running_invalidation.invalidation_id, - ) - .await?; - - // queue an invalidation - let metrics = otel_metrics(&env); - queue_crate_invalidation(&mut conn, env.config(), &metrics, &"krate".parse().unwrap()) - .await?; - - let metrics = otel_metrics(&env); - - // handle the queued invalidations - handle_queued_invalidation_requests( - env.config(), - env.cdn(), - &metrics, - &mut conn, - DISTRIBUTION_ID_WEB, - ) - .await?; - - // nothing was added to the CDN - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .iter() - .filter(|i| i.cdn_distribution_id == DISTRIBUTION_ID_WEB - && !matches!( - &i.cdn_reference, - Some(val) if val == &already_running_invalidation.invalidation_id - )) - .all(|i| i.cdn_reference.is_none()) - ); - - // old invalidations are still active - let ir_web = active_invalidations(cdn, DISTRIBUTION_ID_WEB); - assert_eq!(ir_web.len(), 1); - assert_eq!(ir_web[0].path_patterns.len(), 15); - - // clear the active invalidations in the CDN to _fake_ them - // being completed on the CDN side. - cdn.clear_active_invalidations(); - - // now handle again - handle_queued_invalidation_requests( - env.config(), - env.cdn(), - &metrics, - &mut conn, - DISTRIBUTION_ID_WEB, - ) - .await?; - - // which adds the CDN reference - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .iter() - .filter(|i| i.cdn_distribution_id == DISTRIBUTION_ID_WEB) - .all(|i| i.cdn_reference.is_some()) - ); - - // and creates them in the CDN too - let ir_web = active_invalidations(cdn, DISTRIBUTION_ID_WEB); - assert_eq!(ir_web.len(), 1); - assert_eq!(ir_web[0].path_patterns, vec!["/krate*", "/crate/krate*"]); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn dont_create_invalidations_without_paths() -> Result<()> { - let env = TestEnvironment::with_config(config_with_cdn().build()?).await?; - - let cdn = env.cdn(); - let metrics = otel_metrics(&env); - - let mut conn = env.async_db().async_conn().await; - // no invalidation is queued - assert!( - queued_or_active_crate_invalidations(&mut conn) - .await? - .is_empty() - ); - - // run the handler - handle_queued_invalidation_requests( - env.config(), - env.cdn(), - &metrics, - &mut conn, - DISTRIBUTION_ID_WEB, - ) - .await?; - - // no invalidation was created - assert!(active_invalidations(cdn, DISTRIBUTION_ID_WEB).is_empty()); - - Ok(()) - } - - async fn get_mock_config(http_client: StaticReplayClient) -> aws_sdk_cloudfront::Config { - let cfg = aws_config::defaults(BehaviorVersion::latest()) - .region(Region::new("eu-central-1")) - .credentials_provider(Credentials::new( - "accesskey", - "privatekey", - None, - None, - "dummy", - )) - .http_client(http_client) - .load() - .await; - - Config::new(&cfg) - } - - #[tokio::test(flavor = "multi_thread")] - async fn invalidate_path() { - let conn = StaticReplayClient::new(vec![ReplayEvent::new( - http::Request::builder() - .header("content-type", "application/xml") - .uri(http::uri::Uri::from_static( - "https://cloudfront.amazonaws.com/2020-05-31/distribution/some_distribution/invalidation", - )) - .body(SdkBody::from( - r#"2/some/path*/another/path/*some_reference"#, - )) - .unwrap(), - http::Response::builder() - .status(200) - .body(SdkBody::from( - r#" - - 2019-12-05T18:40:49.413Z - I2J0I21PCUYOIK - - some_reference - - - /some/path* - /another/path/* - - 2 - - - InProgress - - "#, - )) - .unwrap(), - )]); - let client = Client::from_conf(get_mock_config(conn.clone()).await); - - CdnBackend::create_cloudfront_invalidation( - &client, - "some_distribution", - "some_reference", - &["/some/path*", "/another/path/*"], - ) - .await - .expect("error creating invalidation"); - - assert_eq!(conn.actual_requests().count(), 1); - conn.assert_requests_match(&[]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn get_invalidation_info_doesnt_exist() { - let conn = StaticReplayClient::new(vec![ReplayEvent::new( - http::Request::builder() - .header("content-type", "application/xml") - .uri(http::uri::Uri::from_static( - "https://cloudfront.amazonaws.com/2020-05-31/distribution/some_distribution/invalidation/some_reference" - )) - .body(SdkBody::empty()) - .unwrap(), - http::Response::builder() - .status(404) - .body(SdkBody::empty()) - .unwrap(), - )]); - let client = Client::from_conf(get_mock_config(conn.clone()).await); - - assert!( - CdnBackend::get_cloudfront_invalidation_status( - &client, - "some_distribution", - "some_reference", - ) - .await - .expect("error getting invalidation") - .is_none() - ); - } - - #[tokio::test(flavor = "multi_thread")] - async fn get_invalidation_info_completed() { - let conn = StaticReplayClient::new(vec![ReplayEvent::new( - http::Request::builder() - .header("content-type", "application/xml") - .uri(http::uri::Uri::from_static( - "https://cloudfront.amazonaws.com/2020-05-31/distribution/some_distribution/invalidation/some_reference" - )) - .body(SdkBody::empty()) - .unwrap(), - http::Response::builder() - .status(200) - .body(SdkBody::from( - r#" - some_reference - Completed - 2023-04-09T18:09:50.346Z - - - 1 - /* - - 03a63d75-21e7-46ba-858d-8999466e633f - - "# - )).unwrap(), - )]); - let client = Client::from_conf(get_mock_config(conn.clone()).await); - - assert_eq!( - CdnBackend::get_cloudfront_invalidation_status( - &client, - "some_distribution", - "some_reference", - ) - .await - .expect("error getting invalidation"), - Some(CdnInvalidation { - distribution_id: "some_distribution".into(), - invalidation_id: "some_reference".into(), - path_patterns: ["/*".into()].to_vec(), - completed: true - }) - ); - } -} diff --git a/src/cdn/fastly.rs b/src/cdn/fastly.rs index 0e0f10e8b..f78cde3b0 100644 --- a/src/cdn/fastly.rs +++ b/src/cdn/fastly.rs @@ -106,11 +106,7 @@ where None } }) { - for sid in config - .fastly_service_sid_web - .iter() - .chain(config.fastly_service_sid_static.iter()) - { + if let Some(ref sid) = config.fastly_service_sid { // NOTE: we start with just calling the API, and logging an error if they happen. // We can then see if we need retries or escalation to full purges. @@ -208,7 +204,7 @@ mod tests { let config = TestEnvironment::base_config() .fastly_api_host(fastly_api.url().parse().unwrap()) .fastly_api_token(Some("test-token".into())) - .fastly_service_sid_web(Some("test-sid-1".into())) + .fastly_service_sid(Some("test-sid-1".into())) .build()?; let m = fastly_api @@ -237,52 +233,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_purge_both() -> Result<()> { - let mut fastly_api = mockito::Server::new_async().await; - - let config = TestEnvironment::base_config() - .fastly_api_host(fastly_api.url().parse().unwrap()) - .fastly_api_token(Some("test-token".into())) - .fastly_service_sid_web(Some("test-sid-1".into())) - .fastly_service_sid_static(Some("test-sid-2".into())) - .build()?; - - let m1 = fastly_api - .mock("POST", "/service/test-sid-1/purge") - .match_header(FASTLY_KEY, "test-token") - .match_header(&SURROGATE_KEY, "crate-foo crate-bar") - .with_status(200) - .create_async() - .await; - - let m2 = fastly_api - .mock("POST", "/service/test-sid-2/purge") - .match_header(FASTLY_KEY, "test-token") - .match_header(&SURROGATE_KEY, "crate-foo crate-bar") - .with_status(200) - .create_async() - .await; - - let (_exporter, meter_provider) = setup_test_meter_provider(); - let metrics = CdnMetrics::new(&meter_provider); - - purge_surrogate_keys( - &config, - &metrics, - vec![ - SurrogateKey::from_str("crate-foo").unwrap(), - SurrogateKey::from_str("crate-bar").unwrap(), - ], - ) - .await?; - - m1.assert_async().await; - m2.assert_async().await; - - Ok(()) - } - #[tokio::test] async fn test_purge_err_doesnt_err() -> Result<()> { let mut fastly_api = mockito::Server::new_async().await; @@ -290,7 +240,7 @@ mod tests { let config = TestEnvironment::base_config() .fastly_api_host(fastly_api.url().parse().unwrap()) .fastly_api_token(Some("test-token".into())) - .fastly_service_sid_web(Some("test-sid-1".into())) + .fastly_service_sid(Some("test-sid-1".into())) .build()?; let m = fastly_api @@ -329,7 +279,7 @@ mod tests { let config = TestEnvironment::base_config() .fastly_api_host(fastly_api.url().parse().unwrap()) .fastly_api_token(Some("test-token".into())) - .fastly_service_sid_web(Some("test-sid-1".into())) + .fastly_service_sid(Some("test-sid-1".into())) .build()?; let m = fastly_api diff --git a/src/cdn/mod.rs b/src/cdn/mod.rs index 1ad7019dd..d6332c186 100644 --- a/src/cdn/mod.rs +++ b/src/cdn/mod.rs @@ -1,20 +1,15 @@ use crate::{ - Config, - db::types::krate_name::KrateName, - metrics::{CDN_INVALIDATION_HISTOGRAM_BUCKETS, otel::AnyMeterProvider}, + Config, db::types::krate_name::KrateName, metrics::otel::AnyMeterProvider, web::headers::SurrogateKey, }; -use anyhow::{Context, Result}; -use opentelemetry::metrics::{Counter, Gauge, Histogram}; -use tracing::{debug, error, info, instrument}; +use anyhow::Result; +use opentelemetry::metrics::{Counter, Gauge}; +use tracing::{error, info, instrument}; -pub(crate) mod cloudfront; pub(crate) mod fastly; #[derive(Debug)] pub struct CdnMetrics { - invalidation_time: Histogram, - queue_time: Histogram, fastly_batch_purges_with_surrogate: Counter, fastly_batch_purge_errors: Counter, fastly_purge_surrogate_keys: Counter, @@ -27,16 +22,6 @@ impl CdnMetrics { let meter = meter_provider.meter("cdn"); const PREFIX: &str = "docsrs.cdn"; Self { - invalidation_time: meter - .f64_histogram(format!("{PREFIX}.invalidation_time")) - .with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec()) - .with_unit("s") - .build(), - queue_time: meter - .f64_histogram(format!("{PREFIX}.queue_time")) - .with_boundaries(CDN_INVALIDATION_HISTOGRAM_BUCKETS.to_vec()) - .with_unit("s") - .build(), fastly_batch_purges_with_surrogate: meter .u64_counter(format!("{PREFIX}.fastly_batch_purges_with_surrogate")) .with_unit("1") @@ -61,9 +46,8 @@ impl CdnMetrics { } } -#[instrument(skip(conn, config))] +#[instrument(skip(config))] pub(crate) async fn queue_crate_invalidation( - conn: &mut sqlx::PgConnection, config: &Config, metrics: &CdnMetrics, krate_name: &KrateName, @@ -86,49 +70,5 @@ pub(crate) async fn queue_crate_invalidation( error!(%krate_name, ?err, "error purging Fastly surrogate keys"); } - /// cloudfront needs a queue to work around a concurrency limit of just 15 parallel - /// wildcard invalidations. - async fn add( - conn: &mut sqlx::PgConnection, - name: &str, - distribution_id: &str, - path_patterns: &[&str], - ) -> Result<()> { - for pattern in path_patterns { - debug!(distribution_id, pattern, "enqueueing web CDN invalidation"); - sqlx::query!( - "INSERT INTO cdn_invalidation_queue (crate, cdn_distribution_id, path_pattern) - VALUES ($1, $2, $3)", - name, - distribution_id, - pattern - ) - .execute(&mut *conn) - .await?; - } - Ok(()) - } - - if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { - add( - conn, - krate_name, - distribution_id, - &[&format!("/{krate_name}*"), &format!("/crate/{krate_name}*")], - ) - .await - .context("error enqueueing web CDN invalidation")?; - } - if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { - add( - conn, - krate_name, - distribution_id, - &[&format!("/rustdoc/{krate_name}*")], - ) - .await - .context("error enqueueing static CDN invalidation")?; - } - Ok(()) } diff --git a/src/config.rs b/src/config.rs index 7781305e9..da4705955 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use crate::{cdn::cloudfront::CdnKind, storage::StorageKind}; +use crate::storage::StorageKind; use anyhow::{Context, Result, anyhow, bail}; use std::{ env::VarError, @@ -104,22 +104,6 @@ pub struct Config { // This only affects pages that depend on invalidations to work. pub(crate) cache_invalidatable_responses: bool, - /// CDN backend to use for invalidations. - /// Only needed for cloudfront. - pub(crate) cdn_backend: CdnKind, - - /// The maximum age of a queued invalidation request before it is - /// considered too old and we fall back to a full purge of the - /// distributions. - pub(crate) cdn_max_queued_age: Duration, - - // CloudFront distribution ID for the web server. - // Will be used for invalidation-requests. - pub cloudfront_distribution_id_web: Option, - - /// same for the `static.docs.rs` distribution - pub cloudfront_distribution_id_static: Option, - /// Fastly API host, typically only overwritten for testing pub fastly_api_host: Url, @@ -127,10 +111,7 @@ pub struct Config { pub fastly_api_token: Option, /// fastly service SID for the main domain - pub fastly_service_sid_web: Option, - - /// same for the `static.docs.rs` distribution - pub fastly_service_sid_static: Option, + pub fastly_service_sid: Option, pub(crate) build_workspace_reinitialization_interval: Duration, @@ -223,17 +204,12 @@ impl Config { "CACHE_CONTROL_STALE_WHILE_REVALIDATE", )?) .cache_invalidatable_responses(env("DOCSRS_CACHE_INVALIDATEABLE_RESPONSES", true)?) - .cdn_backend(env("DOCSRS_CDN_BACKEND", CdnKind::Dummy)?) - .cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?)) - .cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?) - .cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?) .fastly_api_host(env( "DOCSRS_FASTLY_API_HOST", "https://api.fastly.com".parse().unwrap(), )?) .fastly_api_token(maybe_env("DOCSRS_FASTLY_API_TOKEN")?) - .fastly_service_sid_web(maybe_env("DOCSRS_FASTLY_SERVICE_SID_WEB")?) - .fastly_service_sid_static(maybe_env("DOCSRS_FASTLY_SERVICE_SID_STATIC")?) + .fastly_service_sid(maybe_env("DOCSRS_FASTLY_SERVICE_SID_WEB")?) .local_archive_cache_path(ensure_absolute_path(env( "DOCSRS_ARCHIVE_INDEX_CACHE_PATH", prefix.join("archive_cache"), diff --git a/src/context.rs b/src/context.rs index b968ac7b6..a0dec9fb1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,6 +1,6 @@ use crate::{ AsyncBuildQueue, AsyncStorage, BuildQueue, Config, RegistryApi, Storage, - cdn::{CdnMetrics, cloudfront::CdnBackend}, + cdn::CdnMetrics, db::Pool, metrics::otel::{AnyMeterProvider, get_meter_provider}, repositories::RepositoryStatsUpdater, @@ -15,7 +15,6 @@ pub struct Context { pub build_queue: Arc, pub storage: Arc, pub async_storage: Arc, - pub cdn: Arc, pub cdn_metrics: Arc, pub pool: Pool, pub registry_api: Arc, @@ -56,7 +55,6 @@ impl Context { Arc::new(AsyncStorage::new(pool.clone(), config.clone(), &meter_provider).await?); let cdn_metrics = Arc::new(CdnMetrics::new(&meter_provider)); - let cdn = Arc::new(CdnBackend::new(&config).await); let async_build_queue = Arc::new(AsyncBuildQueue::new( pool.clone(), config.clone(), @@ -76,7 +74,6 @@ impl Context { build_queue, storage, async_storage, - cdn, cdn_metrics, pool: pool.clone(), registry_api: Arc::new(RegistryApi::new( diff --git a/src/metrics/service.rs b/src/metrics/service.rs index 9203593bb..27e6b4260 100644 --- a/src/metrics/service.rs +++ b/src/metrics/service.rs @@ -1,4 +1,4 @@ -use crate::{AsyncBuildQueue, Config, cdn, metrics::otel::AnyMeterProvider}; +use crate::{AsyncBuildQueue, metrics::otel::AnyMeterProvider}; use anyhow::{Error, Result}; use opentelemetry::{KeyValue, metrics::Gauge}; use std::collections::HashSet; @@ -10,7 +10,6 @@ pub struct OtelServiceMetrics { pub failed_crates_count: Gauge, pub queue_is_locked: Gauge, pub queued_crates_count_by_priority: Gauge, - pub queued_cdn_invalidations_by_distribution: Gauge, } impl OtelServiceMetrics { @@ -38,19 +37,10 @@ impl OtelServiceMetrics { .u64_gauge(format!("{PREFIX}.queued_crates_count_by_priority")) .with_unit("1") .build(), - queued_cdn_invalidations_by_distribution: meter - .u64_gauge(format!("{PREFIX}.queued_cdn_invalidations_by_distribution")) - .with_unit("1") - .build(), } } - pub(crate) async fn gather( - &self, - conn: &mut sqlx::PgConnection, - queue: &AsyncBuildQueue, - config: &Config, - ) -> Result<(), Error> { + pub(crate) async fn gather(&self, queue: &AsyncBuildQueue) -> Result<(), Error> { self.queue_is_locked .record(queue.is_locked().await? as u64, &[]); self.queued_crates_count @@ -83,18 +73,6 @@ impl OtelServiceMetrics { ); } - for (distribution_id, count) in - cdn::cloudfront::queued_or_active_crate_invalidation_count_by_distribution( - &mut *conn, config, - ) - .await? - { - self.queued_cdn_invalidations_by_distribution.record( - count as u64, - &[KeyValue::new("distribution", distribution_id)], - ); - } - self.failed_crates_count .record(queue.failed_count().await? as u64, &[]); diff --git a/src/test/mod.rs b/src/test/mod.rs index 257c3de0b..5b2130daa 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -8,7 +8,6 @@ pub(crate) use self::{ }; use crate::{ AsyncBuildQueue, BuildQueue, Config, Context, - cdn::{CdnMetrics, cloudfront::CdnBackend}, config::ConfigBuilder, db::{self, AsyncPoolClient, Pool, types::version::Version}, error::Result, @@ -17,7 +16,7 @@ use crate::{ test::test_metrics::CollectedMetrics, web::{ build_axum_app, - cache::{self, TargetCdn, X_RLNG_SOURCE_CDN}, + cache::{self}, headers::{IfNoneMatch, SURROGATE_CONTROL}, page::TemplateData, }, @@ -107,13 +106,7 @@ impl AxumResponseTestExt for axum::response::Response { assert!(config.cache_control_stale_while_revalidate.is_some()); // This method is only about asserting if the handler did set the right _policy_. - // - // But we only test for CloudFront here. - // The different policies are unique enough so we would have a test failure when - // we emit the wrong cache policy in a handler. - // - // The fastly specifics are tested in web::cache unittests. - assert_cache_headers_eq(self, &cache_policy.render(config, TargetCdn::Fastly)); + assert_cache_headers_eq(self, &cache_policy.render(config)); } fn error_for_status(self) -> Result @@ -225,7 +218,6 @@ impl AxumRouterTestExt for axum::Router { { let cached_response = self .get_with_headers(initial_path, |headers| { - headers.insert(X_RLNG_SOURCE_CDN, HeaderValue::from_static("fastly")); headers.typed_insert(if_none_match); }) .await?; @@ -284,13 +276,7 @@ impl AxumRouterTestExt for axum::Router { async fn get(&self, path: &str) -> Result { Ok(self .clone() - .oneshot( - Request::builder() - .uri(path) - .header(X_RLNG_SOURCE_CDN, "fastly") - .body(Body::empty()) - .unwrap(), - ) + .oneshot(Request::builder().uri(path).body(Body::empty()).unwrap()) .await?) } @@ -518,18 +504,10 @@ impl TestEnvironment { &self.context.build_queue } - pub(crate) fn cdn(&self) -> &CdnBackend { - &self.context.cdn - } - pub(crate) fn config(&self) -> &Config { &self.context.config } - pub(crate) fn cdn_metrics(&self) -> &CdnMetrics { - &self.context.cdn_metrics - } - pub(crate) fn async_storage(&self) -> &AsyncStorage { &self.context.async_storage } @@ -538,10 +516,6 @@ impl TestEnvironment { &self.context.storage } - pub(crate) fn meter_provider(&self) -> &AnyMeterProvider { - &self.context.meter_provider - } - pub(crate) fn runtime(&self) -> &runtime::Handle { &self.context.runtime } diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 05b61fcd0..1cefffcce 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -3,7 +3,7 @@ //! This daemon will start web server, track new packages and build them use crate::{ - AsyncBuildQueue, Config, Context, Index, RustwideBuilder, cdn, + AsyncBuildQueue, Config, Context, Index, RustwideBuilder, metrics::service::OtelServiceMetrics, queue_rebuilds, utils::{queue_builder, report_error}, @@ -114,8 +114,6 @@ pub fn start_background_queue_rebuild(context: &Context) -> Result<(), Error> { pub fn start_background_service_metric_collector(context: &Context) -> Result<(), Error> { let runtime = context.runtime.clone(); - let pool = context.pool.clone(); - let config = context.config.clone(); let build_queue = context.async_build_queue.clone(); let service_metrics = Arc::new(OtelServiceMetrics::new(&context.meter_provider)); @@ -126,76 +124,11 @@ pub fn start_background_service_metric_collector(context: &Context) -> Result<() // for these service metrics. Duration::from_secs(30), move || { - let pool = pool.clone(); let build_queue = build_queue.clone(); - let config = config.clone(); let service_metrics = service_metrics.clone(); async move { trace!("collecting service metrics"); - let mut conn = pool.get_async().await?; - service_metrics - .gather(&mut conn, &build_queue, &config) - .await - } - }, - ); - Ok(()) -} - -pub fn start_background_cdn_invalidator(context: &Context) -> Result<(), Error> { - let config = context.config.clone(); - let pool = context.pool.clone(); - let runtime = context.runtime.clone(); - let cdn = context.cdn.clone(); - - let otel_metrics = context.cdn_metrics.clone(); - - if config.cloudfront_distribution_id_web.is_none() - && config.cloudfront_distribution_id_static.is_none() - { - info!("no cloudfront distribution IDs found, skipping background cdn invalidation"); - return Ok(()); - } - - if !config.cache_invalidatable_responses { - info!("full page cache disabled, skipping background cdn invalidation"); - return Ok(()); - } - - async_cron( - &runtime, - "cdn invalidator", - Duration::from_secs(60), - move || { - let pool = pool.clone(); - let config = config.clone(); - let cdn = cdn.clone(); - let otel_metrics = otel_metrics.clone(); - async move { - let mut conn = pool.get_async().await?; - if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { - cdn::cloudfront::handle_queued_invalidation_requests( - &config, - &cdn, - &otel_metrics, - &mut conn, - distribution_id, - ) - .await - .context("error handling queued invalidations for web CDN invalidation")?; - } - if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { - cdn::cloudfront::handle_queued_invalidation_requests( - &config, - &cdn, - &otel_metrics, - &mut conn, - distribution_id, - ) - .await - .context("error handling queued invalidations for static CDN invalidation")?; - } - Ok(()) + service_metrics.gather(&build_queue).await } }, ); @@ -229,7 +162,6 @@ pub fn start_daemon(context: Context, enable_registry_watcher: bool) -> Result<( .unwrap(); start_background_repository_stats_updater(&context)?; - start_background_cdn_invalidator(&context)?; start_background_queue_rebuild(&context)?; // when people run the daemon, we assume the daemon is the one single process where diff --git a/src/web/cache.rs b/src/web/cache.rs index 1f8cbb52d..6900419b5 100644 --- a/src/web/cache.rs +++ b/src/web/cache.rs @@ -8,22 +8,19 @@ use crate::{ }; use axum::{ Extension, - extract::{FromRequestParts, MatchedPath, Request as AxumHttpRequest}, + extract::{MatchedPath, Request as AxumHttpRequest}, middleware::Next, response::Response as AxumResponse, }; use axum_extra::headers::HeaderMapExt as _; use http::{ - HeaderMap, HeaderName, HeaderValue, StatusCode, + HeaderMap, HeaderValue, StatusCode, header::{CACHE_CONTROL, ETAG}, - request::Parts, }; use serde::Deserialize; -use std::{convert::Infallible, sync::Arc}; +use std::sync::Arc; use tracing::error; -pub const X_RLNG_SOURCE_CDN: HeaderName = HeaderName::from_static("x-rlng-source-cdn"); - #[derive(Debug, Clone, PartialEq)] pub struct ResponseCacheHeaders { pub cache_control: Option, @@ -86,24 +83,6 @@ pub static FOREVER_IN_FASTLY_CDN: ResponseCacheHeaders = ResponseCacheHeaders { needs_cdn_invalidation: true, }; -pub static FOREVER_IN_CLOUDFRONT_CDN: ResponseCacheHeaders = ResponseCacheHeaders { - // A missing `max-age` or `s-maxage` in the Cache-Control header will lead to - // CloudFront using the default TTL, while the browser not seeing any caching header. - // - // Default TTL is set here: - // https://github.com/rust-lang/simpleinfra/blob/becf4532a10a7a218aedb34d4648ecb73e61f5fd/terraform/docs-rs/cloudfront.tf#L24 - // - // This means we can have the CDN caching the documentation while just - // issuing a purge after a build. - // https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/Expiration.html#ExpirationDownloadDist - // - // There might be edge cases where browsers add caching based on arbitraty heuristics - // when `Cache-Control` is missing. - cache_control: None, - surrogate_control: None, - needs_cdn_invalidation: true, -}; - /// cache forever in browser & CDN. /// Only usable for content with unique filenames. /// @@ -117,28 +96,6 @@ pub static FOREVER_IN_CDN_AND_BROWSER: ResponseCacheHeaders = ResponseCacheHeade needs_cdn_invalidation: false, }; -#[derive(Debug, Copy, Clone, PartialEq)] -#[cfg_attr(test, derive(strum::EnumIter))] -pub enum TargetCdn { - Fastly, - CloudFront, -} - -impl FromRequestParts for TargetCdn -where - S: Send + Sync, -{ - type Rejection = Infallible; - - async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { - if parts.headers.contains_key(X_RLNG_SOURCE_CDN) { - Ok(TargetCdn::Fastly) - } else { - Ok(TargetCdn::CloudFront) - } - } -} - /// defines the wanted caching behaviour for a web response. #[derive(Debug, Clone)] #[cfg_attr(test, derive(strum::EnumIter))] @@ -175,7 +132,7 @@ pub enum CachePolicy { } impl CachePolicy { - pub fn render(&self, config: &Config, target_cdn: TargetCdn) -> ResponseCacheHeaders { + pub fn render(&self, config: &Config) -> ResponseCacheHeaders { match *self { CachePolicy::NoCaching => NO_CACHING.clone(), CachePolicy::NoStoreMustRevalidate => NO_STORE_MUST_REVALIDATE.clone(), @@ -183,17 +140,14 @@ impl CachePolicy { CachePolicy::ForeverInCdnAndBrowser => FOREVER_IN_CDN_AND_BROWSER.clone(), CachePolicy::ForeverInCdn => { if config.cache_invalidatable_responses { - match target_cdn { - TargetCdn::Fastly => FOREVER_IN_FASTLY_CDN.clone(), - TargetCdn::CloudFront => FOREVER_IN_CLOUDFRONT_CDN.clone(), - } + FOREVER_IN_FASTLY_CDN.clone() } else { NO_CACHING.clone() } } CachePolicy::ForeverInCdnAndStaleInBrowser => { // when caching invalidatable responses is disabled, this results in NO_CACHING - let mut forever_in_cdn = CachePolicy::ForeverInCdn.render(config, target_cdn); + let mut forever_in_cdn = CachePolicy::ForeverInCdn.render(config); if config.cache_invalidatable_responses && let Some(cache_control) = @@ -224,7 +178,6 @@ pub(crate) async fn cache_middleware( Path(param): Path, matched_route: Option, Extension(config): Extension>, - target_cdn: TargetCdn, req: AxumHttpRequest, next: Next, ) -> AxumResponse { @@ -255,20 +208,10 @@ pub(crate) async fn cache_middleware( .extensions() .get::() .unwrap_or(&CachePolicy::NoCaching); - let cache_headers = cache_policy.render(&config, target_cdn); + let cache_headers = cache_policy.render(&config); let resp_status = response.status(); let resp_headers = response.headers_mut(); - // early return for CloudFront, as it doesn't support the `Surrogate-Control` header, - // but also doesn't need surrogate keys. - // While that sounds nice, CloudFront invalidations with path prefixes are suuper slow, - // and have a concurrency limit. - if let TargetCdn::CloudFront = target_cdn { - debug_assert!(cache_headers.surrogate_control.is_none()); - cache_headers.set_on_response(resp_headers); - return response; - } - // simple implementation first: // This is for content we need to invalidate in the CDN level. // We don't care about content that is filename-hashed and can be cached @@ -372,7 +315,6 @@ mod tests { use anyhow::{Context as _, Result}; use axum::{Router, body::Body, http::Request, routing::get}; use axum_extra::headers::CacheControl; - use itertools::Itertools as _; use strum::IntoEnumIterator as _; use test_case::{test_case, test_matrix}; use tower::{ServiceBuilder, ServiceExt as _}; @@ -399,7 +341,6 @@ mod tests { FOREVER_IN_FASTLY_CDN.cache_control, NO_CACHING.cache_control ); - assert!(FOREVER_IN_CLOUDFRONT_CDN.cache_control.is_none()); } #[test_matrix( @@ -415,14 +356,14 @@ mod tests { .cache_control_stale_while_revalidate(stale_while_revalidate) .build()?; - for (policy, target_cdn) in CachePolicy::iter().cartesian_product(TargetCdn::iter()) { - let headers = policy.render(&config, target_cdn); + for policy in CachePolicy::iter() { + let headers = policy.render(&config); if let Some(cache_control) = headers.cache_control { validate_cache_control(&cache_control).with_context(|| { format!( - "couldn't validate Cache-Control header syntax for policy {:?}, CDN: {:?}", - policy, target_cdn, + "couldn't validate Cache-Control header syntax for policy {:?}", + policy ) })?; } @@ -430,9 +371,8 @@ mod tests { if let Some(surrogate_control) = headers.surrogate_control { validate_cache_control(&surrogate_control).with_context(|| { format!( - "couldn't validate Surrogate-Control header syntax for policy {:?}, CDN: {:?}", - policy, - target_cdn, + "couldn't validate Surrogate-Control header syntax for policy {:?}", + policy ) })?; } @@ -440,89 +380,6 @@ mod tests { Ok(()) } - #[test_case(CachePolicy::NoCaching, Some("max-age=0"))] - #[test_case( - CachePolicy::NoStoreMustRevalidate, - Some("no-cache, no-store, must-revalidate, max-age=0") - )] - #[test_case( - CachePolicy::ForeverInCdnAndBrowser, - Some("public, max-age=31104000, immutable") - )] - #[test_case(CachePolicy::ForeverInCdn, None)] - #[test_case( - CachePolicy::ForeverInCdnAndStaleInBrowser, - Some("stale-while-revalidate=86400") - )] - fn render(cache: CachePolicy, cache_control: Option<&str>) -> Result<()> { - let config = TestEnvironment::base_config().build()?; - let headers = cache.render(&config, TargetCdn::CloudFront); - - assert_eq!( - headers.cache_control, - cache_control.map(|s| HeaderValue::from_str(s).unwrap()) - ); - - assert!(headers.surrogate_control.is_none()); - - Ok(()) - } - - #[test] - fn render_stale_without_config() -> Result<()> { - let config = TestEnvironment::base_config() - .cache_control_stale_while_revalidate(None) - .build()?; - - let headers = - CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::CloudFront); - assert!(headers.cache_control.is_none()); - assert!(headers.surrogate_control.is_none()); - - Ok(()) - } - - #[test] - fn render_stale_with_config() -> Result<()> { - let config = TestEnvironment::base_config() - .cache_control_stale_while_revalidate(Some(666)) - .build()?; - - let headers = - CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::CloudFront); - assert_eq!(headers.cache_control.unwrap(), "stale-while-revalidate=666"); - assert!(headers.surrogate_control.is_none()); - - Ok(()) - } - - #[test] - fn render_forever_in_cdn_disabled() -> Result<()> { - let config = TestEnvironment::base_config() - .cache_invalidatable_responses(false) - .build()?; - - let headers = CachePolicy::ForeverInCdn.render(&config, TargetCdn::CloudFront); - assert_eq!(headers.cache_control.unwrap(), "max-age=0"); - assert!(headers.surrogate_control.is_none()); - - Ok(()) - } - - #[test] - fn render_forever_in_cdn_or_stale_disabled() -> Result<()> { - let config = TestEnvironment::base_config() - .cache_invalidatable_responses(false) - .build()?; - - let headers = - CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::CloudFront); - assert_eq!(headers.cache_control.unwrap(), "max-age=0"); - assert!(headers.surrogate_control.is_none()); - - Ok(()) - } - #[test_case(CachePolicy::NoCaching, Some("max-age=0"), None)] #[test_case( CachePolicy::NoStoreMustRevalidate, @@ -546,7 +403,7 @@ mod tests { surrogate_control: Option<&str>, ) -> Result<()> { let config = TestEnvironment::base_config().build()?; - let headers = cache.render(&config, TargetCdn::Fastly); + let headers = cache.render(&config); assert_eq!( headers.cache_control, @@ -567,7 +424,7 @@ mod tests { .cache_control_stale_while_revalidate(None) .build()?; - let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::Fastly); + let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config); assert_eq!(headers, FOREVER_IN_FASTLY_CDN); Ok(()) @@ -579,7 +436,7 @@ mod tests { .cache_control_stale_while_revalidate(Some(666)) .build()?; - let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::Fastly); + let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config); assert_eq!(headers.cache_control.unwrap(), "stale-while-revalidate=666"); assert_eq!( headers.surrogate_control, @@ -595,7 +452,7 @@ mod tests { .cache_invalidatable_responses(false) .build()?; - let headers = CachePolicy::ForeverInCdn.render(&config, TargetCdn::Fastly); + let headers = CachePolicy::ForeverInCdn.render(&config); assert_eq!(headers.cache_control.unwrap(), "max-age=0"); assert!(headers.surrogate_control.is_none()); @@ -608,20 +465,15 @@ mod tests { .cache_invalidatable_responses(false) .build()?; - let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config, TargetCdn::Fastly); + let headers = CachePolicy::ForeverInCdnAndStaleInBrowser.render(&config); assert_eq!(headers.cache_control.unwrap(), "max-age=0"); assert!(headers.surrogate_control.is_none()); Ok(()) } - #[test_case(TargetCdn::Fastly, &FOREVER_IN_FASTLY_CDN)] - #[test_case(TargetCdn::CloudFront, &FOREVER_IN_CLOUDFRONT_CDN)] #[tokio::test] - async fn test_middleware_reacts_to_fastly_header_in_crate_route( - expected_target_cdn: TargetCdn, - expected_headers: &ResponseCacheHeaders, - ) -> Result<()> { + async fn test_middleware_reacts_to_fastly_header_in_crate_route() -> Result<()> { let config = TestEnvironment::base_config() .cache_invalidatable_responses(true) .build()?; @@ -629,15 +481,7 @@ mod tests { let app = Router::new() .route( "/{name}", - get(move |target_cdn: TargetCdn| async move { - assert_eq!(target_cdn, expected_target_cdn); - - ( - // this cache policy leads to the same result in both CDNs - Extension(CachePolicy::ForeverInCdn), - "Hello, World!", - ) - }), + get(move || async move { (Extension(CachePolicy::ForeverInCdn), "Hello, World!") }), ) .layer( ServiceBuilder::new() @@ -645,11 +489,7 @@ mod tests { .layer(axum::middleware::from_fn(cache_middleware)), ); - let mut builder = Request::builder().uri("/krate"); - - if let TargetCdn::Fastly = expected_target_cdn { - builder = builder.header(X_RLNG_SOURCE_CDN, "some_value"); - } + let builder = Request::builder().uri("/krate"); let response = app .clone() @@ -661,27 +501,20 @@ mod tests { "{}", response.text().await.unwrap(), ); - assert_cache_headers_eq(&response, expected_headers); + assert_cache_headers_eq(&response, &FOREVER_IN_FASTLY_CDN); Ok(()) } - #[test_case(TargetCdn::Fastly)] - #[test_case(TargetCdn::CloudFront)] #[tokio::test] - async fn test_middleware_reacts_to_fastly_header_in_other_route( - expected_target_cdn: TargetCdn, - ) -> Result<()> { + async fn test_middleware_reacts_to_fastly_header_in_other_route() -> Result<()> { let config = TestEnvironment::base_config().build()?; let app = Router::new() .route( "/", - get(move |target_cdn: TargetCdn| async move { - assert_eq!(target_cdn, expected_target_cdn); - + get(move || async move { ( - // this cache policy leads to the same result in both CDNs Extension(CachePolicy::ForeverInCdnAndBrowser), "Hello, World!", ) @@ -693,11 +526,7 @@ mod tests { .layer(axum::middleware::from_fn(cache_middleware)), ); - let mut builder = Request::builder().uri("/"); - - if let TargetCdn::Fastly = expected_target_cdn { - builder = builder.header(X_RLNG_SOURCE_CDN, "some_value"); - } + let builder = Request::builder().uri("/"); let response = app .clone() diff --git a/src/web/releases.rs b/src/web/releases.rs index 109e0d7e1..735bf2492 100644 --- a/src/web/releases.rs +++ b/src/web/releases.rs @@ -5,7 +5,6 @@ use crate::build_queue::PRIORITY_CONTINUOUS; use crate::{ AsyncBuildQueue, Config, RegistryApi, build_queue::QueuedCrate, - cdn, db::types::version::Version, impl_axum_webpage, utils::report_error, @@ -32,7 +31,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use sqlx::Row; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashMap}, str, sync::Arc, }; @@ -713,7 +712,6 @@ struct BuildQueuePage { description: &'static str, queue: Vec, rebuild_queue: Vec, - active_cdn_deployments: Vec, in_progress_builds: Vec<(String, Version)>, expand_rebuild_queue: bool, } @@ -730,20 +728,6 @@ pub(crate) async fn build_queue_handler( mut conn: DbConnection, Query(params): Query, ) -> AxumResult { - let mut active_cdn_deployments: Vec<_> = - cdn::cloudfront::queued_or_active_crate_invalidations(&mut conn) - .await? - .into_iter() - .map(|i| i.krate) - .collect(); - - // deduplicate the list of crates while keeping their order - let mut set = HashSet::new(); - active_cdn_deployments.retain(|k| set.insert(k.clone())); - - // reverse the list, so the oldest comes first - active_cdn_deployments.reverse(); - let in_progress_builds: Vec<(String, Version)> = sqlx::query!( r#"SELECT crates.name, @@ -791,7 +775,6 @@ pub(crate) async fn build_queue_handler( description: "crate documentation scheduled to build & deploy", queue, rebuild_queue, - active_cdn_deployments, in_progress_builds, expand_rebuild_queue: params.expand.is_some(), }) @@ -813,6 +796,7 @@ mod tests { use mockito::Matcher; use reqwest::StatusCode; use serde_json::json; + use std::collections::HashSet; use test_case::test_case; #[test] @@ -1806,47 +1790,6 @@ mod tests { }) } - #[tokio::test(flavor = "multi_thread")] - async fn test_deployment_queue() -> Result<()> { - let env = TestEnvironment::with_config( - TestEnvironment::base_config() - .cloudfront_distribution_id_web(Some("distribution_id_web".into())) - .build()?, - ) - .await?; - - let web = env.web_app().await; - - let mut conn = env.async_db().async_conn().await; - cdn::queue_crate_invalidation( - &mut conn, - env.config(), - env.cdn_metrics(), - &"krate_2".parse().unwrap(), - ) - .await?; - - let content = kuchikiki::parse_html().one(web.get("/releases/queue").await?.text().await?); - assert!( - content - .select(".release > div > strong") - .expect("missing heading") - .any(|el| el.text_contents().contains("active CDN deployments")) - ); - - let items = content - .select(".queue-list > li") - .expect("missing list items") - .collect::>(); - - assert_eq!(items.len(), 1); - let a = items[0].as_node().select_first("a").expect("missing link"); - - assert!(a.text_contents().contains("krate_2")); - - Ok(()) - } - #[test] fn test_releases_queue() { async_wrapper(|env| async move { diff --git a/templates/releases/build_queue.html b/templates/releases/build_queue.html index 51f0f0123..1718d59ce 100644 --- a/templates/releases/build_queue.html +++ b/templates/releases/build_queue.html @@ -19,11 +19,6 @@
currently being built
-
- {%- if !active_cdn_deployments.is_empty() %} - active CDN deployments - {%- endif %} -
@@ -45,26 +40,6 @@
{%- endif %} -
- {%- if !active_cdn_deployments.is_empty() %} -
    - {% for krate in active_cdn_deployments -%} -
  1. - - {{ krate }} - -
  2. - {%- endfor %} -
-
-

- After the build finishes it may take up to 20 minutes for all documentation - pages to be up-to-date and available to everybody. -

-

Especially /latest/ URLs might be affected.

-
- {%- endif %} -