From ab66ec8f35c5ae8501d78b4d3b4687cafc72abeb Mon Sep 17 00:00:00 2001
From: Benjamin <5719034+bnjjj@users.noreply.github.com>
Date: Fri, 14 Nov 2025 21:16:52 +0100
Subject: [PATCH 1/5] feat(destinations): add redis destination

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>
---
 README.md                               |  12 ++
 etl-destinations/Cargo.toml             |  20 +-
 etl-destinations/src/lib.rs             |   2 +
 etl-destinations/src/redis/client.rs    | 199 +++++++++++++++++++
 etl-destinations/src/redis/json_cell.rs | 113 +++++++++++
 etl-destinations/src/redis/mod.rs       | 249 ++++++++++++++++++++++++
 etl-destinations/tests/redis.rs         | 166 ++++++++++++++++
 etl-destinations/tests/support/mod.rs   |   1 +
 etl-destinations/tests/support/redis.rs |  48 +++++
 etl-examples/Cargo.toml                 |   8 +-
 etl-examples/README.md                  |  78 +++++++-
 etl-examples/src/main.rs                | 229 ----------------------
 etl/src/replication/apply.rs            |   9 +-
 13 files changed, 884 insertions(+), 250 deletions(-)
 create mode 100644 etl-destinations/src/redis/client.rs
 create mode 100644 etl-destinations/src/redis/json_cell.rs
 create mode 100644 etl-destinations/src/redis/mod.rs
 create mode 100644 etl-destinations/tests/redis.rs
 create mode 100644 etl-destinations/tests/support/redis.rs
 delete mode 100644 etl-examples/src/main.rs

diff --git a/README.md b/README.md
index e598b2e34..d58c7fcee 100644
--- a/README.md
+++ b/README.md
@@ -126,6 +126,18 @@ etl = { git = "https://github.com/supabase/etl" }
 etl-destinations = { git = "https://github.com/supabase/etl", features = ["bigquery"] }
 ```
 
+- Redis
+
+To use the Redis destination, enable the `redis` feature of the `etl-destinations` crate:
+
+```toml
+[dependencies]
+etl = { git = "https://github.com/supabase/etl" }
+etl-destinations = { git = "https://github.com/supabase/etl", features = ["redis"] }
+```
+
+> Current limitation: Redis in cluster mode is not supported yet.
+
 ## License
 
 Apache‑2.0. See `LICENSE` for details.
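As a quick orientation for readers of this patch, here is a minimal sketch of wiring the new destination into a pipeline. It mirrors the `RedisConfig`/`RedisDestination` API introduced below and the pipeline setup used by the existing examples; the `PipelineConfig` values and the publication name are illustrative, not prescriptive.

```rust
use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig};
use etl::pipeline::Pipeline;
use etl::store::both::memory::MemoryStore;
use etl_destinations::redis::{RedisConfig, RedisDestination};

async fn run(pg_connection: PgConnectionConfig) -> Result<(), Box<dyn std::error::Error>> {
    // Shared store for table replication states and schemas; the destination
    // reads table schemas from it when building Redis keys.
    let store = MemoryStore::new();

    // Connect to a single (non-clustered) Redis instance; cluster mode is not supported yet.
    let redis = RedisDestination::new(
        RedisConfig {
            host: "127.0.0.1".into(),
            port: 6379,
            username: None,
            password: None,
            ttl: None, // or Some(seconds) to expire replicated keys
        },
        store.clone(),
    )
    .await?;

    let config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection,
        batch: BatchConfig {
            max_size: 1000,
            max_fill_ms: 5000,
        },
        table_error_retry_delay_ms: 10_000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
    };

    // The pipeline copies existing rows, then streams inserts/updates/deletes into Redis.
    let mut pipeline = Pipeline::new(config, store, redis);
    pipeline.start().await?;
    pipeline.wait().await?;

    Ok(())
}
```

The same destination also exposes `.filter(...)`, `.map_insert(...)`, and `.map_update(...)` builder methods (added later in this series) for dropping rows or scrubbing sensitive columns before they are written to Redis.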
diff --git a/etl-destinations/Cargo.toml b/etl-destinations/Cargo.toml index c962347e2..68c11fed2 100644 --- a/etl-destinations/Cargo.toml +++ b/etl-destinations/Cargo.toml @@ -8,10 +8,17 @@ repository.workspace = true homepage.workspace = true [features] +redis = [ + "dep:tracing", + "dep:tokio", + "dep:fred", + "dep:uuid", + "dep:serde", + "dep:serde_json", +] bigquery = [ "dep:gcp-bigquery-client", "dep:prost", - "dep:rustls", "dep:tracing", "dep:tokio", "dep:base64", @@ -20,7 +27,6 @@ iceberg = [ "dep:iceberg", "dep:iceberg-catalog-rest", "dep:arrow", - "dep:rustls", "dep:tracing", "dep:parquet", "dep:uuid", @@ -47,15 +53,17 @@ iceberg-catalog-rest = { workspace = true, optional = true } parquet = { workspace = true, optional = true, features = ["async", "arrow"] } prost = { workspace = true, optional = true } reqwest = { workspace = true, optional = true, features = ["json"] } -rustls = { workspace = true, optional = true, features = [ +rustls = { workspace = true, features = [ "aws-lc-rs", "logging", ] } serde = { workspace = true, optional = true, features = ["derive"] } -serde_json = { workspace = true, optional = true } -tokio = { workspace = true, optional = true, features = ["sync"] } +serde_json = {optional = true, workspace = true } +tokio = { workspace = true, optional = true, features = ["full", "sync"] } tracing = { workspace = true, optional = true, default-features = true } -uuid = { workspace = true, optional = true, features = ["v4"] } +uuid = { workspace = true, optional = true, features = ["v4", "serde"] } +fred = { version = "10.1.0", optional = true, features = ["i-redis-json"] } +futures.workspace = true [dev-dependencies] etl = { workspace = true, features = ["test-utils"] } diff --git a/etl-destinations/src/lib.rs b/etl-destinations/src/lib.rs index da4453412..7c149181c 100644 --- a/etl-destinations/src/lib.rs +++ b/etl-destinations/src/lib.rs @@ -8,3 +8,5 @@ pub mod bigquery; pub mod encryption; #[cfg(feature = "iceberg")] pub mod iceberg; +#[cfg(feature = "redis")] +pub mod redis; diff --git a/etl-destinations/src/redis/client.rs b/etl-destinations/src/redis/client.rs new file mode 100644 index 000000000..5b848e6b6 --- /dev/null +++ b/etl-destinations/src/redis/client.rs @@ -0,0 +1,199 @@ +use etl::types::{TableRow, TableSchema}; +use fred::clients::Pipeline; +use fred::prelude::{ + Client, ClientLike, EventInterface, FredResult, KeysInterface, Pool, ReconnectPolicy, Server, + ServerConfig, TcpConfig, +}; +use fred::types::config::UnresponsiveConfig; +use fred::types::{Builder, Expiration, Key}; +use futures::future::join_all; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; +use tracing::{debug, error}; + +use crate::redis::RedisConfig; +use crate::redis::json_cell::JsonCell; + +#[derive(Clone)] +pub(super) struct RedisClient { + client: Pool, + ttl: Option, +} + +impl RedisClient { + pub(super) async fn new(config: RedisConfig) -> FredResult { + let pooled_client = Builder::default_centralized() + .with_config(|redis_config| { + redis_config.password = config.password; + redis_config.username = config.username; + redis_config.server = ServerConfig::Centralized { + server: Server::new(config.host, config.port), + }; + }) + .with_connection_config(|config| { + config.internal_command_timeout = Duration::from_secs(5); + config.reconnect_on_auth_error = true; + config.tcp = TcpConfig { + #[cfg(target_os = "linux")] + user_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }; + 
config.unresponsive = UnresponsiveConfig { + max_timeout: Some(Duration::from_secs(10)), + interval: Duration::from_secs(3), + }; + }) + .with_performance_config(|config| { + config.default_command_timeout = Duration::from_secs(5); + }) + .set_policy(ReconnectPolicy::new_exponential(0, 1, 2000, 5)) + .build_pool(5)?; + + for client in pooled_client.clients() { + // spawn tasks that listen for connection close or reconnect events + let mut error_rx = client.error_rx(); + let mut reconnect_rx = client.reconnect_rx(); + let mut unresponsive_rx = client.unresponsive_rx(); + + tokio::spawn(async move { + loop { + match error_rx.recv().await { + Ok((error, Some(server))) => { + error!("Redis client ({server:?}) error: {error:?}",); + } + Ok((error, None)) => { + error!("Redis client error: {error:?}",); + } + Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + }); + + tokio::spawn(async move { + loop { + match unresponsive_rx.recv().await { + Ok(server) => { + error!("Redis client ({server:?}) unresponsive"); + } + Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + }); + + tokio::spawn(async move { + loop { + match reconnect_rx.recv().await { + Ok(server) => { + debug!("Redis client connected to {server:?}") + } + Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + }); + } + let client_handles = pooled_client.connect_pool(); + + debug!("Wait for connect"); + pooled_client.wait_for_connect().await?; + debug!("Connected"); + + tokio::spawn(async move { + let _results = join_all(client_handles).await; + }); + + Ok(Self { + client: pooled_client, + ttl: config.ttl.map(Expiration::EX), + }) + } + + // Doesn't work with redis cluster + pub(super) async fn delete_by_table_name(&self, table_name: &str) -> FredResult { + let pattern = format!("{}::::*", table_name); + let mut cursor = "0".to_string(); + let mut total_deleted = 0u64; + + loop { + let (next_cursor, keys): (String, Vec) = self + .client + .scan_page(cursor, pattern.clone(), Some(100), None) + .await?; + + if !keys.is_empty() { + let deleted: i64 = self.client.unlink(keys).await?; + total_deleted += deleted as u64; + } + + cursor = next_cursor; + if cursor == "0" { + break; + } + } + + Ok(total_deleted) + } + + pub(super) async fn delete(&self, key: RedisKey) -> FredResult<()> { + self.client.del::<(), _>(key).await + } + + pub(super) fn pipeline(&self) -> Pipeline { + self.client.next_connected().pipeline() + } + + pub(super) async fn set( + &self, + pipeline: &Pipeline, + key: RedisKey, + map: HashMap>, + ) -> Result<(), Box> { + // We don't use json to be compliant with older versions of redis + pipeline + .set::<(), _, _>( + key, + serde_json::to_string(&map)?, + self.ttl.clone(), + None, + false, + ) + .await?; + + Ok(()) + } +} + +pub(super) struct RedisKey(String); + +impl RedisKey { + pub(super) fn new(table_schema: &TableSchema, table_row: &TableRow) -> Self { + let table_name = &table_schema.name.name; + let primary_key = table_schema + .column_schemas + .iter() + .enumerate() + .filter_map(|(idx, col_schema)| { + if col_schema.primary { + let value = + serde_json::to_string(&JsonCell::Ref(table_row.values.get(idx)?)).ok()?; // FIXME: should be a parameter to avoid serializing twice + let col_name = &col_schema.name; + Some(format!("{col_name}:{value}")) + } else { + None + } + }) + .collect::>() + .join("::::"); + + Self(format!("{table_name}::::{primary_key}")) + } +} + +impl From for Key { + fn from(val: RedisKey) -> Self { + 
Key::from(val.0) + } +} diff --git a/etl-destinations/src/redis/json_cell.rs b/etl-destinations/src/redis/json_cell.rs new file mode 100644 index 000000000..6723c25c3 --- /dev/null +++ b/etl-destinations/src/redis/json_cell.rs @@ -0,0 +1,113 @@ +use etl::types::{ArrayCell, Cell, PgNumeric, Sign}; +use serde::ser::SerializeSeq; +use serde::{Serialize, Serializer}; + +pub(super) enum JsonCell<'a> { + Value(Cell), + Ref(&'a Cell), +} + +impl<'a> AsRef for JsonCell<'a> { + fn as_ref(&self) -> &Cell { + match &self { + JsonCell::Value(cell) => cell, + JsonCell::Ref(cell) => cell, + } + } +} + +impl<'a> Serialize for JsonCell<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self.as_ref() { + Cell::Null => serializer.serialize_none(), + Cell::Bool(b) => b.serialize(serializer), + Cell::String(string) => string.serialize(serializer), + Cell::I16(number) => number.serialize(serializer), + Cell::I32(number) => number.serialize(serializer), + Cell::U32(number) => number.serialize(serializer), + Cell::I64(number) => number.serialize(serializer), + Cell::F32(number) => number.serialize(serializer), + Cell::F64(number) => number.serialize(serializer), + Cell::Numeric(pg_numeric) => match pg_numeric { + etl::types::PgNumeric::NaN => serializer.serialize_none(), + etl::types::PgNumeric::PositiveInfinity => serializer.serialize_none(), + etl::types::PgNumeric::NegativeInfinity => serializer.serialize_none(), + etl::types::PgNumeric::Value { sign, .. } => match sign { + Sign::Positive => pg_numeric + .to_string() + .parse::() + .unwrap() //FIXME + // .map_err(|err| format!("cannot parse pg numeric to u64: {err}"))? + .serialize(serializer), + Sign::Negative => pg_numeric + .to_string() + .parse::() + .unwrap() //FIXME + // .map_err(|err| format!("cannot parse pg numeric to i64: {err}"))? + .serialize(serializer), + }, + }, + Cell::Date(naive_date) => naive_date.serialize(serializer), + Cell::Time(naive_time) => naive_time.serialize(serializer), + Cell::Timestamp(naive_date_time) => naive_date_time.serialize(serializer), + Cell::TimestampTz(date_time) => date_time.serialize(serializer), + Cell::Uuid(uuid) => uuid.serialize(serializer), + Cell::Json(value) => value.serialize(serializer), + Cell::Bytes(items) => items.serialize(serializer), + Cell::Array(array_cell) => match array_cell { + ArrayCell::Bool(items) => items.serialize(serializer), + ArrayCell::String(items) => items.serialize(serializer), + ArrayCell::I16(items) => items.serialize(serializer), + ArrayCell::I32(items) => items.serialize(serializer), + ArrayCell::U32(items) => items.serialize(serializer), + ArrayCell::I64(items) => items.serialize(serializer), + ArrayCell::F32(items) => items.serialize(serializer), + ArrayCell::F64(items) => items.serialize(serializer), + ArrayCell::Numeric(pg_numerics) => { + let mut seq = serializer.serialize_seq(Some(pg_numerics.len()))?; + for elt in pg_numerics { + match elt { + Some(PgNumeric::NaN) + | Some(PgNumeric::NegativeInfinity) + | Some(PgNumeric::PositiveInfinity) => { + seq.serialize_element::>(&None)? + } + Some(PgNumeric::Value { sign, .. }) => match sign { + Sign::Positive => { + seq.serialize_element( + &elt.as_ref().unwrap().to_string().parse::().unwrap(), // FIXME + // .map_err( + // |err| format!("cannot parse pg numeric to u64: {err}"), + // )? 
+ )?; + } + Sign::Negative => { + seq.serialize_element( + &elt.as_ref().unwrap().to_string().parse::().unwrap(), //FIXME + // .map_err( + // |err| format!("cannot parse pg numeric to i64: {err}"), + // )?, + )?; + } + }, + None => { + seq.serialize_element::>(&None)?; + } + } + } + seq.end() + } + ArrayCell::Date(naive_dates) => naive_dates.serialize(serializer), + ArrayCell::Time(naive_times) => naive_times.serialize(serializer), + ArrayCell::Timestamp(naive_date_times) => naive_date_times.serialize(serializer), + ArrayCell::TimestampTz(date_times) => date_times.serialize(serializer), + ArrayCell::Uuid(uuids) => uuids.serialize(serializer), + ArrayCell::Json(values) => values.serialize(serializer), + ArrayCell::Bytes(items) => items.serialize(serializer), + }, + } + } +} diff --git a/etl-destinations/src/redis/mod.rs b/etl-destinations/src/redis/mod.rs new file mode 100644 index 000000000..06819d3a8 --- /dev/null +++ b/etl-destinations/src/redis/mod.rs @@ -0,0 +1,249 @@ +mod client; +mod json_cell; + +use etl::destination::Destination; +use etl::error::{ErrorKind, EtlError, EtlResult}; +use etl::store::schema::SchemaStore; +use etl::store::state::StateStore; +use etl::types::{Cell, ColumnSchema, Event, TableId, TableRow, TableSchema}; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::debug; + +use crate::redis::client::{RedisClient, RedisKey}; +use crate::redis::json_cell::JsonCell; + +pub type MapFunction = Arc Cell + Send + Sync>; + +#[derive(Clone)] +pub struct RedisDestination { + redis_client: RedisClient, + store: S, + /// If you want to change data before inserting it in Redis + map_insert: Option, + /// If you want to change data before updating it in Redis + map_update: Option, +} + +#[derive(Clone, Debug)] +pub struct RedisConfig { + /// Host on which Redis is running (e.g., localhost or IP address) (default: 127.0.0.1) + pub host: String, + /// Port on which Redis is running (default: 6379) + pub port: u16, + /// Redis database user name + pub username: Option, + /// Redis database user password (optional if using trust authentication) + pub password: Option, + /// Redis TTL if you want to set a TTL on replicated data in Redis + pub ttl: Option, +} + +impl RedisDestination +where + S: StateStore + SchemaStore, +{ + /// Creates a new Redis destination. + pub async fn new(config: RedisConfig, store: S) -> EtlResult { + let redis_client = RedisClient::new(config).await.map_err(|err| { + EtlError::from(( + ErrorKind::DestinationConnectionFailed, + "cannot connect to redis", + err.to_string(), + )) + })?; + + Ok(Self { + redis_client, + store, + map_insert: None, + map_update: None, + }) + } + + /// If you want to change data before inserting it in Redis + pub fn map_insert(mut self, f: F) -> Self + where + F: Fn(Cell, &TableSchema, &ColumnSchema) -> Cell + Send + Sync + 'static, + { + self.map_insert = Some(Arc::new(f)); + self + } + + /// If you want to change data before updating it in Redis + pub fn map_update(mut self, f: F) -> Self + where + F: Fn(Cell, &TableSchema, &ColumnSchema) -> Cell + Send + Sync + 'static, + { + self.map_update = Some(Arc::new(f)); + self + } + + async fn _write_table_rows( + &self, + table_id: TableId, + table_rows: Vec, + is_update: bool, + ) -> EtlResult<()> { + let table_schema = self + .store + .get_table_schema(&table_id) + .await? 
+ .ok_or_else(|| { + EtlError::from((ErrorKind::MissingTableSchema, "cannot get table schema")) + })?; + + debug!("writing a batch of {} table rows:", table_rows.len()); + let pipeline = self.redis_client.pipeline(); + for table_row in table_rows { + let key = RedisKey::new(table_schema.as_ref(), &table_row); + let map: HashMap = table_row + .values + .into_iter() + .enumerate() + .filter_map(|(idx, cell)| { + let column_schema = table_schema.column_schemas.get(idx)?; + let map_func = if is_update { + &self.map_update + } else { + &self.map_insert + }; + let cell = match map_func { + Some(map_func) => map_func(cell, table_schema.as_ref(), column_schema), + None => cell, + }; + + Some((column_schema.name.clone(), JsonCell::Value(cell))) + }) + .collect(); + self.redis_client + .set(&pipeline, key, map) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot set in redis", + err.to_string(), + )) + })?; + } + + pipeline.all().await.map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot execute pipeline in redis", + err.to_string(), + )) + }) + } +} + +impl Destination for RedisDestination +where + S: StateStore + SchemaStore + Send + Sync, +{ + fn name() -> &'static str { + "redis" + } + + async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> { + let Some(current_table_name) = self.store.get_table_mapping(&table_id).await? else { + return Ok(()); + }; + + debug!("truncating table {}", current_table_name); + self.redis_client + .delete_by_table_name(¤t_table_name) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot delete key in Redis", + err.to_string(), + )) + })?; + + Ok(()) + } + + async fn write_table_rows( + &self, + table_id: TableId, + table_rows: Vec, + ) -> EtlResult<()> { + self._write_table_rows(table_id, table_rows, false).await + } + + async fn write_events(&self, events: Vec) -> EtlResult<()> { + debug!("writing a batch of {} events:", events.len()); + + for event in events { + debug!(?event); + match event { + Event::Insert(insert_event) => { + self.write_table_rows(insert_event.table_id, vec![insert_event.table_row]) + .await?; + } + Event::Update(update_event) => { + self._write_table_rows( + update_event.table_id, + vec![update_event.table_row], + true, + ) + .await?; + } + Event::Delete(delete_event) => { + let Some(table_schema) = self + .store + .get_table_schema(&delete_event.table_id) + .await + .ok() + .flatten() + else { + continue; + }; + let Some((_only_keys, old_table_row)) = delete_event.old_table_row else { + continue; + }; + self.redis_client + .delete(RedisKey::new(table_schema.as_ref(), &old_table_row)) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot del in redis", + err.to_string(), + )) + })?; + } + Event::Truncate(truncate_event) => { + for table_id in truncate_event.rel_ids { + let Some(table_schema) = self + .store + .get_table_schema(&TableId(table_id)) + .await + .ok() + .flatten() + else { + continue; + }; + + self.redis_client + .delete_by_table_name(&table_schema.name.name) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot del in redis", + err.to_string(), + )) + })?; + } + } + Event::Begin(_) | Event::Commit(_) | Event::Relation(_) | Event::Unsupported => {} + } + } + + Ok(()) + } +} diff --git a/etl-destinations/tests/redis.rs b/etl-destinations/tests/redis.rs new file mode 100644 index 000000000..a0ea5e2de --- /dev/null +++ b/etl-destinations/tests/redis.rs @@ -0,0 
+1,166 @@ +#![cfg(feature = "redis")] + +use etl::state::table::TableReplicationPhaseType; +use etl::test_utils::database::spawn_source_database; +use etl::test_utils::notify::NotifyingStore; +use etl::test_utils::pipeline::create_pipeline; +use etl::test_utils::test_destination_wrapper::TestDestinationWrapper; +use etl::test_utils::test_schema::{TableSelection, setup_test_database_schema}; +use etl::types::{EventType, PipelineId}; +use etl_destinations::encryption::install_crypto_provider; +use etl_destinations::redis::{RedisConfig, RedisDestination}; +use etl_telemetry::tracing::init_test_tracing; +use fred::prelude::KeysInterface; +use rand::random; +use serde_json::Value; + +mod support; + +#[tokio::test(flavor = "multi_thread")] +async fn table_insert_update_delete() { + init_test_tracing(); + install_crypto_provider(); + + let database = spawn_source_database().await; + let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await; + + let store = NotifyingStore::new(); + let pipeline_id: PipelineId = random(); + let redis_cfg = RedisConfig { + host: String::from("127.0.0.1"), + port: 6379, + username: None, + password: None, + ttl: None, + }; + let redis = RedisDestination::new(redis_cfg.clone(), store.clone()) + .await + .unwrap(); + let destination = TestDestinationWrapper::wrap(redis); + + // Start pipeline from scratch. + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + database_schema.publication_name(), + store.clone(), + destination.clone(), + ); + + // Register notifications for table copy completion. + let users_state_notify = store + .notify_on_table_state_type( + database_schema.users_schema().id, + TableReplicationPhaseType::SyncDone, + ) + .await; + + pipeline.start().await.unwrap(); + + users_state_notify.notified().await; + + // Wait for the first insert. + let event_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + // Insert a row. + database + .insert_values( + database_schema.users_schema().name.clone(), + &["name", "age"], + &[&"user_1", &1], + ) + .await + .unwrap(); + + event_notify.notified().await; + + let redis_client = crate::support::redis::create_redis_connection(redis_cfg) + .await + .unwrap(); + + // Test if it has been inserted in redis + let user_row: String = redis_client + .get(format!( + "{}::::id:1", + database_schema.users_schema().name.name + )) + .await + .unwrap(); + let user_value: Value = serde_json::from_str(&user_row).unwrap(); + assert_eq!( + serde_json::json!({ + "id": 1, + "name": "user_1", + "age": 1 + }), + user_value + ); + + // Wait for the update. + let event_notify = destination + .wait_for_events_count(vec![(EventType::Update, 1)]) + .await; + + // Update the row. + database + .update_values( + database_schema.users_schema().name.clone(), + &["name", "age"], + &[&"user_10", &10], + ) + .await + .unwrap(); + + event_notify.notified().await; + + // Test if it has been updated in redis + let user_row: String = redis_client + .get(format!( + "{}::::id:1", + database_schema.users_schema().name.name + )) + .await + .unwrap(); + let user_value: Value = serde_json::from_str(&user_row).unwrap(); + assert_eq!( + serde_json::json!({ + "id": 1, + "name": "user_10", + "age": 10 + }), + user_value + ); + + // Wait for the update. + let event_notify = destination + .wait_for_events_count(vec![(EventType::Delete, 1)]) + .await; + + // Update the row. 
+ database + .delete_values( + database_schema.users_schema().name.clone(), + &["name"], + &["'user_10'"], + "", + ) + .await + .unwrap(); + + event_notify.notified().await; + + pipeline.shutdown_and_wait().await.unwrap(); + + // Test if it has been removed in redis + assert!( + redis_client + .get::(format!( + "{}::::id:1", + database_schema.users_schema().name.name + )) + .await + .is_err() + ); +} diff --git a/etl-destinations/tests/support/mod.rs b/etl-destinations/tests/support/mod.rs index 99cac4bcf..77aab2824 100644 --- a/etl-destinations/tests/support/mod.rs +++ b/etl-destinations/tests/support/mod.rs @@ -1,3 +1,4 @@ pub mod bigquery; pub mod iceberg; pub mod lakekeeper; +pub mod redis; diff --git a/etl-destinations/tests/support/redis.rs b/etl-destinations/tests/support/redis.rs new file mode 100644 index 000000000..d5a0696be --- /dev/null +++ b/etl-destinations/tests/support/redis.rs @@ -0,0 +1,48 @@ +#![cfg(feature = "redis")] + +use etl_destinations::redis::RedisConfig; +use fred::prelude::{ + ClientLike, FredResult, Pool, ReconnectPolicy, Server, ServerConfig, TcpConfig, +}; +use fred::types::Builder; +use fred::types::config::UnresponsiveConfig; +use futures::future::join_all; +use std::time::Duration; + +pub async fn create_redis_connection(config: RedisConfig) -> FredResult { + let pooled_client = Builder::default_centralized() + .with_config(|redis_config| { + redis_config.password = config.password; + redis_config.username = config.username; + redis_config.server = ServerConfig::Centralized { + server: Server::new(config.host, config.port), + }; + }) + .with_connection_config(|config| { + config.internal_command_timeout = Duration::from_secs(5); + config.reconnect_on_auth_error = true; + config.tcp = TcpConfig { + #[cfg(target_os = "linux")] + user_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }; + config.unresponsive = UnresponsiveConfig { + max_timeout: Some(Duration::from_secs(10)), + interval: Duration::from_secs(3), + }; + }) + .with_performance_config(|config| { + config.default_command_timeout = Duration::from_secs(5); + }) + .set_policy(ReconnectPolicy::new_exponential(0, 1, 2000, 5)) + .build_pool(1)?; + + let client_handles = pooled_client.connect_pool(); + pooled_client.wait_for_connect().await?; + + tokio::spawn(async move { + let _results = join_all(client_handles).await; + }); + + Ok(pooled_client) +} diff --git a/etl-examples/Cargo.toml b/etl-examples/Cargo.toml index 447f71d49..af81e2ced 100644 --- a/etl-examples/Cargo.toml +++ b/etl-examples/Cargo.toml @@ -9,7 +9,7 @@ homepage.workspace = true [dependencies] etl = { workspace = true } -etl-destinations = { workspace = true, features = ["bigquery"] } +etl-destinations = { workspace = true, features = ["bigquery", "redis"] } clap = { workspace = true, default-features = true, features = [ "std", @@ -18,3 +18,9 @@ clap = { workspace = true, default-features = true, features = [ tokio = { workspace = true, features = ["macros", "signal"] } tracing = { workspace = true, default-features = true } tracing-subscriber = { workspace = true, default-features = true, features = ["env-filter"] } + +[[bin]] +name = "bigquery" + +[[bin]] +name = "redis" diff --git a/etl-examples/README.md b/etl-examples/README.md index a9c95a3eb..bdca4b4b2 100644 --- a/etl-examples/README.md +++ b/etl-examples/README.md @@ -5,8 +5,16 @@ This crate contains practical examples demonstrating how to use the ETL system f ## Available Examples - **BigQuery Integration**: Demonstrates replicating Postgres data to Google 
BigQuery
+- **Redis Integration**: Demonstrates replicating Postgres data to Redis (for example, to use Redis as a cache for auth tokens)
 
-## Quick Start
+## Prerequisites
+
+Before running the examples, you'll need to set up a Postgres database with logical replication enabled.
+
+
+## BigQuery
+
+### Quick Start
 
 To quickly try out `etl`, you can run the BigQuery example. First, create a publication in Postgres which includes the tables you want to replicate:
 
@@ -18,7 +26,7 @@ for table table1, table2;
 Then run the BigQuery example:
 
 ```bash
-cargo run -p etl-examples -- \
+cargo run --bin bigquery -p etl-examples -- \
     --db-host localhost \
     --db-port 5432 \
     --db-name postgres \
@@ -32,11 +40,7 @@ cargo run -p etl-examples -- \
 In the above example, `etl` connects to a Postgres database named `postgres` running on `localhost:5432` with a
 username `postgres` and password `password`.
 
-## Prerequisites
-
-Before running the examples, you'll need to set up a Postgres database with logical replication enabled.
-
-## BigQuery Setup
+### BigQuery Setup
 
 To run the BigQuery example, you'll need:
 
@@ -45,3 +49,63 @@ To run the BigQuery example, you'll need:
 3. A BigQuery dataset created in your project
 
 The example will automatically create tables in the specified dataset based on your Postgres schema.
+
+
+## Redis
+
+### Quick Start
+
+To quickly try out `etl`, you can run the Redis example. First, create a publication in Postgres which includes the tables you want to replicate:
+
+```sql
+create publication my_publication
+for table table1, table2;
+```
+
+Then run the Redis example:
+
+```bash
+cargo run --bin redis -p etl-examples -- \
+    --db-host localhost \
+    --db-port 5432 \
+    --db-name postgres \
+    --db-username postgres \
+    --db-password password \
+    --publication my_publication
+```
+
+CLI usage:
+
+```bash
+Usage: redis [OPTIONS] --db-name <DB_NAME> --db-username <DB_USERNAME> --publication <PUBLICATION>
+
+Options:
+      --db-host <DB_HOST>
+          Host on which Postgres is running (e.g., localhost or IP address) (default: 127.0.0.1) [default: 127.0.0.1]
+      --db-port <DB_PORT>
+          Port on which Postgres is running (default: 5432) [default: 5432]
+      --db-name <DB_NAME>
+          Postgres database name to connect to
+      --db-username <DB_USERNAME>
+          Postgres database user name (must have REPLICATION privileges)
+      --db-password <DB_PASSWORD>
+          Postgres database user password (optional if using trust authentication)
+      --redis-host <REDIS_HOST>
+          Host on which Redis is running (e.g., localhost or IP address) (default: 127.0.0.1) [default: 127.0.0.1]
+      --redis-port <REDIS_PORT>
+          Port on which Redis is running (default: 6379) [default: 6379]
+      --redis-username <REDIS_USERNAME>
+          Redis database user name
+      --redis-password <REDIS_PASSWORD>
+          Redis database user password (optional if using trust authentication)
+      --redis-ttl <REDIS_TTL>
+          Set a TTL (in seconds) for data replicated in Redis (optional)
+      --publication <PUBLICATION>
+          Postgres publication name (must be created beforehand with CREATE PUBLICATION)
+  -h, --help
+          Print help
+  -V, --version
+          Print version
+```
+
+In the above example, `etl` connects to a Postgres database named `postgres` running on `localhost:5432` with username `postgres` and password `password`, and replicates data to a local Redis instance.
diff --git a/etl-examples/src/main.rs b/etl-examples/src/main.rs
deleted file mode 100644
index 37995a044..000000000
--- a/etl-examples/src/main.rs
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
-
-BigQuery Example
-
-This example demonstrates how to use the pipeline to stream
-data from Postgres to BigQuery using change data capture (CDC).
-
-Prerequisites:
-1. Postgres server with logical replication enabled (wal_level = logical)
-2.
A publication created in Postgres (CREATE PUBLICATION my_publication FOR ALL TABLES;) -3. GCP service account with BigQuery Data Editor and Job User permissions -4. Service account key file downloaded from GCP console - -Usage: - cargo run --example bigquery --features bigquery -- \ - --db-host localhost \ - --db-port 5432 \ - --db-name mydb \ - --db-username postgres \ - --db-password mypassword \ - --bq-sa-key-file /path/to/service-account-key.json \ - --bq-project-id my-gcp-project \ - --bq-dataset-id my_dataset \ - --publication my_publication - -The pipeline will automatically: -- Create tables in BigQuery matching your Postgres schema -- Perform initial data sync for existing tables -- Stream real-time changes using logical replication -- Handle schema changes and DDL operations - -*/ - -use clap::{Args, Parser}; -use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}; -use etl::pipeline::Pipeline; -use etl::store::both::memory::MemoryStore; -use etl_destinations::bigquery::BigQueryDestination; -use etl_destinations::encryption::install_crypto_provider; -use std::error::Error; -use tokio::signal; -use tracing::{error, info}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -// Main application arguments combining database and BigQuery configurations -#[derive(Debug, Parser)] -#[command(name = "bigquery", version, about, arg_required_else_help = true)] -struct AppArgs { - // Postgres connection parameters - #[clap(flatten)] - db_args: DbArgs, - // BigQuery destination parameters - #[clap(flatten)] - bq_args: BqArgs, - /// Postgres publication name (must be created beforehand with CREATE PUBLICATION) - #[arg(long)] - publication: String, -} - -// Postgres database connection configuration -#[derive(Debug, Args)] -struct DbArgs { - /// Host on which Postgres is running (e.g., localhost or IP address) - #[arg(long)] - db_host: String, - /// Port on which Postgres is running (default: 5432) - #[arg(long)] - db_port: u16, - /// Postgres database name to connect to - #[arg(long)] - db_name: String, - /// Postgres database user name (must have REPLICATION privileges) - #[arg(long)] - db_username: String, - /// Postgres database user password (optional if using trust authentication) - #[arg(long)] - db_password: Option, -} - -// BigQuery destination configuration -#[derive(Debug, Args)] -struct BqArgs { - /// Path to GCP service account key JSON file (download from GCP Console > IAM & Admin > Service Accounts) - #[arg(long)] - bq_sa_key_file: String, - /// BigQuery project ID (found in GCP Console project selector) - #[arg(long)] - bq_project_id: String, - /// BigQuery dataset ID (must exist in the specified project) - #[arg(long)] - bq_dataset_id: String, - /// Maximum batch size for processing events (higher values = better throughput, more memory usage) - #[arg(long, default_value = "1000")] - max_batch_size: usize, - /// Maximum time to wait for a batch to fill in milliseconds (lower values = lower latency, less throughput) - #[arg(long, default_value = "5000")] - max_batch_fill_duration_ms: u64, - /// Maximum number of concurrent table sync workers (higher values = faster initial sync, more resource usage) - #[arg(long, default_value = "4")] - max_table_sync_workers: u16, -} - -// Entry point - handles error reporting and process exit -#[tokio::main] -async fn main() -> Result<(), Box> { - if let Err(e) = main_impl().await { - error!("{e}"); - std::process::exit(1); - } - - Ok(()) -} - -// Initialize structured logging with configurable 
log levels via RUST_LOG environment variable -fn init_tracing() { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "bigquery=info".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); -} - -// Set default log level if RUST_LOG environment variable is not set -fn set_log_level() { - if std::env::var("RUST_LOG").is_err() { - unsafe { - std::env::set_var("RUST_LOG", "info"); - } - } -} - -// Main implementation function containing all the pipeline setup and execution logic -async fn main_impl() -> Result<(), Box> { - // Set up logging and tracing - set_log_level(); - init_tracing(); - - // Install required crypto provider for authentication - install_crypto_provider(); - - // Parse command line arguments - let args = AppArgs::parse(); - - // Configure Postgres connection settings - // Note: TLS is disabled in this example - enable for production use - let pg_connection_config = PgConnectionConfig { - host: args.db_args.db_host, - port: args.db_args.db_port, - name: args.db_args.db_name, - username: args.db_args.db_username, - password: args.db_args.db_password.map(Into::into), - tls: TlsConfig { - trusted_root_certs: String::new(), - enabled: false, // Set to true and provide certs for production - }, - }; - - // Create in-memory store for tracking table replication states and table schemas - // In production, you might want to use a persistent store like PostgresStore - let store = MemoryStore::new(); - - // Create pipeline configuration with batching and retry settings - let pipeline_config = PipelineConfig { - id: 1, // Using a simple ID for the example - publication_name: args.publication, - pg_connection: pg_connection_config, - batch: BatchConfig { - max_size: args.bq_args.max_batch_size, - max_fill_ms: args.bq_args.max_batch_fill_duration_ms, - }, - table_error_retry_delay_ms: 10000, - table_error_retry_max_attempts: 5, - max_table_sync_workers: args.bq_args.max_table_sync_workers, - }; - - // Initialize BigQuery destination with service account authentication - // Tables will be automatically created to match Postgres schema - let bigquery_destination = BigQueryDestination::new_with_key_path( - args.bq_args.bq_project_id, - args.bq_args.bq_dataset_id, - &args.bq_args.bq_sa_key_file, - None, - 1, - store.clone(), - ) - .await?; - - // Create the pipeline instance with all components - let mut pipeline = Pipeline::new(pipeline_config, store, bigquery_destination); - - info!( - "Starting BigQuery CDC pipeline - connecting to Postgres and initializing replication..." - ); - - // Start the pipeline - this will: - // 1. Connect to Postgres - // 2. Initialize table states based on the publication - // 3. Start apply and table sync workers - // 4. Begin streaming replication data - pipeline.start().await?; - - info!("Pipeline started successfully! Data replication is now active. Press Ctrl+C to stop."); - - // Set up signal handler for graceful shutdown on Ctrl+C - let shutdown_signal = async { - signal::ctrl_c() - .await - .expect("Failed to install Ctrl+C handler"); - info!("Received Ctrl+C signal, initiating graceful shutdown..."); - }; - - // Wait for either the pipeline to complete naturally or receive a shutdown signal - // The pipeline will run indefinitely unless an error occurs or it's manually stopped - tokio::select! 
{ - result = pipeline.wait() => { - info!("Pipeline completed normally (this usually indicates an error condition)"); - result?; - } - _ = shutdown_signal => { - info!("Gracefully shutting down pipeline and cleaning up resources..."); - } - } - - info!("Pipeline stopped successfully. All resources cleaned up."); - - Ok(()) -} diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index a1f48e3da..1ddc87898 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -61,9 +61,10 @@ pub enum ApplyLoopResult { /// Action that should be taken during the apply loop. /// /// An action defines what to do after one iteration of the apply loop. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Default)] pub enum ApplyLoopAction { /// The apply loop can continue on the next element. + #[default] Continue, /// The apply loop should pause processing. Pause, @@ -129,12 +130,6 @@ impl ApplyLoopAction { } } -impl Default for ApplyLoopAction { - fn default() -> Self { - Self::Continue - } -} - /// Hook trait for customizing apply loop behavior. /// /// [`ApplyLoopHook`] allows external components to inject custom logic into From 4bf35a9e4d6f2ce29fa6a7b0162d554b48cb8fbd Mon Sep 17 00:00:00 2001 From: Benjamin <5719034+bnjjj@users.noreply.github.com> Date: Fri, 14 Nov 2025 21:26:34 +0100 Subject: [PATCH 2/5] add redis in docker-compose Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com> --- scripts/docker-compose.yaml | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/scripts/docker-compose.yaml b/scripts/docker-compose.yaml index bc4d07fc0..fa248baa9 100644 --- a/scripts/docker-compose.yaml +++ b/scripts/docker-compose.yaml @@ -8,6 +8,10 @@ x-lakekeeper-env: &lakekeeper-env name: etl-pg-${POSTGRES_VERSION:-17} services: + redis: + image: redis:latest + ports: + - 6379:6379 # Start Postgres postgres: image: postgres:${POSTGRES_VERSION:-17} @@ -33,19 +37,23 @@ services: retries: 5 lakekeeper-postgres: - image: postgres:17 - environment: - POSTGRES_USER: ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} - POSTGRES_PASSWORD: ${LAKEKEEPER_POSTGRES_PASSWORD:-lakekeeper-postgres} - POSTGRES_DB: ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog} - volumes: - - ${LAKEKEEPER_POSTGRES_DATA_VOLUME:-lakekeeper_postgres_data}:/var/lib/postgresql/data - restart: unless-stopped - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} -d ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog}"] - interval: 5s - timeout: 5s - retries: 5 + image: postgres:17 + environment: + POSTGRES_USER: ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} + POSTGRES_PASSWORD: ${LAKEKEEPER_POSTGRES_PASSWORD:-lakekeeper-postgres} + POSTGRES_DB: ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog} + volumes: + - ${LAKEKEEPER_POSTGRES_DATA_VOLUME:-lakekeeper_postgres_data}:/var/lib/postgresql/data + restart: unless-stopped + healthcheck: + test: + [ + "CMD-SHELL", + "pg_isready -U ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} -d ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog}", + ] + interval: 5s + timeout: 5s + retries: 5 # Start MinIO for S3-compatible object storage minio: From edca1ff6feb701aaf3d18ddc61cae7261db7a8fb Mon Sep 17 00:00:00 2001 From: Benjamin <5719034+bnjjj@users.noreply.github.com> Date: Fri, 14 Nov 2025 21:36:25 +0100 Subject: [PATCH 3/5] add filter Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com> --- etl-destinations/src/redis/mod.rs | 19 +++++++++++++++++++ 
etl-destinations/tests/support/redis.rs |  1 +
 2 files changed, 20 insertions(+)

diff --git a/etl-destinations/src/redis/mod.rs b/etl-destinations/src/redis/mod.rs
index 06819d3a8..0c9b0d0ca 100644
--- a/etl-destinations/src/redis/mod.rs
+++ b/etl-destinations/src/redis/mod.rs
@@ -14,12 +14,15 @@ use crate::redis::client::{RedisClient, RedisKey};
 use crate::redis::json_cell::JsonCell;
 
 pub type MapFunction = Arc<dyn Fn(Cell, &TableSchema, &ColumnSchema) -> Cell + Send + Sync>;
+pub type FilterFunction = Arc<dyn Fn(&TableRow, &TableSchema) -> bool + Send + Sync>;
 
 #[derive(Clone)]
 pub struct RedisDestination<S> {
     redis_client: RedisClient,
     store: S,
+    /// If you want to filter which rows are written to Redis
+    filter: Option<FilterFunction>,
     /// If you want to change data before inserting it in Redis
     map_insert: Option<MapFunction>,
     /// If you want to change data before updating it in Redis
     map_update: Option<MapFunction>,
@@ -56,11 +59,21 @@ where
         Ok(Self {
             redis_client,
             store,
+            filter: None,
             map_insert: None,
             map_update: None,
         })
     }
 
+    /// Set a filter deciding which table rows are written to Redis
+    pub fn filter<F>(mut self, f: F) -> Self
+    where
+        F: Fn(&TableRow, &TableSchema) -> bool + Send + Sync + 'static,
+    {
+        self.filter = Some(Arc::new(f));
+        self
+    }
+
     /// If you want to change data before inserting it in Redis
     pub fn map_insert<F>(mut self, f: F) -> Self
     where
         F: Fn(Cell, &TableSchema, &ColumnSchema) -> Cell + Send + Sync + 'static,
@@ -96,6 +109,12 @@ where
         debug!("writing a batch of {} table rows:", table_rows.len());
         let pipeline = self.redis_client.pipeline();
         for table_row in table_rows {
+            if let Some(filter) = &self.filter
+                && !filter(&table_row, &table_schema)
+            {
+                debug!("skipping table row");
+                continue;
+            }
             let key = RedisKey::new(table_schema.as_ref(), &table_row);
             let map: HashMap<String, JsonCell> = table_row
                 .values
diff --git a/etl-destinations/tests/support/redis.rs b/etl-destinations/tests/support/redis.rs
index d5a0696be..9ce866eab 100644
--- a/etl-destinations/tests/support/redis.rs
+++ b/etl-destinations/tests/support/redis.rs
@@ -9,6 +9,7 @@ use fred::types::config::UnresponsiveConfig;
 use futures::future::join_all;
 use std::time::Duration;
 
+#[allow(dead_code)]
 pub async fn create_redis_connection(config: RedisConfig) -> FredResult<Pool> {
     let pooled_client = Builder::default_centralized()
         .with_config(|redis_config| {

From e77ed08437ac0f22240e068bcb04d286b723efdb Mon Sep 17 00:00:00 2001
From: Benjamin <5719034+bnjjj@users.noreply.github.com>
Date: Fri, 14 Nov 2025 21:41:24 +0100
Subject: [PATCH 4/5] better error handling

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>
---
 etl-destinations/src/redis/json_cell.rs | 36 ++++++++++++++++---------
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/etl-destinations/src/redis/json_cell.rs b/etl-destinations/src/redis/json_cell.rs
index 6723c25c3..10681915a 100644
--- a/etl-destinations/src/redis/json_cell.rs
+++ b/etl-destinations/src/redis/json_cell.rs
@@ -39,14 +39,20 @@ impl<'a> Serialize for JsonCell<'a> {
                     Sign::Positive => pg_numeric
                         .to_string()
                         .parse::<u64>()
-                        .unwrap() //FIXME
-                        // .map_err(|err| format!("cannot parse pg numeric to u64: {err}"))?
+                        .map_err(|err| {
+                            serde::ser::Error::custom(format!(
+                                "cannot parse pg numeric to u64: {err}"
+                            ))
+                        })?
                         .serialize(serializer),
                     Sign::Negative => pg_numeric
                         .to_string()
                         .parse::<i64>()
-                        .unwrap() //FIXME
-                        // .map_err(|err| format!("cannot parse pg numeric to i64: {err}"))?
+                        .map_err(|err| {
+                            serde::ser::Error::custom(format!(
+                                "cannot parse pg numeric to i64: {err}"
+                            ))
+                        })?
                         .serialize(serializer),
                 },
             },
@@ -78,18 +84,24 @@ impl<'a> Serialize for JsonCell<'a> {
                            Some(PgNumeric::Value { sign, ..
}) => match sign { Sign::Positive => { seq.serialize_element( - &elt.as_ref().unwrap().to_string().parse::().unwrap(), // FIXME - // .map_err( - // |err| format!("cannot parse pg numeric to u64: {err}"), - // )? + &elt.as_ref().unwrap().to_string().parse::().map_err( + |err| { + serde::ser::Error::custom(format!( + "cannot parse pg numeric to u64: {err}" + )) + }, + )?, )?; } Sign::Negative => { seq.serialize_element( - &elt.as_ref().unwrap().to_string().parse::().unwrap(), //FIXME - // .map_err( - // |err| format!("cannot parse pg numeric to i64: {err}"), - // )?, + &elt.as_ref().unwrap().to_string().parse::().map_err( + |err| { + serde::ser::Error::custom(format!( + "cannot parse pg numeric to i64: {err}" + )) + }, + )?, )?; } }, From 98e0d2aa7ee8332a5342af1f07c4d745f2d6f3e3 Mon Sep 17 00:00:00 2001 From: Benjamin <5719034+bnjjj@users.noreply.github.com> Date: Sun, 16 Nov 2025 21:36:20 +0100 Subject: [PATCH 5/5] add bin files from example Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com> --- .gitignore | 3 + etl-examples/src/bin/bigquery.rs | 229 +++++++++++++++++++++++++++++++ etl-examples/src/bin/redis.rs | 208 ++++++++++++++++++++++++++++ 3 files changed, 440 insertions(+) create mode 100644 etl-examples/src/bin/bigquery.rs create mode 100644 etl-examples/src/bin/redis.rs diff --git a/.gitignore b/.gitignore index 22d1f250c..94d382168 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,6 @@ pyvenv.cfg # Log files *.log + +# Bin examples +!etl-examples/src/bin diff --git a/etl-examples/src/bin/bigquery.rs b/etl-examples/src/bin/bigquery.rs new file mode 100644 index 000000000..f6ab433e1 --- /dev/null +++ b/etl-examples/src/bin/bigquery.rs @@ -0,0 +1,229 @@ +/*! + +BigQuery Example + +This example demonstrates how to use the pipeline to stream +data from Postgres to BigQuery using change data capture (CDC). + +Prerequisites: +1. Postgres server with logical replication enabled (wal_level = logical) +2. A publication created in Postgres (CREATE PUBLICATION my_publication FOR ALL TABLES;) +3. GCP service account with BigQuery Data Editor and Job User permissions +4. 
Service account key file downloaded from GCP console + +Usage: + cargo run --example bigquery --features bigquery -- \ + --db-host localhost \ + --db-port 5432 \ + --db-name mydb \ + --db-username postgres \ + --db-password mypassword \ + --bq-sa-key-file /path/to/service-account-key.json \ + --bq-project-id my-gcp-project \ + --bq-dataset-id my_dataset \ + --publication my_publication + +The pipeline will automatically: +- Create tables in BigQuery matching your Postgres schema +- Perform initial data sync for existing tables +- Stream real-time changes using logical replication +- Handle schema changes and DDL operations + +*/ + +use clap::{Args, Parser}; +use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}; +use etl::pipeline::Pipeline; +use etl::store::both::memory::MemoryStore; +use etl_destinations::bigquery::BigQueryDestination; +use etl_destinations::encryption::install_crypto_provider; +use std::error::Error; +use tokio::signal; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +// Main application arguments combining database and BigQuery configurations +#[derive(Debug, Parser)] +#[command(name = "bigquery", version, about, arg_required_else_help = true)] +struct AppArgs { + // Postgres connection parameters + #[clap(flatten)] + db_args: DbArgs, + // BigQuery destination parameters + #[clap(flatten)] + bq_args: BqArgs, + /// Postgres publication name (must be created beforehand with CREATE PUBLICATION) + #[arg(long)] + publication: String, +} + +// Postgres database connection configuration +#[derive(Debug, Args)] +struct DbArgs { + /// Host on which Postgres is running (e.g., localhost or IP address) + #[arg(long)] + db_host: String, + /// Port on which Postgres is running (default: 5432) + #[arg(long)] + db_port: u16, + /// Postgres database name to connect to + #[arg(long)] + db_name: String, + /// Postgres database user name (must have REPLICATION privileges) + #[arg(long)] + db_username: String, + /// Postgres database user password (optional if using trust authentication) + #[arg(long)] + db_password: Option, +} + +// BigQuery destination configuration +#[derive(Debug, Args)] +struct BqArgs { + /// Path to GCP service account key JSON file (download from GCP Console > IAM & Admin > Service Accounts) + #[arg(long)] + bq_sa_key_file: String, + /// BigQuery project ID (found in GCP Console project selector) + #[arg(long)] + bq_project_id: String, + /// BigQuery dataset ID (must exist in the specified project) + #[arg(long)] + bq_dataset_id: String, + /// Maximum batch size for processing events (higher values = better throughput, more memory usage) + #[arg(long, default_value = "1000")] + max_batch_size: usize, + /// Maximum time to wait for a batch to fill in milliseconds (lower values = lower latency, less throughput) + #[arg(long, default_value = "5000")] + max_batch_fill_duration_ms: u64, + /// Maximum number of concurrent table sync workers (higher values = faster initial sync, more resource usage) + #[arg(long, default_value = "4")] + max_table_sync_workers: u16, +} + +// Entry point - handles error reporting and process exit +#[tokio::main] +async fn main() -> Result<(), Box> { + if let Err(e) = main_impl().await { + error!("{e}"); + std::process::exit(1); + } + + Ok(()) +} + +// Initialize structured logging with configurable log levels via RUST_LOG environment variable +fn init_tracing() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + 
.unwrap_or_else(|_| "bigquery=info".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); +} + +// Set default log level if RUST_LOG environment variable is not set +fn set_log_level() { + if std::env::var("RUST_LOG").is_err() { + unsafe { + std::env::set_var("RUST_LOG", "info"); + } + } +} + +// Main implementation function containing all the pipeline setup and execution logic +async fn main_impl() -> Result<(), Box> { + // Set up logging and tracing + set_log_level(); + init_tracing(); + + // Install required crypto provider for authentication + install_crypto_provider(); + + // Parse command line arguments + let args = AppArgs::parse(); + + // Configure Postgres connection settings + // Note: TLS is disabled in this example - enable for production use + let pg_connection_config = PgConnectionConfig { + host: args.db_args.db_host, + port: args.db_args.db_port, + name: args.db_args.db_name, + username: args.db_args.db_username, + password: args.db_args.db_password.map(Into::into), + tls: TlsConfig { + trusted_root_certs: String::new(), + enabled: false, // Set to true and provide certs for production + }, + }; + + // Create in-memory store for tracking table replication states and table schemas + // In production, you might want to use a persistent store like PostgresStore + let store = MemoryStore::new(); + + // Create pipeline configuration with batching and retry settings + let pipeline_config = PipelineConfig { + id: 1, // Using a simple ID for the example + publication_name: args.publication, + pg_connection: pg_connection_config, + batch: BatchConfig { + max_size: args.bq_args.max_batch_size, + max_fill_ms: args.bq_args.max_batch_fill_duration_ms, + }, + table_error_retry_delay_ms: 10000, + table_error_retry_max_attempts: 5, + max_table_sync_workers: args.bq_args.max_table_sync_workers, + }; + + // Initialize BigQuery destination with service account authentication + // Tables will be automatically created to match Postgres schema + let bigquery_destination = BigQueryDestination::new_with_key_path( + args.bq_args.bq_project_id, + args.bq_args.bq_dataset_id, + &args.bq_args.bq_sa_key_file, + None, + 1, + store.clone(), + ) + .await?; + + // Create the pipeline instance with all components + let mut pipeline = Pipeline::new(pipeline_config, store, bigquery_destination); + + info!( + "Starting BigQuery CDC pipeline - connecting to Postgres and initializing replication..." + ); + + // Start the pipeline - this will: + // 1. Connect to Postgres + // 2. Initialize table states based on the publication + // 3. Start apply and table sync workers + // 4. Begin streaming replication data + pipeline.start().await?; + + info!("Pipeline started successfully! Data replication is now active. Press Ctrl+C to stop."); + + // Set up signal handler for graceful shutdown on Ctrl+C + let shutdown_signal = async { + signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + info!("Received Ctrl+C signal, initiating graceful shutdown..."); + }; + + // Wait for either the pipeline to complete naturally or receive a shutdown signal + // The pipeline will run indefinitely unless an error occurs or it's manually stopped + tokio::select! { + result = pipeline.wait() => { + info!("Pipeline completed normally (this usually indicates an error condition)"); + result?; + } + _ = shutdown_signal => { + info!("Gracefully shutting down pipeline and cleaning up resources..."); + } + } + + info!("Pipeline stopped successfully. 
All resources cleaned up."); + + Ok(()) +} diff --git a/etl-examples/src/bin/redis.rs b/etl-examples/src/bin/redis.rs new file mode 100644 index 000000000..f847eccbe --- /dev/null +++ b/etl-examples/src/bin/redis.rs @@ -0,0 +1,208 @@ +use clap::{Args, Parser}; +use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}; +use etl::pipeline::Pipeline; +use etl::store::both::memory::MemoryStore; +use etl_destinations::redis::{RedisConfig, RedisDestination}; +use std::error::Error; +use tokio::signal; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +// Main application arguments combining database and Redis configurations +#[derive(Debug, Parser)] +#[command(name = "redis", version, about, arg_required_else_help = true)] +struct AppArgs { + // Postgres connection parameters + #[clap(flatten)] + db_args: DbArgs, + // Redis destination parameters + #[clap(flatten)] + redis_args: RedisArgs, + /// Postgres publication name (must be created beforehand with CREATE PUBLICATION) + #[arg(long)] + publication: String, +} + +// Postgres database connection configuration +#[derive(Debug, Args)] +struct DbArgs { + /// Host on which Postgres is running (e.g., localhost or IP address) (default: 127.0.0.1) + #[arg(long, default_value = "127.0.0.1")] + db_host: String, + /// Port on which Postgres is running (default: 5432) + #[arg(long, default_value = "5432")] + db_port: u16, + /// Postgres database name to connect to + #[arg(long)] + db_name: String, + /// Postgres database user name (must have REPLICATION privileges) + #[arg(long)] + db_username: String, + /// Postgres database user password (optional if using trust authentication) + #[arg(long)] + db_password: Option, +} + +// Redis destination configuration +#[derive(Debug, Args)] +struct RedisArgs { + /// Host on which Redis is running (e.g., localhost or IP address) (default: 127.0.0.1) + #[arg(long, default_value = "127.0.0.1")] + redis_host: String, + /// Port on which Redis is running (default: 6379) + #[arg(long, default_value = "6379")] + redis_port: u16, + /// Redis database user name + #[arg(long)] + redis_username: Option, + /// Redis database user password (optional if using trust authentication) + #[arg(long)] + redis_password: Option, + /// Set a TTL (in seconds) for data replicated in Redis (optional) + #[arg(long)] + redis_ttl: Option, +} + +impl From for RedisConfig { + fn from(args: RedisArgs) -> Self { + Self { + host: args.redis_host, + port: args.redis_port, + username: args.redis_username, + password: args.redis_password, + ttl: args.redis_ttl, + } + } +} + +// Entry point - handles error reporting and process exit +#[tokio::main] +async fn main() -> Result<(), Box> { + if let Err(e) = main_impl().await { + error!("{e}"); + std::process::exit(1); + } + + Ok(()) +} + +// Initialize structured logging with configurable log levels via RUST_LOG environment variable +fn init_tracing() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "redis=info".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); +} + +// Set default log level if RUST_LOG environment variable is not set +fn set_log_level() { + if std::env::var("RUST_LOG").is_err() { + unsafe { + std::env::set_var("RUST_LOG", "info"); + } + } +} + +// Main implementation function containing all the pipeline setup and execution logic +async fn main_impl() -> Result<(), Box> { + // Set up logging and tracing + set_log_level(); + 
init_tracing();
+
+    // Parse command line arguments
+    let args = AppArgs::parse();
+
+    // Configure Postgres connection settings
+    // Note: TLS is disabled in this example - enable for production use
+    let pg_connection_config = PgConnectionConfig {
+        host: args.db_args.db_host,
+        port: args.db_args.db_port,
+        name: args.db_args.db_name,
+        username: args.db_args.db_username,
+        password: args.db_args.db_password.map(Into::into),
+        tls: TlsConfig {
+            trusted_root_certs: String::new(),
+            enabled: false, // Set to true and provide certs for production
+        },
+    };
+
+    // Create in-memory store for tracking table replication states and table schemas
+    // In production, you might want to use a persistent store like PostgresStore
+    let store = MemoryStore::new();
+
+    // Create pipeline configuration with batching and retry settings
+    let pipeline_config = PipelineConfig {
+        id: 1, // Using a simple ID for the example
+        publication_name: args.publication,
+        pg_connection: pg_connection_config,
+        batch: BatchConfig {
+            max_size: 1000,
+            max_fill_ms: 5000,
+        },
+        table_error_retry_delay_ms: 10000,
+        table_error_retry_max_attempts: 5,
+        max_table_sync_workers: 4,
+    };
+
+    info!("Connecting to Redis...");
+    let redis = RedisDestination::new(args.redis_args.into(), store.clone()).await?;
+    // .filter(|_table_row, table_schema| table_schema.name.name == "accounts")
+    // If you want to edit data before inserts/updates
+    // .map_insert(|mut cell, table_schema, column_schema| {
+    //     if table_schema.name.name == "accounts" && column_schema.name == "password" {
+    //         // Do not save password in cache
+    //         cell.clear();
+    //     }
+
+    //     cell
+    // })
+    // .map_update(|mut cell, table_schema, column_schema| {
+    //     if table_schema.name.name == "accounts" && column_schema.name == "password" {
+    //         // Do not save password in cache
+    //         cell.clear();
+    //     }
+
+    //     cell
+    // });
+    info!("Connected to Redis");
+    // Create the pipeline instance with all components
+    let mut pipeline = Pipeline::new(pipeline_config, store, redis);
+
+    info!("Starting Redis CDC pipeline - connecting to Postgres and initializing replication...");
+
+    // Start the pipeline - this will:
+    // 1. Connect to Postgres
+    // 2. Initialize table states based on the publication
+    // 3. Start apply and table sync workers
+    // 4. Begin streaming replication data
+    pipeline.start().await?;
+
+    info!("Pipeline started successfully! Data replication is now active. Press Ctrl+C to stop.");
+
+    // Set up signal handler for graceful shutdown on Ctrl+C
+    let shutdown_signal = async {
+        signal::ctrl_c()
+            .await
+            .expect("Failed to install Ctrl+C handler");
+        info!("Received Ctrl+C signal, initiating graceful shutdown...");
+    };
+
+    // Wait for either the pipeline to complete naturally or receive a shutdown signal
+    // The pipeline will run indefinitely unless an error occurs or it's manually stopped
+    tokio::select! {
+        result = pipeline.wait() => {
+            info!("Pipeline completed normally (this usually indicates an error condition)");
+            result?;
+        }
+        _ = shutdown_signal => {
+            info!("Gracefully shutting down pipeline and cleaning up resources...");
+        }
+    }
+
+    info!("Pipeline stopped successfully. All resources cleaned up.");
+
+    Ok(())
+}
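To make the storage layout concrete: the destination keys each row as `<table name>::::<pk column>:<json value>` (one `column:value` segment per primary-key column, joined by `::::`) and stores the whole row as a JSON object with a plain `SET`, exactly as exercised by the integration test above. Below is a minimal sketch of reading a replicated row back with `fred`; the table name `users`, the key `id:1`, and the `name` column are illustrative, and the client is assumed to be already connected.

```rust
use fred::prelude::{Client, KeysInterface};
use serde_json::Value;

async fn read_replicated_row(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    // Key written by RedisDestination for the row of table `users` with primary key id = 1.
    let raw: String = client.get("users::::id:1").await?;

    // The value is the row serialized as a JSON object keyed by column name.
    let row: Value = serde_json::from_str(&raw)?;
    println!("name = {}", row["name"]);

    Ok(())
}
```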