Skip to content

Commit 987be79

Browse files
committed
Improve
1 parent 43b49df commit 987be79

File tree

12 files changed

+567
-154
lines changed

12 files changed

+567
-154
lines changed

etl-postgres/src/replication/schema.rs

Lines changed: 118 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use sqlx::{PgExecutor, PgPool, Row};
44
use std::collections::HashMap;
55
use tokio_postgres::types::Type as PgType;
66

7-
use crate::types::{ColumnSchema, TableId, TableName, TableSchema};
7+
use crate::types::{ColumnSchema, SnapshotId, TableId, TableName, TableSchema};
88

99
macro_rules! define_type_mappings {
1010
(
@@ -134,44 +134,35 @@ define_type_mappings! {
134134
DATE_RANGE => "DATE_RANGE"
135135
}
136136

137-
/// Stores a table schema in the database.
137+
/// Stores a table schema in the database with a specific snapshot ID.
138138
///
139-
/// Inserts or updates table schema and column information in schema storage tables
140-
/// using a transaction to ensure atomicity.
139+
/// Inserts a new table schema version and column information in schema storage tables
140+
/// using a transaction to ensure atomicity. Unlike upsert, this creates a new version
141+
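/// entry for schema versioning; inserting a version whose (pipeline_id, table_id, snapshot_id) already exists fails on the unique constraint.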
141142
pub async fn store_table_schema(
142143
pool: &PgPool,
143144
pipeline_id: i64,
144145
table_schema: &TableSchema,
145146
) -> Result<(), sqlx::Error> {
146147
let mut tx = pool.begin().await?;
147148

148-
// Insert or update table schema record
149+
// Insert new table schema version
149150
let table_schema_id: i64 = sqlx::query(
150151
r#"
151-
insert into etl.table_schemas (pipeline_id, table_id, schema_name, table_name)
152-
values ($1, $2, $3, $4)
153-
on conflict (pipeline_id, table_id)
154-
do update set
155-
schema_name = excluded.schema_name,
156-
table_name = excluded.table_name,
157-
updated_at = now()
152+
insert into etl.table_schemas (pipeline_id, table_id, schema_name, table_name, snapshot_id)
153+
values ($1, $2, $3, $4, $5)
158154
returning id
159155
"#,
160156
)
161157
.bind(pipeline_id)
162158
.bind(table_schema.id.into_inner() as i64)
163159
.bind(&table_schema.name.schema)
164160
.bind(&table_schema.name.name)
161+
.bind(table_schema.snapshot_id)
165162
.fetch_one(&mut *tx)
166163
.await?
167164
.get(0);
168165

169-
// Delete existing columns for this table schema to handle schema changes
170-
sqlx::query("delete from etl.table_columns where table_schema_id = $1")
171-
.bind(table_schema_id)
172-
.execute(&mut *tx)
173-
.await?;
174-
175166
// Insert all columns
176167
for column_schema in table_schema.column_schemas.iter() {
177168
sqlx::query(
@@ -199,20 +190,34 @@ pub async fn store_table_schema(
199190
Ok(())
200191
}
201192

202-
/// Loads all table schemas for a pipeline from the database.
193+
/// Loads all table schemas for a pipeline from the database at the latest snapshot.
203194
///
204195
/// Retrieves table schemas and columns from schema storage tables,
205-
/// reconstructing complete [`TableSchema`] objects.
196+
/// reconstructing complete [`TableSchema`] objects. This is equivalent to
197+
/// calling [`load_table_schemas_at_snapshot`] with `i64::MAX`.
206198
pub async fn load_table_schemas(
207199
pool: &PgPool,
208200
pipeline_id: i64,
209201
) -> Result<Vec<TableSchema>, sqlx::Error> {
202+
load_table_schemas_at_snapshot(pool, pipeline_id, i64::MAX).await
203+
}
204+
205+
/// Loads a single table schema with the largest snapshot_id <= the requested snapshot.
206+
///
207+
/// Returns `None` if no schema version exists for the table at or before the given snapshot, or if the matching version has no stored columns (columns are inner-joined).
208+
pub async fn load_table_schema_at_snapshot(
209+
pool: &PgPool,
210+
pipeline_id: i64,
211+
table_id: TableId,
212+
snapshot_id: SnapshotId,
213+
) -> Result<Option<TableSchema>, sqlx::Error> {
210214
let rows = sqlx::query(
211215
r#"
212216
select
213217
ts.table_id,
214218
ts.schema_name,
215219
ts.table_name,
220+
ts.snapshot_id,
216221
tc.column_name,
217222
tc.column_type,
218223
tc.type_modifier,
@@ -222,11 +227,95 @@ pub async fn load_table_schemas(
222227
tc.primary_key_ordinal_position
223228
from etl.table_schemas ts
224229
inner join etl.table_columns tc on ts.id = tc.table_schema_id
225-
where ts.pipeline_id = $1
226-
order by ts.table_id, tc.column_order
230+
where ts.id = (
231+
select id from etl.table_schemas
232+
where pipeline_id = $1 and table_id = $2 and snapshot_id <= $3
233+
order by snapshot_id desc
234+
limit 1
235+
)
236+
order by tc.column_order
237+
"#,
238+
)
239+
.bind(pipeline_id)
240+
.bind(SqlxTableId(table_id.into_inner()))
241+
.bind(snapshot_id)
242+
.fetch_all(pool)
243+
.await?;
244+
245+
if rows.is_empty() {
246+
return Ok(None);
247+
}
248+
249+
let first_row = &rows[0];
250+
let table_oid: SqlxTableId = first_row.get("table_id");
251+
let table_id = TableId::new(table_oid.0);
252+
let schema_name: String = first_row.get("schema_name");
253+
let table_name: String = first_row.get("table_name");
254+
let snapshot_id: SnapshotId = first_row.get("snapshot_id");
255+
256+
let mut table_schema = TableSchema::with_snapshot_id(
257+
table_id,
258+
TableName::new(schema_name, table_name),
259+
vec![],
260+
snapshot_id,
261+
);
262+
263+
for row in rows {
264+
table_schema.add_column_schema(parse_column_schema(&row));
265+
}
266+
267+
Ok(Some(table_schema))
268+
}
269+
270+
/// Loads all table schemas for a pipeline at a specific snapshot point.
271+
///
272+
/// For each table, retrieves the schema version with the largest snapshot_id
273+
/// that is <= the requested snapshot_id. Tables without any schema version
274+
/// at or before the snapshot are excluded from the result.
275+
pub async fn load_table_schemas_at_snapshot(
276+
pool: &PgPool,
277+
pipeline_id: i64,
278+
snapshot_id: SnapshotId,
279+
) -> Result<Vec<TableSchema>, sqlx::Error> {
280+
// Use a window function to find the latest schema version for each table
281+
// at or before the requested snapshot
282+
let rows = sqlx::query(
283+
r#"
284+
with latest_schemas as (
285+
select
286+
ts.id,
287+
ts.table_id,
288+
ts.schema_name,
289+
ts.table_name,
290+
ts.snapshot_id,
291+
row_number() over (
292+
partition by ts.table_id
293+
order by ts.snapshot_id desc
294+
) as rn
295+
from etl.table_schemas ts
296+
where ts.pipeline_id = $1
297+
and ts.snapshot_id <= $2
298+
)
299+
select
300+
ls.table_id,
301+
ls.schema_name,
302+
ls.table_name,
303+
ls.snapshot_id,
304+
tc.column_name,
305+
tc.column_type,
306+
tc.type_modifier,
307+
tc.nullable,
308+
tc.primary_key,
309+
tc.column_order,
310+
tc.primary_key_ordinal_position
311+
from latest_schemas ls
312+
inner join etl.table_columns tc on ls.id = tc.table_schema_id
313+
where ls.rn = 1
314+
order by ls.table_id, tc.column_order
227315
"#,
228316
)
229317
.bind(pipeline_id)
318+
.bind(snapshot_id)
230319
.fetch_all(pool)
231320
.await?;
232321

@@ -237,9 +326,15 @@ pub async fn load_table_schemas(
237326
let table_id = TableId::new(table_oid.0);
238327
let schema_name: String = row.get("schema_name");
239328
let table_name: String = row.get("table_name");
329+
let row_snapshot_id: SnapshotId = row.get("snapshot_id");
240330

241331
let entry = table_schemas.entry(table_id).or_insert_with(|| {
242-
TableSchema::new(table_id, TableName::new(schema_name, table_name), vec![])
332+
TableSchema::with_snapshot_id(
333+
table_id,
334+
TableName::new(schema_name, table_name),
335+
vec![],
336+
row_snapshot_id,
337+
)
243338
});
244339

245340
entry.add_column_schema(parse_column_schema(&row));

etl-postgres/src/types/schema.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@ pub enum SchemaError {
1717
/// An object identifier in Postgres.
1818
type Oid = u32;
1919

20+
/// Snapshot identifier for schema versioning.
21+
///
22+
/// The value represents the start_lsn of the DDL message that created this schema version.
23+
/// A value of 0 indicates the initial schema before any DDL changes.
24+
pub type SnapshotId = i64;
25+
26+
/// The initial snapshot ID used for the first schema version.
27+
pub const INITIAL_SNAPSHOT_ID: SnapshotId = 0;
28+
2029
/// A fully qualified Postgres table name consisting of a schema and table name.
2130
///
2231
/// This type represents a table identifier in Postgres, which requires both a schema name
@@ -189,23 +198,39 @@ impl ToSql for TableId {
189198
/// Represents the complete schema of a Postgres table.
190199
///
191200
/// This type contains all metadata about a table including its name, OID,
192-
/// and the schemas of all its columns.
201+
/// the schemas of all its columns, and a snapshot identifier for versioning.
193202
#[derive(Debug, Clone, Eq, PartialEq)]
194203
pub struct TableSchema {
195-
/// The Postgres OID of the table
204+
/// The Postgres OID of the table.
196205
pub id: TableId,
197-
/// The fully qualified name of the table
206+
/// The fully qualified name of the table.
198207
pub name: TableName,
199-
/// The schemas of all columns in the table
208+
/// The schemas of all columns in the table.
200209
pub column_schemas: Vec<ColumnSchema>,
210+
/// The snapshot identifier for this schema version.
211+
///
212+
/// Value 0 indicates the initial schema; other values are start_lsn positions of DDL changes.
213+
pub snapshot_id: SnapshotId,
201214
}
202215

203216
impl TableSchema {
217+
/// Creates a new [`TableSchema`] with the initial snapshot ID (0).
204218
pub fn new(id: TableId, name: TableName, column_schemas: Vec<ColumnSchema>) -> Self {
219+
Self::with_snapshot_id(id, name, column_schemas, INITIAL_SNAPSHOT_ID)
220+
}
221+
222+
/// Creates a new [`TableSchema`] with a specific snapshot ID.
223+
pub fn with_snapshot_id(
224+
id: TableId,
225+
name: TableName,
226+
column_schemas: Vec<ColumnSchema>,
227+
snapshot_id: SnapshotId,
228+
) -> Self {
205229
Self {
206230
id,
207231
name,
208232
column_schemas,
233+
snapshot_id,
209234
}
210235
}
211236

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- Add snapshot_id column to table_schemas for schema versioning.
2+
-- The snapshot_id value is the start_lsn of the DDL message that created this schema version.
3+
-- Initial schemas use snapshot_id=0.
4+
5+
ALTER TABLE etl.table_schemas
6+
ADD COLUMN IF NOT EXISTS snapshot_id BIGINT NOT NULL DEFAULT 0;
7+
8+
-- Change unique constraint from (pipeline_id, table_id) to (pipeline_id, table_id, snapshot_id)
9+
-- to allow multiple schema versions per table.
10+
ALTER TABLE etl.table_schemas
11+
DROP CONSTRAINT IF EXISTS table_schemas_pipeline_id_table_id_key;
12+
13+
ALTER TABLE etl.table_schemas
14+
ADD CONSTRAINT table_schemas_pipeline_id_table_id_snapshot_id_key
15+
UNIQUE (pipeline_id, table_id, snapshot_id);
16+
17+
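-- Index for efficient "find largest snapshot_id <= X" queries.
-- NOTE(review): the UNIQUE constraint above already creates a btree index on the same columns, which Postgres can scan backward for this lookup; confirm this extra index is needed.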
18+
CREATE INDEX IF NOT EXISTS idx_table_schemas_pipeline_table_snapshot_id
19+
ON etl.table_schemas (pipeline_id, table_id, snapshot_id DESC);

etl/src/conversions/event.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use core::str;
22
use std::collections::HashSet;
33

44
use etl_postgres::types::{
5-
ColumnSchema, ReplicatedTableSchema, TableId, TableName, TableSchema, convert_type_oid_to_type,
5+
ColumnSchema, ReplicatedTableSchema, SnapshotId, TableId, TableName, TableSchema,
6+
convert_type_oid_to_type,
67
};
78
use postgres_replication::protocol;
89
use serde::Deserialize;
@@ -325,11 +326,14 @@ pub fn parse_ddl_schema_change_message(content: &str) -> EtlResult<DdlSchemaChan
325326
})
326327
}
327328

328-
/// Converts a [`DdlSchemaChangeMessage`] to a [`TableSchema`].
329+
/// Converts a [`DdlSchemaChangeMessage`] to a [`TableSchema`] with a specific snapshot ID.
329330
///
330331
/// This is used to update the stored table schema when a DDL change is detected.
331-
#[allow(dead_code)]
332-
pub fn ddl_message_to_table_schema(message: &DdlSchemaChangeMessage) -> TableSchema {
332+
/// The snapshot_id should be the start_lsn of the DDL message.
333+
pub fn ddl_message_to_table_schema(
334+
message: &DdlSchemaChangeMessage,
335+
snapshot_id: SnapshotId,
336+
) -> TableSchema {
333337
let table_name = TableName::new(message.schema_name.clone(), message.table_name.clone());
334338
let column_schemas = message
335339
.columns
@@ -347,9 +351,10 @@ pub fn ddl_message_to_table_schema(message: &DdlSchemaChangeMessage) -> TableSch
347351
})
348352
.collect();
349353

350-
TableSchema::new(
354+
TableSchema::with_snapshot_id(
351355
TableId::new(message.table_id as u32),
352356
table_name,
353357
column_schemas,
358+
snapshot_id,
354359
)
355360
}

0 commit comments

Comments
 (0)