Skip to content

Commit eeef10c

Browse files
authored
feat(api): Add feature flag to control maximum pipelines (#486)
1 parent 1b25cda commit eeef10c

File tree

3 files changed

+41
-4
lines changed

3 files changed

+41
-4
lines changed

etl-api/src/feature_flags.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use actix_web::web::Data;
12
use tracing::info;
23

34
/// Initializes the ConfigCat client for feature flag evaluation in the API.
@@ -19,3 +20,23 @@ pub fn init_feature_flags(
1920
}
2021
}
2122
}
23+
24+
/// Returns the maximum number of pipelines allowed per tenant.
25+
///
26+
/// Checks the `maximumPipelinesPerTenant` feature flag and falls back to
27+
/// the default if the flag is not set or the client is unavailable.
28+
pub async fn get_max_pipelines_per_tenant(
29+
client: Option<&Data<configcat::Client>>,
30+
tenant_id: &str,
31+
default_value: i64,
32+
) -> i64 {
33+
match client {
34+
Some(client) => {
35+
let user = configcat::User::new(tenant_id);
36+
client
37+
.get_value("maximumPipelinesPerTenant", default_value, Some(user))
38+
.await
39+
}
40+
None => default_value,
41+
}
42+
}

etl-api/src/routes/destinations_pipelines.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::db::pipelines::{
2222
MAX_PIPELINES_PER_TENANT, PipelinesDbError, count_pipelines_for_tenant, read_pipeline,
2323
};
2424
use crate::db::sources::{SourcesDbError, source_exists};
25+
use crate::feature_flags::get_max_pipelines_per_tenant;
2526

2627
#[derive(Debug, Error)]
2728
enum DestinationPipelineError {
@@ -192,6 +193,7 @@ pub async fn create_destination_and_pipeline(
192193
pool: Data<PgPool>,
193194
destination_and_pipeline: Json<CreateDestinationPipelineRequest>,
194195
encryption_key: Data<EncryptionKey>,
196+
feature_flags_client: Option<Data<configcat::Client>>,
195197
) -> Result<impl Responder, DestinationPipelineError> {
196198
let tenant_id = extract_tenant_id(&req)?;
197199
let destination_and_pipeline = destination_and_pipeline.into_inner();
@@ -210,10 +212,16 @@ pub async fn create_destination_and_pipeline(
210212
));
211213
}
212214

215+
let max_pipelines = get_max_pipelines_per_tenant(
216+
feature_flags_client.as_ref(),
217+
tenant_id,
218+
MAX_PIPELINES_PER_TENANT,
219+
)
220+
.await;
213221
let pipeline_count = count_pipelines_for_tenant(txn.deref_mut(), tenant_id).await?;
214-
if pipeline_count >= MAX_PIPELINES_PER_TENANT {
222+
if pipeline_count >= max_pipelines {
215223
return Err(DestinationPipelineError::PipelineLimitReached {
216-
limit: MAX_PIPELINES_PER_TENANT,
224+
limit: max_pipelines,
217225
});
218226
}
219227

etl-api/src/routes/pipelines.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::db::images::ImagesDbError;
2222
use crate::db::pipelines::{MAX_PIPELINES_PER_TENANT, PipelinesDbError, read_pipeline_components};
2323
use crate::db::replicators::ReplicatorsDbError;
2424
use crate::db::sources::{SourcesDbError, source_exists};
25+
use crate::feature_flags::get_max_pipelines_per_tenant;
2526
use crate::k8s::core::{
2627
create_k8s_object_prefix, create_or_update_pipeline_resources_in_k8s,
2728
delete_pipeline_resources_in_k8s,
@@ -463,6 +464,7 @@ pub async fn create_pipeline(
463464
req: HttpRequest,
464465
pool: Data<PgPool>,
465466
pipeline: Json<CreatePipelineRequest>,
467+
feature_flags_client: Option<Data<configcat::Client>>,
466468
) -> Result<impl Responder, PipelineError> {
467469
let tenant_id = extract_tenant_id(&req)?;
468470
let pipeline = pipeline.into_inner();
@@ -476,11 +478,17 @@ pub async fn create_pipeline(
476478
return Err(PipelineError::DestinationNotFound(pipeline.destination_id));
477479
}
478480

481+
let max_pipelines = get_max_pipelines_per_tenant(
482+
feature_flags_client.as_ref(),
483+
tenant_id,
484+
MAX_PIPELINES_PER_TENANT,
485+
)
486+
.await;
479487
let pipeline_count =
480488
db::pipelines::count_pipelines_for_tenant(txn.deref_mut(), tenant_id).await?;
481-
if pipeline_count >= MAX_PIPELINES_PER_TENANT {
489+
if pipeline_count >= max_pipelines {
482490
return Err(PipelineError::PipelineLimitReached {
483-
limit: MAX_PIPELINES_PER_TENANT,
491+
limit: max_pipelines,
484492
});
485493
}
486494

0 commit comments

Comments
 (0)