Skip to content

Commit a0337a9

Browse files
committed
Improve
1 parent 987be79 commit a0337a9

File tree

1 file changed

+5
-9
lines changed

1 file changed

+5
-9
lines changed

etl-postgres/src/replication/schema.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -277,24 +277,21 @@ pub async fn load_table_schemas_at_snapshot(
277277
pipeline_id: i64,
278278
snapshot_id: SnapshotId,
279279
) -> Result<Vec<TableSchema>, sqlx::Error> {
280-
// Use a window function to find the latest schema version for each table
281-
// at or before the requested snapshot
280+
// Use DISTINCT ON to efficiently find the latest schema version for each table.
281+
// PostgreSQL optimizes DISTINCT ON with ORDER BY using index scans when possible.
282282
let rows = sqlx::query(
283283
r#"
284284
with latest_schemas as (
285-
select
285+
select distinct on (ts.table_id)
286286
ts.id,
287287
ts.table_id,
288288
ts.schema_name,
289289
ts.table_name,
290-
ts.snapshot_id,
291-
row_number() over (
292-
partition by ts.table_id
293-
order by ts.snapshot_id desc
294-
) as rn
290+
ts.snapshot_id
295291
from etl.table_schemas ts
296292
where ts.pipeline_id = $1
297293
and ts.snapshot_id <= $2
294+
order by ts.table_id, ts.snapshot_id desc
298295
)
299296
select
300297
ls.table_id,
@@ -310,7 +307,6 @@ pub async fn load_table_schemas_at_snapshot(
310307
tc.primary_key_ordinal_position
311308
from latest_schemas ls
312309
inner join etl.table_columns tc on ls.id = tc.table_schema_id
313-
where ls.rn = 1
314310
order by ls.table_id, tc.column_order
315311
"#,
316312
)

0 commit comments

Comments
 (0)