Commit 5dc8ad7

Improve
1 parent a0337a9 commit 5dc8ad7

3 files changed: +249 -128 lines changed

etl-postgres/src/replication/schema.rs

Lines changed: 12 additions & 4 deletions
```diff
@@ -136,21 +136,23 @@ define_type_mappings! {
 
 /// Stores a table schema in the database with a specific snapshot ID.
 ///
-/// Inserts a new table schema version and column information in schema storage tables
-/// using a transaction to ensure atomicity. Unlike upsert, this creates a new version
-/// entry for schema versioning.
+/// Upserts table schema and replaces all column information in schema storage tables
+/// using a transaction to ensure atomicity. If a schema version already exists for
+/// the same (pipeline_id, table_id, snapshot_id), columns are deleted and re-inserted.
 pub async fn store_table_schema(
     pool: &PgPool,
     pipeline_id: i64,
     table_schema: &TableSchema,
 ) -> Result<(), sqlx::Error> {
     let mut tx = pool.begin().await?;
 
-    // Insert new table schema version
+    // Upsert table schema version
     let table_schema_id: i64 = sqlx::query(
         r#"
         insert into etl.table_schemas (pipeline_id, table_id, schema_name, table_name, snapshot_id)
         values ($1, $2, $3, $4, $5)
+        on conflict (pipeline_id, table_id, snapshot_id)
+        do update set schema_name = excluded.schema_name, table_name = excluded.table_name
         returning id
         "#,
     )
@@ -163,6 +165,12 @@ pub async fn store_table_schema(
     .await?
     .get(0);
 
+    // Delete existing columns for this table schema to handle schema changes
+    sqlx::query("delete from etl.table_columns where table_schema_id = $1")
+        .bind(table_schema_id)
+        .execute(&mut *tx)
+        .await?;
+
     // Insert all columns
     for column_schema in table_schema.column_schemas.iter() {
         sqlx::query(
```
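With this change, calling `store_table_schema` twice for the same `(pipeline_id, table_id, snapshot_id)` no longer fails on a duplicate key: the second call updates the schema row in place and rewrites its column rows inside the same transaction. Note that the `on conflict (pipeline_id, table_id, snapshot_id)` clause requires a matching unique constraint or index on `etl.table_schemas`. A minimal sketch of the resulting call pattern; `sample_table_schema` is a hypothetical helper standing in for however the caller builds a `TableSchema`:

```rust
use sqlx::PgPool;

async fn demo(pool: &PgPool) -> Result<(), sqlx::Error> {
    // Hypothetical helper: builds a TableSchema for table 42 at snapshot 7.
    let schema = sample_table_schema(42, 7);

    // First call inserts the schema row and its columns.
    store_table_schema(pool, 1, &schema).await?;

    // Second call for the same (pipeline_id, table_id, snapshot_id) now takes
    // the `on conflict ... do update` path and deletes/re-inserts the columns
    // instead of erroring, so re-storing a schema is effectively idempotent.
    store_table_schema(pool, 1, &schema).await?;

    Ok(())
}
```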

etl/src/store/both/postgres.rs

Lines changed: 42 additions & 6 deletions
```diff
@@ -19,6 +19,13 @@ use crate::{bail, etl_error};
 
 const NUM_POOL_CONNECTIONS: u32 = 1;
 
+/// Maximum number of schema snapshots to keep cached per table.
+///
+/// This limits memory usage by evicting older snapshots when new ones are added.
+/// In practice, during a single batch of events, it's highly unlikely to need
+/// more than 2 schema versions for any given table.
+const MAX_CACHED_SCHEMAS_PER_TABLE: usize = 2;
+
 /// Converts ETL table replication phases to Postgres database state format.
 ///
 /// This conversion transforms internal ETL replication states into the format
```
```diff
@@ -157,6 +164,38 @@ impl Inner {
         let count = self.phase_counts.entry(phase).or_default();
         *count += 1;
     }
+
+    /// Inserts a schema into the cache and evicts older snapshots if necessary.
+    ///
+    /// Maintains at most [`MAX_CACHED_SCHEMAS_PER_TABLE`] snapshots per table,
+    /// evicting the oldest snapshots when the limit is exceeded.
+    fn insert_schema_with_eviction(&mut self, table_schema: Arc<TableSchema>) {
+        let table_id = table_schema.id;
+        let snapshot_id = table_schema.snapshot_id;
+
+        // Insert the new schema
+        self.table_schemas.insert((table_id, snapshot_id), table_schema);
+
+        // Collect all snapshot_ids for this table
+        let mut snapshots_for_table: Vec<SnapshotId> = self
+            .table_schemas
+            .keys()
+            .filter(|(tid, _)| *tid == table_id)
+            .map(|(_, sid)| *sid)
+            .collect();
+
+        // If we exceed the limit, evict oldest snapshots
+        if snapshots_for_table.len() > MAX_CACHED_SCHEMAS_PER_TABLE {
+            // Sort ascending so oldest are first
+            snapshots_for_table.sort();
+
+            // Remove oldest entries until we're at the limit
+            let to_remove = snapshots_for_table.len() - MAX_CACHED_SCHEMAS_PER_TABLE;
+            for &old_snapshot_id in snapshots_for_table.iter().take(to_remove) {
+                self.table_schemas.remove(&(table_id, old_snapshot_id));
+            }
+        }
+    }
 }
 
 /// Postgres-backed storage for ETL pipeline state and schema information.
```
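The eviction policy is easy to exercise in isolation. Below is a self-contained sketch of the same logic over plain standard-library types; the `u32`/`u64`/`String` stand-ins replace the crate's `TableId`, `SnapshotId`, and `Arc<TableSchema>`:

```rust
use std::collections::HashMap;

const MAX_CACHED_SCHEMAS_PER_TABLE: usize = 2;

// Stand-ins for the crate's TableId / SnapshotId / Arc<TableSchema>.
type TableId = u32;
type SnapshotId = u64;

fn insert_with_eviction(
    cache: &mut HashMap<(TableId, SnapshotId), String>,
    table_id: TableId,
    snapshot_id: SnapshotId,
    schema: String,
) {
    cache.insert((table_id, snapshot_id), schema);

    // Gather every cached snapshot for this table.
    let mut snapshots: Vec<SnapshotId> = cache
        .keys()
        .filter(|(tid, _)| *tid == table_id)
        .map(|(_, sid)| *sid)
        .collect();

    // Drop the oldest snapshots until at most the limit remains.
    if snapshots.len() > MAX_CACHED_SCHEMAS_PER_TABLE {
        snapshots.sort();
        for &old in &snapshots[..snapshots.len() - MAX_CACHED_SCHEMAS_PER_TABLE] {
            cache.remove(&(table_id, old));
        }
    }
}

fn main() {
    let mut cache = HashMap::new();
    for sid in 1..=4 {
        insert_with_eviction(&mut cache, 42, sid, format!("schema v{sid}"));
    }
    // Only the two newest snapshots survive: (42, 3) and (42, 4).
    assert!(cache.contains_key(&(42, 3)) && cache.contains_key(&(42, 4)));
    assert_eq!(cache.len(), 2);
}
```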
```diff
@@ -573,9 +612,7 @@ impl SchemaStore for PostgresStore {
             let mut inner = self.inner.lock().await;
 
             let table_schema = Arc::new(table_schema);
-            inner
-                .table_schemas
-                .insert((*table_id, table_schema.snapshot_id), table_schema.clone());
+            inner.insert_schema_with_eviction(table_schema.clone());
 
             Some(table_schema)
         };
@@ -643,7 +680,6 @@ impl SchemaStore for PostgresStore {
 
         let pool = self.connect_to_source().await?;
 
-        let mut inner = self.inner.lock().await;
         schema::store_table_schema(&pool, self.pipeline_id as i64, &table_schema)
             .await
             .map_err(|err| {
@@ -654,9 +690,9 @@ impl SchemaStore for PostgresStore {
             )
         })?;
 
-        let key = (table_schema.id, table_schema.snapshot_id);
+        let mut inner = self.inner.lock().await;
         let table_schema = Arc::new(table_schema);
-        inner.table_schemas.insert(key, table_schema.clone());
+        inner.insert_schema_with_eviction(table_schema.clone());
 
         Ok(table_schema)
     }
```
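The last two hunks also reorder the locking: the in-memory `inner` mutex is now acquired only after `store_table_schema` has completed, so the cache lock is no longer held across the network round-trip to Postgres. A sketch of that pattern, assuming an async lock in the style of `tokio::sync::Mutex` (the `persist_to_database` helper is hypothetical):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

async fn write_through(cache: Arc<Mutex<Vec<String>>>, value: String) {
    // Do the slow, fallible I/O first, without holding the lock...
    persist_to_database(&value).await;

    // ...then take the lock only for the brief in-memory update.
    let mut guard = cache.lock().await;
    guard.push(value);
}

// Hypothetical stand-in for the real database write,
// e.g. schema::store_table_schema(...).
async fn persist_to_database(_value: &str) {}
```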
