Skip to content

Commit 7efdc4e

Browse files
authored
ref(limits): Limit pipelines to 1 (#477)
1 parent d2f81e0 commit 7efdc4e

File tree

3 files changed

+148
-149
lines changed

3 files changed

+148
-149
lines changed

etl-api/src/db/pipelines.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use std::ops::DerefMut;
1818
use thiserror::Error;
1919

2020
/// Maximum number of pipelines allowed per tenant.
21-
pub const MAX_PIPELINES_PER_TENANT: i64 = 3;
21+
///
22+
/// For now, we keep the maximum to 1, this way, we give us a simpler surface area for breaking changes
23+
/// to the `etl` schema in the source database since only one pipeline will use it.
24+
pub const MAX_PIPELINES_PER_TENANT: i64 = 1;
2225

2326
pub struct Pipeline {
2427
pub id: i64,

etl-api/tests/destinations_pipelines.rs

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use etl_api::routes::destinations_pipelines::{
33
CreateDestinationPipelineRequest, CreateDestinationPipelineResponse,
44
UpdateDestinationPipelineRequest,
55
};
6-
use etl_api::routes::pipelines::{CreatePipelineRequest, ReadPipelineResponse};
6+
use etl_api::routes::pipelines::ReadPipelineResponse;
77
use etl_telemetry::tracing::init_test_tracing;
88
use reqwest::StatusCode;
99

@@ -134,15 +134,18 @@ async fn iceberg_supabase_destination_and_pipeline_can_be_created() {
134134
}
135135

136136
#[tokio::test(flavor = "multi_thread")]
137-
async fn tenant_cannot_create_more_than_three_destinations_pipelines() {
137+
async fn tenant_cannot_create_more_than_max_destinations_pipelines() {
138+
use etl_api::db::pipelines::MAX_PIPELINES_PER_TENANT;
139+
138140
init_test_tracing();
139141
// Arrange
140142
let app = spawn_test_app().await;
141143
let tenant_id = &create_tenant(&app).await;
142144
let source_id = create_source(&app, tenant_id).await;
143145
create_default_image(&app).await;
144146

145-
for idx in 0..3 {
147+
// Create the maximum allowed pipelines
148+
for idx in 0..MAX_PIPELINES_PER_TENANT {
146149
let destination_pipeline = CreateDestinationPipelineRequest {
147150
destination_name: format!("BigQuery Destination {idx}"),
148151
destination_config: new_bigquery_destination_config(),
@@ -155,8 +158,9 @@ async fn tenant_cannot_create_more_than_three_destinations_pipelines() {
155158
assert!(response.status().is_success());
156159
}
157160

161+
// Attempt to create one more pipeline should fail
158162
let destination_pipeline = CreateDestinationPipelineRequest {
159-
destination_name: "BigQuery Destination 3".to_string(),
163+
destination_name: format!("BigQuery Destination {MAX_PIPELINES_PER_TENANT}"),
160164
destination_config: new_bigquery_destination_config(),
161165
source_id,
162166
pipeline_config: new_pipeline_config(),
@@ -531,43 +535,44 @@ async fn destination_and_pipeline_with_another_tenants_pipeline_cannot_be_update
531535
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
532536
}
533537

534-
#[tokio::test(flavor = "multi_thread")]
535-
async fn duplicate_destination_pipeline_with_same_source_cannot_be_created() {
536-
init_test_tracing();
537-
// Arrange
538-
let app = spawn_test_app().await;
539-
create_default_image(&app).await;
540-
let tenant_id = &create_tenant(&app).await;
541-
let source_id = create_source(&app, tenant_id).await;
542-
543-
// Create first destination and pipeline
544-
let destination_pipeline = CreateDestinationPipelineRequest {
545-
destination_name: new_name(),
546-
destination_config: new_bigquery_destination_config(),
547-
source_id,
548-
pipeline_config: new_pipeline_config(),
549-
};
550-
let response = app
551-
.create_destination_pipeline(tenant_id, &destination_pipeline)
552-
.await;
553-
assert!(response.status().is_success());
554-
let response: CreateDestinationPipelineResponse = response
555-
.json()
556-
.await
557-
.expect("failed to deserialize response");
558-
let first_destination_id = response.destination_id;
559-
560-
// Act - Try to create another pipeline with same source and the first destination
561-
let pipeline_request = CreatePipelineRequest {
562-
source_id,
563-
destination_id: first_destination_id,
564-
config: updated_pipeline_config(),
565-
};
566-
let response = app.create_pipeline(tenant_id, &pipeline_request).await;
567-
568-
// Assert
569-
assert_eq!(response.status(), StatusCode::CONFLICT);
570-
}
538+
// TODO: Re-enable these tests once MAX_PIPELINES_PER_TENANT is lifted from 1.
539+
// #[tokio::test(flavor = "multi_thread")]
540+
// async fn duplicate_destination_pipeline_with_same_source_cannot_be_created() {
541+
// init_test_tracing();
542+
// // Arrange
543+
// let app = spawn_test_app().await;
544+
// create_default_image(&app).await;
545+
// let tenant_id = &create_tenant(&app).await;
546+
// let source_id = create_source(&app, tenant_id).await;
547+
//
548+
// // Create first destination and pipeline
549+
// let destination_pipeline = CreateDestinationPipelineRequest {
550+
// destination_name: new_name(),
551+
// destination_config: new_bigquery_destination_config(),
552+
// source_id,
553+
// pipeline_config: new_pipeline_config(),
554+
// };
555+
// let response = app
556+
// .create_destination_pipeline(tenant_id, &destination_pipeline)
557+
// .await;
558+
// assert!(response.status().is_success());
559+
// let response: CreateDestinationPipelineResponse = response
560+
// .json()
561+
// .await
562+
// .expect("failed to deserialize response");
563+
// let first_destination_id = response.destination_id;
564+
//
565+
// // Act - Try to create another pipeline with same source and the first destination
566+
// let pipeline_request = CreatePipelineRequest {
567+
// source_id,
568+
// destination_id: first_destination_id,
569+
// config: updated_pipeline_config(),
570+
// };
571+
// let response = app.create_pipeline(tenant_id, &pipeline_request).await;
572+
//
573+
// // Assert
574+
// assert_eq!(response.status(), StatusCode::CONFLICT);
575+
// }
571576

572577
#[tokio::test(flavor = "multi_thread")]
573578
async fn destination_and_pipeline_can_be_deleted() {

etl-api/tests/pipelines.rs

Lines changed: 98 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -223,14 +223,17 @@ async fn pipeline_can_be_created() {
223223
}
224224

225225
#[tokio::test(flavor = "multi_thread")]
226-
async fn tenant_cannot_create_more_than_three_pipelines() {
226+
async fn tenant_cannot_create_more_than_max_pipelines() {
227+
use etl_api::db::pipelines::MAX_PIPELINES_PER_TENANT;
228+
227229
init_test_tracing();
228230
// Arrange
229231
let app = spawn_test_app().await;
230232
create_default_image(&app).await;
231233
let tenant_id = &create_tenant(&app).await;
232234

233-
for _ in 0..3 {
235+
// Create the maximum allowed pipelines
236+
for _ in 0..MAX_PIPELINES_PER_TENANT {
234237
let source_id = create_source(&app, tenant_id).await;
235238
let destination_id = create_destination(&app, tenant_id).await;
236239
let pipeline = CreatePipelineRequest {
@@ -242,6 +245,7 @@ async fn tenant_cannot_create_more_than_three_pipelines() {
242245
assert!(response.status().is_success());
243246
}
244247

248+
// Attempt to create one more pipeline should fail
245249
let source_id = create_source(&app, tenant_id).await;
246250
let destination_id = create_destination(&app, tenant_id).await;
247251
let pipeline = CreatePipelineRequest {
@@ -575,27 +579,17 @@ async fn all_pipelines_can_be_read() {
575579
let app = spawn_test_app().await;
576580
create_default_image(&app).await;
577581
let tenant_id = &create_tenant(&app).await;
578-
let source1_id = create_source(&app, tenant_id).await;
579-
let source2_id = create_source(&app, tenant_id).await;
580-
let destination1_id = create_destination(&app, tenant_id).await;
581-
let destination2_id = create_destination(&app, tenant_id).await;
582+
let source_id = create_source(&app, tenant_id).await;
583+
let destination_id = create_destination(&app, tenant_id).await;
582584

583-
let pipeline1_id = create_pipeline_with_config(
585+
let pipeline_id = create_pipeline_with_config(
584586
&app,
585587
tenant_id,
586-
source1_id,
587-
destination1_id,
588+
source_id,
589+
destination_id,
588590
new_pipeline_config(),
589591
)
590592
.await;
591-
let pipeline2_id = create_pipeline_with_config(
592-
&app,
593-
tenant_id,
594-
source2_id,
595-
destination2_id,
596-
updated_pipeline_config(),
597-
)
598-
.await;
599593

600594
// Act
601595
let response = app.read_all_pipelines(tenant_id).await;
@@ -606,98 +600,95 @@ async fn all_pipelines_can_be_read() {
606600
.json()
607601
.await
608602
.expect("failed to deserialize response");
609-
for pipeline in response.pipelines {
610-
if pipeline.id == pipeline1_id {
611-
assert_eq!(&pipeline.tenant_id, tenant_id);
612-
assert_eq!(pipeline.source_id, source1_id);
613-
assert_eq!(pipeline.destination_id, destination1_id);
614-
insta::assert_debug_snapshot!(pipeline.config);
615-
} else if pipeline.id == pipeline2_id {
616-
assert_eq!(&pipeline.tenant_id, tenant_id);
617-
assert_eq!(pipeline.source_id, source2_id);
618-
assert_eq!(pipeline.destination_id, destination2_id);
619-
insta::assert_debug_snapshot!(pipeline.config);
620-
}
621-
}
603+
assert_eq!(response.pipelines.len(), 1);
604+
let pipeline = &response.pipelines[0];
605+
assert_eq!(pipeline.id, pipeline_id);
606+
assert_eq!(&pipeline.tenant_id, tenant_id);
607+
assert_eq!(pipeline.source_id, source_id);
608+
assert_eq!(pipeline.destination_id, destination_id);
609+
insta::assert_debug_snapshot!(pipeline.config);
622610
}
623611

624-
#[tokio::test(flavor = "multi_thread")]
625-
async fn duplicate_pipeline_with_same_source_and_destination_cannot_be_created() {
626-
init_test_tracing();
627-
// Arrange
628-
let app = spawn_test_app().await;
629-
create_default_image(&app).await;
630-
let tenant_id = &create_tenant(&app).await;
631-
let source_id = create_source(&app, tenant_id).await;
632-
let destination_id = create_destination(&app, tenant_id).await;
633-
634-
// Create first pipeline
635-
let pipeline = CreatePipelineRequest {
636-
source_id,
637-
destination_id,
638-
config: new_pipeline_config(),
639-
};
640-
let response = app.create_pipeline(tenant_id, &pipeline).await;
641-
assert!(response.status().is_success());
642-
643-
// Act - Try to create duplicate pipeline with same source and destination
644-
let duplicate_pipeline = CreatePipelineRequest {
645-
source_id,
646-
destination_id,
647-
config: updated_pipeline_config(),
648-
};
649-
let response = app.create_pipeline(tenant_id, &duplicate_pipeline).await;
650-
651-
// Assert
652-
assert_eq!(response.status(), StatusCode::CONFLICT);
653-
}
654-
655-
#[tokio::test(flavor = "multi_thread")]
656-
async fn updating_pipeline_to_duplicate_source_destination_combination_fails() {
657-
init_test_tracing();
658-
// Arrange
659-
let app = spawn_test_app().await;
660-
create_default_image(&app).await;
661-
let tenant_id = &create_tenant(&app).await;
662-
let source1_id = create_source(&app, tenant_id).await;
663-
let source2_id = create_source(&app, tenant_id).await;
664-
let destination_id = create_destination(&app, tenant_id).await;
665-
666-
// Create first pipeline
667-
let pipeline1 = CreatePipelineRequest {
668-
source_id: source1_id,
669-
destination_id,
670-
config: new_pipeline_config(),
671-
};
672-
let response = app.create_pipeline(tenant_id, &pipeline1).await;
673-
assert!(response.status().is_success());
674-
675-
// Create second pipeline with different source
676-
let pipeline2 = CreatePipelineRequest {
677-
source_id: source2_id,
678-
destination_id,
679-
config: new_pipeline_config(),
680-
};
681-
let response = app.create_pipeline(tenant_id, &pipeline2).await;
682-
let response: CreatePipelineResponse = response
683-
.json()
684-
.await
685-
.expect("failed to deserialize response");
686-
let pipeline2_id = response.id;
687-
688-
// Act - Try to update second pipeline to have same source as first
689-
let updated_config = UpdatePipelineRequest {
690-
source_id: source1_id, // This would create a duplicate
691-
destination_id,
692-
config: updated_pipeline_config(),
693-
};
694-
let response = app
695-
.update_pipeline(tenant_id, pipeline2_id, &updated_config)
696-
.await;
697-
698-
// Assert
699-
assert_eq!(response.status(), StatusCode::CONFLICT);
700-
}
612+
// TODO: Re-enable these tests once MAX_PIPELINES_PER_TENANT is lifted from 1.
613+
// These tests require multiple pipelines per tenant to function correctly.
614+
//
615+
// #[tokio::test(flavor = "multi_thread")]
616+
// async fn duplicate_pipeline_with_same_source_and_destination_cannot_be_created() {
617+
// init_test_tracing();
618+
// // Arrange
619+
// let app = spawn_test_app().await;
620+
// create_default_image(&app).await;
621+
// let tenant_id = &create_tenant(&app).await;
622+
// let source_id = create_source(&app, tenant_id).await;
623+
// let destination_id = create_destination(&app, tenant_id).await;
624+
//
625+
// // Create first pipeline
626+
// let pipeline = CreatePipelineRequest {
627+
// source_id,
628+
// destination_id,
629+
// config: new_pipeline_config(),
630+
// };
631+
// let response = app.create_pipeline(tenant_id, &pipeline).await;
632+
// assert!(response.status().is_success());
633+
//
634+
// // Act - Try to create duplicate pipeline with same source and destination
635+
// let duplicate_pipeline = CreatePipelineRequest {
636+
// source_id,
637+
// destination_id,
638+
// config: updated_pipeline_config(),
639+
// };
640+
// let response = app.create_pipeline(tenant_id, &duplicate_pipeline).await;
641+
//
642+
// // Assert
643+
// assert_eq!(response.status(), StatusCode::CONFLICT);
644+
// }
645+
//
646+
// #[tokio::test(flavor = "multi_thread")]
647+
// async fn updating_pipeline_to_duplicate_source_destination_combination_fails() {
648+
// init_test_tracing();
649+
// // Arrange
650+
// let app = spawn_test_app().await;
651+
// create_default_image(&app).await;
652+
// let tenant_id = &create_tenant(&app).await;
653+
// let source1_id = create_source(&app, tenant_id).await;
654+
// let source2_id = create_source(&app, tenant_id).await;
655+
// let destination_id = create_destination(&app, tenant_id).await;
656+
//
657+
// // Create first pipeline
658+
// let pipeline1 = CreatePipelineRequest {
659+
// source_id: source1_id,
660+
// destination_id,
661+
// config: new_pipeline_config(),
662+
// };
663+
// let response = app.create_pipeline(tenant_id, &pipeline1).await;
664+
// assert!(response.status().is_success());
665+
//
666+
// // Create second pipeline with different source
667+
// let pipeline2 = CreatePipelineRequest {
668+
// source_id: source2_id,
669+
// destination_id,
670+
// config: new_pipeline_config(),
671+
// };
672+
// let response = app.create_pipeline(tenant_id, &pipeline2).await;
673+
// let response: CreatePipelineResponse = response
674+
// .json()
675+
// .await
676+
// .expect("failed to deserialize response");
677+
// let pipeline2_id = response.id;
678+
//
679+
// // Act - Try to update second pipeline to have same source as first
680+
// let updated_config = UpdatePipelineRequest {
681+
// source_id: source1_id, // This would create a duplicate
682+
// destination_id,
683+
// config: updated_pipeline_config(),
684+
// };
685+
// let response = app
686+
// .update_pipeline(tenant_id, pipeline2_id, &updated_config)
687+
// .await;
688+
//
689+
// // Assert
690+
// assert_eq!(response.status(), StatusCode::CONFLICT);
691+
// }
701692

702693
#[tokio::test(flavor = "multi_thread")]
703694
async fn pipeline_version_can_be_updated() {

0 commit comments

Comments
 (0)