Skip to content

Commit 364d90c

Browse files
committed
Improve
1 parent 5900822 commit 364d90c

File tree

2 files changed

+56
-52
lines changed

2 files changed

+56
-52
lines changed

etl/src/conversions/event.rs

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ use crate::{bail, etl_error};
2020
/// event trigger. Messages with this prefix contain JSON-encoded schema information.
2121
pub const DDL_MESSAGE_PREFIX: &str = "supabase_etl_ddl";
2222

23-
/// Represents a DDL schema change message emitted by Postgres event trigger.
23+
/// Represents a schema change message emitted by Postgres event trigger.
2424
///
2525
/// This message is emitted when ALTER TABLE commands are executed on tables
2626
/// that are part of a publication.
2727
#[derive(Debug, Clone, Deserialize)]
28-
pub struct DdlSchemaChangeMessage {
28+
pub struct SchemaChangeMessage {
2929
/// The DDL command that triggered this message (e.g., "ALTER TABLE").
3030
pub event: String,
3131
/// The schema name of the affected table.
@@ -35,13 +35,49 @@ pub struct DdlSchemaChangeMessage {
3535
/// The OID of the affected table.
3636
pub table_id: i64,
3737
/// The columns of the table after the schema change.
38-
pub columns: Vec<DdlColumnSchema>,
38+
pub columns: Vec<ColumnSchemaMessage>,
3939
}
4040

41-
/// Represents a column schema in a DDL schema change message.
41+
impl SchemaChangeMessage {
42+
43+
/// Converts a [`SchemaChangeMessage`] to a [`TableSchema`] with a specific snapshot ID.
44+
///
45+
/// This is used to update the stored table schema when a DDL change is detected.
46+
/// The snapshot_id should be the start_lsn of the DDL message.
47+
pub fn into_table_schema(
48+
self,
49+
snapshot_id: SnapshotId,
50+
) -> TableSchema {
51+
let table_name = TableName::new(self.schema_name, self.table_name);
52+
let column_schemas = self
53+
.columns
54+
.into_iter()
55+
.map(|column| {
56+
let typ = convert_type_oid_to_type(column.type_oid);
57+
ColumnSchema::new(
58+
column.name,
59+
typ,
60+
column.type_modifier,
61+
column.ordinal_position,
62+
column.primary_key_ordinal_position,
63+
column.nullable,
64+
)
65+
})
66+
.collect();
67+
68+
TableSchema::with_snapshot_id(
69+
TableId::new(self.table_id as u32),
70+
table_name,
71+
column_schemas,
72+
snapshot_id,
73+
)
74+
}
75+
}
76+
77+
/// Represents a column schema in a schema change message.
4278
#[allow(dead_code)]
4379
#[derive(Debug, Clone, Deserialize)]
44-
pub struct DdlColumnSchema {
80+
pub struct ColumnSchemaMessage {
4581
/// The name of the column.
4682
pub name: String,
4783
/// The OID of the column's data type.
@@ -316,45 +352,12 @@ pub fn convert_tuple_to_row<'a>(
316352
/// Parses a DDL schema change message from its JSON content.
317353
///
318354
/// Returns the parsed message if successful, or an error if the JSON is malformed.
319-
pub fn parse_ddl_schema_change_message(content: &str) -> EtlResult<DdlSchemaChangeMessage> {
355+
pub fn parse_schema_change_message(content: &str) -> EtlResult<SchemaChangeMessage> {
320356
serde_json::from_str(content).map_err(|e| {
321357
etl_error!(
322358
ErrorKind::ConversionError,
323-
"Failed to parse DDL schema change message",
324-
format!("Invalid JSON in DDL message: {}", e)
359+
"Failed to parse schema change message",
360+
format!("Invalid JSON in schema change message: {}", e)
325361
)
326362
})
327363
}
328-
329-
/// Converts a [`DdlSchemaChangeMessage`] to a [`TableSchema`] with a specific snapshot ID.
330-
///
331-
/// This is used to update the stored table schema when a DDL change is detected.
332-
/// The snapshot_id should be the start_lsn of the DDL message.
333-
pub fn ddl_message_to_table_schema(
334-
message: &DdlSchemaChangeMessage,
335-
snapshot_id: SnapshotId,
336-
) -> TableSchema {
337-
let table_name = TableName::new(message.schema_name.clone(), message.table_name.clone());
338-
let column_schemas = message
339-
.columns
340-
.iter()
341-
.map(|col| {
342-
let typ = convert_type_oid_to_type(col.type_oid);
343-
ColumnSchema::new(
344-
col.name.clone(),
345-
typ,
346-
col.type_modifier,
347-
col.ordinal_position,
348-
col.primary_key_ordinal_position,
349-
col.nullable,
350-
)
351-
})
352-
.collect();
353-
354-
TableSchema::with_snapshot_id(
355-
TableId::new(message.table_id as u32),
356-
table_name,
357-
column_schemas,
358-
snapshot_id,
359-
)
360-
}

etl/src/replication/apply.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::concurrency::shutdown::ShutdownRx;
33
use crate::concurrency::signal::SignalRx;
44
use crate::concurrency::stream::{TimeoutStream, TimeoutStreamResult};
55
use crate::conversions::event::{
6-
DDL_MESSAGE_PREFIX, ddl_message_to_table_schema, parse_ddl_schema_change_message,
6+
DDL_MESSAGE_PREFIX, parse_schema_change_message,
77
parse_event_from_begin_message, parse_event_from_commit_message,
88
parse_event_from_delete_message, parse_event_from_insert_message,
99
parse_event_from_truncate_message, parse_event_from_update_message,
@@ -1524,14 +1524,14 @@ where
15241524
}
15251525

15261526
let content = message.content()?;
1527-
let Ok(ddl_message) = parse_ddl_schema_change_message(content) else {
1527+
let Ok(schema_change_message) = parse_schema_change_message(content) else {
15281528
bail!(
15291529
ErrorKind::SourceConnectionFailed,
15301530
"PostgreSQL connection has been closed during the apply loop"
15311531
);
15321532
};
15331533

1534-
let table_id = TableId::new(ddl_message.table_id as u32);
1534+
let table_id = TableId::new(schema_change_message.table_id as u32);
15351535
// TODO: check if this check is required or we can leverage the idempotency of schema writing and
15361536
// we always unconditionally update the schema.
15371537
if let Some(remote_final_lsn) = state.remote_final_lsn {
@@ -1544,26 +1544,27 @@ where
15441544
}
15451545

15461546
info!(
1547-
table_id = ddl_message.table_id,
1548-
table_name = %ddl_message.table_name,
1549-
schema_name = %ddl_message.schema_name,
1550-
event = %ddl_message.event,
1551-
columns = ddl_message.columns.len(),
1547+
table_id = schema_change_message.table_id,
1548+
table_name = %schema_change_message.table_name,
1549+
schema_name = %schema_change_message.schema_name,
1550+
event = %schema_change_message.event,
1551+
columns = schema_change_message.columns.len(),
15521552
"received ddl schema change message"
15531553
);
15541554

15551555
// Build table schema from DDL message with start_lsn as the snapshot_id.
15561556
let snapshot_id: SnapshotId = u64::from(start_lsn) as i64;
1557-
let table_schema = ddl_message_to_table_schema(&ddl_message, snapshot_id);
1557+
let table_schema = schema_change_message.into_table_schema(snapshot_id);
15581558

1559-
// Store the new schema version.
1559+
// Store the new schema version in the store.
15601560
schema_store.store_table_schema(table_schema).await?;
15611561

15621562
// Update the current schema snapshot in the state.
15631563
state.update_schema_snapshot(snapshot_id);
15641564

1565+
let table_id: u32 = table_id.into();
15651566
info!(
1566-
table_id = ddl_message.table_id,
1567+
table_id = table_id,
15671568
snapshot_id = snapshot_id,
15681569
"stored new schema version from ddl message"
15691570
);

0 commit comments

Comments
 (0)