Skip to content

Commit 1b25cda

Browse files
authored
ref(api): Improve performance of the replication status endpoint (#484)
1 parent 7efdc4e commit 1b25cda

File tree

2 files changed

+45
-27
lines changed

2 files changed

+45
-27
lines changed

etl-api/src/routes/pipelines.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use actix_web::{
44
post,
55
web::{Data, Json, Path},
66
};
7-
use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, health, lag, state};
7+
use etl_postgres::replication::{
8+
TableLookupError, get_table_names_from_table_ids, health, lag, state,
9+
};
810
use etl_postgres::types::TableId;
911
use serde::{Deserialize, Serialize};
1012
use sqlx::PgPool;
@@ -939,7 +941,7 @@ pub async fn get_pipeline_replication_status(
939941

940942
txn.commit().await?;
941943

942-
// Connect to the source database to read replication state
944+
// Connect to the source database to read the necessary state
943945
let source_pool =
944946
connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?;
945947

@@ -953,11 +955,20 @@ pub async fn get_pipeline_replication_status(
953955
let mut lag_metrics = lag::get_pipeline_lag_metrics(&source_pool, pipeline_id as u64).await?;
954956
let apply_lag = lag_metrics.apply.map(Into::into);
955957

956-
// Convert database states to UI-friendly format and fetch table names
958+
// Collect all table IDs and fetch their names in a single batch query
959+
let table_ids: Vec<TableId> = state_rows
960+
.iter()
961+
.map(|row| TableId::new(row.table_id.0))
962+
.collect();
963+
let table_names = get_table_names_from_table_ids(&source_pool, &table_ids).await?;
964+
965+
// Convert database states to UI-friendly format
957966
let mut tables: Vec<TableReplicationStatus> = Vec::new();
958967
for row in state_rows {
959968
let table_id = TableId::new(row.table_id.0);
960-
let table_name = get_table_name_from_oid(&source_pool, table_id).await?;
969+
let table_name = table_names
970+
.get(&table_id)
971+
.ok_or(TableLookupError::TableNotFound(table_id))?;
961972

962973
// Extract the metadata row from the database
963974
let table_replication_state = row

etl-postgres/src/replication/db.rs

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::num::NonZeroI32;
23

34
use etl_config::shared::{IntoConnectOptions, PgConnectionConfig};
@@ -37,38 +38,44 @@ pub async fn connect_to_source_database(
3738
Ok(pool)
3839
}
3940

40-
/// Retrieves table name from table OID by querying system catalogs.
41+
/// Retrieves table names for multiple table ids in a single query.
4142
///
42-
/// Looks up the schema and table name for the given table OID using Postgres's
43-
/// pg_class and pg_namespace system tables.
44-
pub async fn get_table_name_from_oid(
43+
/// Looks up the schema and table names for all given table OIDs using Postgres's
44+
/// pg_class and pg_namespace system tables. Returns a HashMap mapping each TableId
45+
/// to its corresponding TableName.
46+
pub async fn get_table_names_from_table_ids(
4547
pool: &PgPool,
46-
table_id: TableId,
47-
) -> Result<TableName, TableLookupError> {
48-
let query = "
49-
select n.nspname as schema_name, c.relname as table_name
50-
from pg_class c
51-
join pg_namespace n on c.relnamespace = n.oid
52-
where c.oid = $1
48+
table_ids: &[TableId],
49+
) -> Result<HashMap<TableId, TableName>, TableLookupError> {
50+
if table_ids.is_empty() {
51+
return Ok(HashMap::new());
52+
}
53+
54+
let query = "select c.oid::int as oid, n.nspname as schema_name, c.relname as table_name
55+
from pg_class c
56+
join pg_namespace n on c.relnamespace = n.oid
57+
where c.oid = any($1::oid[])
5358
";
5459

55-
let row = sqlx::query(query)
56-
.bind(table_id.into_inner() as i64)
57-
.fetch_optional(pool)
58-
.await?;
60+
let oids: Vec<i32> = table_ids.iter().map(|id| id.into_inner() as i32).collect();
61+
let rows = sqlx::query(query).bind(&oids).fetch_all(pool).await?;
5962

60-
match row {
61-
Some(row) => {
62-
let schema_name: String = row.try_get("schema_name")?;
63-
let table_name: String = row.try_get("table_name")?;
63+
let mut result = HashMap::with_capacity(rows.len());
64+
for row in rows {
65+
let oid: i32 = row.try_get("oid")?;
66+
let schema_name: String = row.try_get("schema_name")?;
67+
let table_name: String = row.try_get("table_name")?;
6468

65-
Ok(TableName {
69+
result.insert(
70+
TableId::new(oid as u32),
71+
TableName {
6672
schema: schema_name,
6773
name: table_name,
68-
})
69-
}
70-
None => Err(TableLookupError::TableNotFound(table_id)),
74+
},
75+
);
7176
}
77+
78+
Ok(result)
7279
}
7380

7481
/// Extracts the PostgreSQL server version from a version string.

0 commit comments

Comments
 (0)