diff --git a/.gitignore b/.gitignore index 22d1f250c..94d382168 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,6 @@ pyvenv.cfg # Log files *.log + +# Bin examples +!etl-examples/src/bin diff --git a/README.md b/README.md index e598b2e34..d58c7fcee 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,18 @@ etl = { git = "https://github.com/supabase/etl" } etl-destinations = { git = "https://github.com/supabase/etl", features = ["bigquery"] } ``` +- Redis + +Out-of-the-box destinations are available in the `etl-destinations` crate: + +```toml +[dependencies] +etl = { git = "https://github.com/supabase/etl" } +etl-destinations = { git = "https://github.com/supabase/etl", features = ["redis"] } +``` + +> Current limitation: doesn't support Redis in cluster mode yet. + ## License Apache‑2.0. See `LICENSE` for details. diff --git a/etl-destinations/Cargo.toml b/etl-destinations/Cargo.toml index c962347e2..68c11fed2 100644 --- a/etl-destinations/Cargo.toml +++ b/etl-destinations/Cargo.toml @@ -8,10 +8,17 @@ repository.workspace = true homepage.workspace = true [features] +redis = [ + "dep:tracing", + "dep:tokio", + "dep:fred", + "dep:uuid", + "dep:serde", + "dep:serde_json", +] bigquery = [ "dep:gcp-bigquery-client", "dep:prost", - "dep:rustls", "dep:tracing", "dep:tokio", "dep:base64", @@ -20,7 +27,6 @@ iceberg = [ "dep:iceberg", "dep:iceberg-catalog-rest", "dep:arrow", - "dep:rustls", "dep:tracing", "dep:parquet", "dep:uuid", @@ -47,15 +53,17 @@ iceberg-catalog-rest = { workspace = true, optional = true } parquet = { workspace = true, optional = true, features = ["async", "arrow"] } prost = { workspace = true, optional = true } reqwest = { workspace = true, optional = true, features = ["json"] } -rustls = { workspace = true, optional = true, features = [ +rustls = { workspace = true, features = [ "aws-lc-rs", "logging", ] } serde = { workspace = true, optional = true, features = ["derive"] } -serde_json = { workspace = true, optional = true } -tokio = { workspace = true, optional = true, features = ["sync"] } +serde_json = {optional = true, workspace = true } +tokio = { workspace = true, optional = true, features = ["full", "sync"] } tracing = { workspace = true, optional = true, default-features = true } -uuid = { workspace = true, optional = true, features = ["v4"] } +uuid = { workspace = true, optional = true, features = ["v4", "serde"] } +fred = { version = "10.1.0", optional = true, features = ["i-redis-json"] } +futures.workspace = true [dev-dependencies] etl = { workspace = true, features = ["test-utils"] } diff --git a/etl-destinations/src/lib.rs b/etl-destinations/src/lib.rs index da4453412..7c149181c 100644 --- a/etl-destinations/src/lib.rs +++ b/etl-destinations/src/lib.rs @@ -8,3 +8,5 @@ pub mod bigquery; pub mod encryption; #[cfg(feature = "iceberg")] pub mod iceberg; +#[cfg(feature = "redis")] +pub mod redis; diff --git a/etl-destinations/src/redis/client.rs b/etl-destinations/src/redis/client.rs new file mode 100644 index 000000000..5b848e6b6 --- /dev/null +++ b/etl-destinations/src/redis/client.rs @@ -0,0 +1,199 @@ +use etl::types::{TableRow, TableSchema}; +use fred::clients::Pipeline; +use fred::prelude::{ + Client, ClientLike, EventInterface, FredResult, KeysInterface, Pool, ReconnectPolicy, Server, + ServerConfig, TcpConfig, +}; +use fred::types::config::UnresponsiveConfig; +use fred::types::{Builder, Expiration, Key}; +use futures::future::join_all; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; +use 
tracing::{debug, error}; + +use crate::redis::RedisConfig; +use crate::redis::json_cell::JsonCell; + +#[derive(Clone)] +pub(super) struct RedisClient { + client: Pool, + ttl: Option, +} + +impl RedisClient { + pub(super) async fn new(config: RedisConfig) -> FredResult { + let pooled_client = Builder::default_centralized() + .with_config(|redis_config| { + redis_config.password = config.password; + redis_config.username = config.username; + redis_config.server = ServerConfig::Centralized { + server: Server::new(config.host, config.port), + }; + }) + .with_connection_config(|config| { + config.internal_command_timeout = Duration::from_secs(5); + config.reconnect_on_auth_error = true; + config.tcp = TcpConfig { + #[cfg(target_os = "linux")] + user_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }; + config.unresponsive = UnresponsiveConfig { + max_timeout: Some(Duration::from_secs(10)), + interval: Duration::from_secs(3), + }; + }) + .with_performance_config(|config| { + config.default_command_timeout = Duration::from_secs(5); + }) + .set_policy(ReconnectPolicy::new_exponential(0, 1, 2000, 5)) + .build_pool(5)?; + + for client in pooled_client.clients() { + // spawn tasks that listen for connection close or reconnect events + let mut error_rx = client.error_rx(); + let mut reconnect_rx = client.reconnect_rx(); + let mut unresponsive_rx = client.unresponsive_rx(); + + tokio::spawn(async move { + loop { + match error_rx.recv().await { + Ok((error, Some(server))) => { + error!("Redis client ({server:?}) error: {error:?}",); + } + Ok((error, None)) => { + error!("Redis client error: {error:?}",); + } + Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + }); + + tokio::spawn(async move { + loop { + match unresponsive_rx.recv().await { + Ok(server) => { + error!("Redis client ({server:?}) unresponsive"); + } + Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + }); + + tokio::spawn(async move { + loop { + match reconnect_rx.recv().await { + Ok(server) => { + debug!("Redis client connected to {server:?}") + } + Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + }); + } + let client_handles = pooled_client.connect_pool(); + + debug!("Wait for connect"); + pooled_client.wait_for_connect().await?; + debug!("Connected"); + + tokio::spawn(async move { + let _results = join_all(client_handles).await; + }); + + Ok(Self { + client: pooled_client, + ttl: config.ttl.map(Expiration::EX), + }) + } + + // Doesn't work with redis cluster + pub(super) async fn delete_by_table_name(&self, table_name: &str) -> FredResult { + let pattern = format!("{}::::*", table_name); + let mut cursor = "0".to_string(); + let mut total_deleted = 0u64; + + loop { + let (next_cursor, keys): (String, Vec) = self + .client + .scan_page(cursor, pattern.clone(), Some(100), None) + .await?; + + if !keys.is_empty() { + let deleted: i64 = self.client.unlink(keys).await?; + total_deleted += deleted as u64; + } + + cursor = next_cursor; + if cursor == "0" { + break; + } + } + + Ok(total_deleted) + } + + pub(super) async fn delete(&self, key: RedisKey) -> FredResult<()> { + self.client.del::<(), _>(key).await + } + + pub(super) fn pipeline(&self) -> Pipeline { + self.client.next_connected().pipeline() + } + + pub(super) async fn set( + &self, + pipeline: &Pipeline, + key: RedisKey, + map: HashMap>, + ) -> Result<(), Box> { + // We don't use json to be compliant with older versions of redis + pipeline + .set::<(), _, _>( + key, + 
serde_json::to_string(&map)?, + self.ttl.clone(), + None, + false, + ) + .await?; + + Ok(()) + } +} + +pub(super) struct RedisKey(String); + +impl RedisKey { + pub(super) fn new(table_schema: &TableSchema, table_row: &TableRow) -> Self { + let table_name = &table_schema.name.name; + let primary_key = table_schema + .column_schemas + .iter() + .enumerate() + .filter_map(|(idx, col_schema)| { + if col_schema.primary { + let value = + serde_json::to_string(&JsonCell::Ref(table_row.values.get(idx)?)).ok()?; // FIXME: should be a parameter to avoid serializing twice + let col_name = &col_schema.name; + Some(format!("{col_name}:{value}")) + } else { + None + } + }) + .collect::>() + .join("::::"); + + Self(format!("{table_name}::::{primary_key}")) + } +} + +impl From for Key { + fn from(val: RedisKey) -> Self { + Key::from(val.0) + } +} diff --git a/etl-destinations/src/redis/json_cell.rs b/etl-destinations/src/redis/json_cell.rs new file mode 100644 index 000000000..10681915a --- /dev/null +++ b/etl-destinations/src/redis/json_cell.rs @@ -0,0 +1,125 @@ +use etl::types::{ArrayCell, Cell, PgNumeric, Sign}; +use serde::ser::SerializeSeq; +use serde::{Serialize, Serializer}; + +pub(super) enum JsonCell<'a> { + Value(Cell), + Ref(&'a Cell), +} + +impl<'a> AsRef for JsonCell<'a> { + fn as_ref(&self) -> &Cell { + match &self { + JsonCell::Value(cell) => cell, + JsonCell::Ref(cell) => cell, + } + } +} + +impl<'a> Serialize for JsonCell<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self.as_ref() { + Cell::Null => serializer.serialize_none(), + Cell::Bool(b) => b.serialize(serializer), + Cell::String(string) => string.serialize(serializer), + Cell::I16(number) => number.serialize(serializer), + Cell::I32(number) => number.serialize(serializer), + Cell::U32(number) => number.serialize(serializer), + Cell::I64(number) => number.serialize(serializer), + Cell::F32(number) => number.serialize(serializer), + Cell::F64(number) => number.serialize(serializer), + Cell::Numeric(pg_numeric) => match pg_numeric { + etl::types::PgNumeric::NaN => serializer.serialize_none(), + etl::types::PgNumeric::PositiveInfinity => serializer.serialize_none(), + etl::types::PgNumeric::NegativeInfinity => serializer.serialize_none(), + etl::types::PgNumeric::Value { sign, .. } => match sign { + Sign::Positive => pg_numeric + .to_string() + .parse::() + .map_err(|err| { + serde::ser::Error::custom(format!( + "cannot parse pg numeric to u64: {err}" + )) + })? + .serialize(serializer), + Sign::Negative => pg_numeric + .to_string() + .parse::() + .map_err(|err| { + serde::ser::Error::custom(format!( + "cannot parse pg numeric to i64: {err}" + )) + })? 
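+                        // Note: only integral numerics round-trip through this parse; values with a
+                        // fractional part (e.g. 3.14) fail to parse as i64/u64 and surface as the
+                        // custom serialization error produced by the map_err above.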
+ .serialize(serializer), + }, + }, + Cell::Date(naive_date) => naive_date.serialize(serializer), + Cell::Time(naive_time) => naive_time.serialize(serializer), + Cell::Timestamp(naive_date_time) => naive_date_time.serialize(serializer), + Cell::TimestampTz(date_time) => date_time.serialize(serializer), + Cell::Uuid(uuid) => uuid.serialize(serializer), + Cell::Json(value) => value.serialize(serializer), + Cell::Bytes(items) => items.serialize(serializer), + Cell::Array(array_cell) => match array_cell { + ArrayCell::Bool(items) => items.serialize(serializer), + ArrayCell::String(items) => items.serialize(serializer), + ArrayCell::I16(items) => items.serialize(serializer), + ArrayCell::I32(items) => items.serialize(serializer), + ArrayCell::U32(items) => items.serialize(serializer), + ArrayCell::I64(items) => items.serialize(serializer), + ArrayCell::F32(items) => items.serialize(serializer), + ArrayCell::F64(items) => items.serialize(serializer), + ArrayCell::Numeric(pg_numerics) => { + let mut seq = serializer.serialize_seq(Some(pg_numerics.len()))?; + for elt in pg_numerics { + match elt { + Some(PgNumeric::NaN) + | Some(PgNumeric::NegativeInfinity) + | Some(PgNumeric::PositiveInfinity) => { + seq.serialize_element::>(&None)? + } + Some(PgNumeric::Value { sign, .. }) => match sign { + Sign::Positive => { + seq.serialize_element( + &elt.as_ref().unwrap().to_string().parse::().map_err( + |err| { + serde::ser::Error::custom(format!( + "cannot parse pg numeric to u64: {err}" + )) + }, + )?, + )?; + } + Sign::Negative => { + seq.serialize_element( + &elt.as_ref().unwrap().to_string().parse::().map_err( + |err| { + serde::ser::Error::custom(format!( + "cannot parse pg numeric to i64: {err}" + )) + }, + )?, + )?; + } + }, + None => { + seq.serialize_element::>(&None)?; + } + } + } + seq.end() + } + ArrayCell::Date(naive_dates) => naive_dates.serialize(serializer), + ArrayCell::Time(naive_times) => naive_times.serialize(serializer), + ArrayCell::Timestamp(naive_date_times) => naive_date_times.serialize(serializer), + ArrayCell::TimestampTz(date_times) => date_times.serialize(serializer), + ArrayCell::Uuid(uuids) => uuids.serialize(serializer), + ArrayCell::Json(values) => values.serialize(serializer), + ArrayCell::Bytes(items) => items.serialize(serializer), + }, + } + } +} diff --git a/etl-destinations/src/redis/mod.rs b/etl-destinations/src/redis/mod.rs new file mode 100644 index 000000000..0c9b0d0ca --- /dev/null +++ b/etl-destinations/src/redis/mod.rs @@ -0,0 +1,268 @@ +mod client; +mod json_cell; + +use etl::destination::Destination; +use etl::error::{ErrorKind, EtlError, EtlResult}; +use etl::store::schema::SchemaStore; +use etl::store::state::StateStore; +use etl::types::{Cell, ColumnSchema, Event, TableId, TableRow, TableSchema}; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::debug; + +use crate::redis::client::{RedisClient, RedisKey}; +use crate::redis::json_cell::JsonCell; + +pub type MapFunction = Arc Cell + Send + Sync>; +pub type FilterFunction = Arc bool + Send + Sync>; + +#[derive(Clone)] +pub struct RedisDestination { + redis_client: RedisClient, + store: S, + /// If you want to change data before inserting it in Redis + filter: Option, + /// If you want to change data before inserting it in Redis + map_insert: Option, + /// If you want to change data before updating it in Redis + map_update: Option, +} + +#[derive(Clone, Debug)] +pub struct RedisConfig { + /// Host on which Redis is running (e.g., localhost or IP address) (default: 127.0.0.1) + pub host: 
String, + /// Port on which Redis is running (default: 6379) + pub port: u16, + /// Redis database user name + pub username: Option, + /// Redis database user password (optional if using trust authentication) + pub password: Option, + /// Redis TTL if you want to set a TTL on replicated data in Redis + pub ttl: Option, +} + +impl RedisDestination +where + S: StateStore + SchemaStore, +{ + /// Creates a new Redis destination. + pub async fn new(config: RedisConfig, store: S) -> EtlResult { + let redis_client = RedisClient::new(config).await.map_err(|err| { + EtlError::from(( + ErrorKind::DestinationConnectionFailed, + "cannot connect to redis", + err.to_string(), + )) + })?; + + Ok(Self { + redis_client, + store, + filter: None, + map_insert: None, + map_update: None, + }) + } + + /// Filter the table row to insert/update + pub fn filter(mut self, f: F) -> Self + where + F: Fn(&TableRow, &TableSchema) -> bool + Send + Sync + 'static, + { + self.filter = Some(Arc::new(f)); + self + } + + /// If you want to change data before inserting it in Redis + pub fn map_insert(mut self, f: F) -> Self + where + F: Fn(Cell, &TableSchema, &ColumnSchema) -> Cell + Send + Sync + 'static, + { + self.map_insert = Some(Arc::new(f)); + self + } + + /// If you want to change data before updating it in Redis + pub fn map_update(mut self, f: F) -> Self + where + F: Fn(Cell, &TableSchema, &ColumnSchema) -> Cell + Send + Sync + 'static, + { + self.map_update = Some(Arc::new(f)); + self + } + + async fn _write_table_rows( + &self, + table_id: TableId, + table_rows: Vec, + is_update: bool, + ) -> EtlResult<()> { + let table_schema = self + .store + .get_table_schema(&table_id) + .await? + .ok_or_else(|| { + EtlError::from((ErrorKind::MissingTableSchema, "cannot get table schema")) + })?; + + debug!("writing a batch of {} table rows:", table_rows.len()); + let pipeline = self.redis_client.pipeline(); + for table_row in table_rows { + if let Some(filter) = &self.filter + && !filter(&table_row, &table_schema) + { + debug!("skipping table row"); + continue; + } + let key = RedisKey::new(table_schema.as_ref(), &table_row); + let map: HashMap = table_row + .values + .into_iter() + .enumerate() + .filter_map(|(idx, cell)| { + let column_schema = table_schema.column_schemas.get(idx)?; + let map_func = if is_update { + &self.map_update + } else { + &self.map_insert + }; + let cell = match map_func { + Some(map_func) => map_func(cell, table_schema.as_ref(), column_schema), + None => cell, + }; + + Some((column_schema.name.clone(), JsonCell::Value(cell))) + }) + .collect(); + self.redis_client + .set(&pipeline, key, map) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot set in redis", + err.to_string(), + )) + })?; + } + + pipeline.all().await.map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot execute pipeline in redis", + err.to_string(), + )) + }) + } +} + +impl Destination for RedisDestination +where + S: StateStore + SchemaStore + Send + Sync, +{ + fn name() -> &'static str { + "redis" + } + + async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> { + let Some(current_table_name) = self.store.get_table_mapping(&table_id).await? 
else { + return Ok(()); + }; + + debug!("truncating table {}", current_table_name); + self.redis_client + .delete_by_table_name(¤t_table_name) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot delete key in Redis", + err.to_string(), + )) + })?; + + Ok(()) + } + + async fn write_table_rows( + &self, + table_id: TableId, + table_rows: Vec, + ) -> EtlResult<()> { + self._write_table_rows(table_id, table_rows, false).await + } + + async fn write_events(&self, events: Vec) -> EtlResult<()> { + debug!("writing a batch of {} events:", events.len()); + + for event in events { + debug!(?event); + match event { + Event::Insert(insert_event) => { + self.write_table_rows(insert_event.table_id, vec![insert_event.table_row]) + .await?; + } + Event::Update(update_event) => { + self._write_table_rows( + update_event.table_id, + vec![update_event.table_row], + true, + ) + .await?; + } + Event::Delete(delete_event) => { + let Some(table_schema) = self + .store + .get_table_schema(&delete_event.table_id) + .await + .ok() + .flatten() + else { + continue; + }; + let Some((_only_keys, old_table_row)) = delete_event.old_table_row else { + continue; + }; + self.redis_client + .delete(RedisKey::new(table_schema.as_ref(), &old_table_row)) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot del in redis", + err.to_string(), + )) + })?; + } + Event::Truncate(truncate_event) => { + for table_id in truncate_event.rel_ids { + let Some(table_schema) = self + .store + .get_table_schema(&TableId(table_id)) + .await + .ok() + .flatten() + else { + continue; + }; + + self.redis_client + .delete_by_table_name(&table_schema.name.name) + .await + .map_err(|err| { + EtlError::from(( + ErrorKind::DestinationError, + "cannot del in redis", + err.to_string(), + )) + })?; + } + } + Event::Begin(_) | Event::Commit(_) | Event::Relation(_) | Event::Unsupported => {} + } + } + + Ok(()) + } +} diff --git a/etl-destinations/tests/redis.rs b/etl-destinations/tests/redis.rs new file mode 100644 index 000000000..a0ea5e2de --- /dev/null +++ b/etl-destinations/tests/redis.rs @@ -0,0 +1,166 @@ +#![cfg(feature = "redis")] + +use etl::state::table::TableReplicationPhaseType; +use etl::test_utils::database::spawn_source_database; +use etl::test_utils::notify::NotifyingStore; +use etl::test_utils::pipeline::create_pipeline; +use etl::test_utils::test_destination_wrapper::TestDestinationWrapper; +use etl::test_utils::test_schema::{TableSelection, setup_test_database_schema}; +use etl::types::{EventType, PipelineId}; +use etl_destinations::encryption::install_crypto_provider; +use etl_destinations::redis::{RedisConfig, RedisDestination}; +use etl_telemetry::tracing::init_test_tracing; +use fred::prelude::KeysInterface; +use rand::random; +use serde_json::Value; + +mod support; + +#[tokio::test(flavor = "multi_thread")] +async fn table_insert_update_delete() { + init_test_tracing(); + install_crypto_provider(); + + let database = spawn_source_database().await; + let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await; + + let store = NotifyingStore::new(); + let pipeline_id: PipelineId = random(); + let redis_cfg = RedisConfig { + host: String::from("127.0.0.1"), + port: 6379, + username: None, + password: None, + ttl: None, + }; + let redis = RedisDestination::new(redis_cfg.clone(), store.clone()) + .await + .unwrap(); + let destination = TestDestinationWrapper::wrap(redis); + + // Start pipeline from scratch. 
+ let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + database_schema.publication_name(), + store.clone(), + destination.clone(), + ); + + // Register notifications for table copy completion. + let users_state_notify = store + .notify_on_table_state_type( + database_schema.users_schema().id, + TableReplicationPhaseType::SyncDone, + ) + .await; + + pipeline.start().await.unwrap(); + + users_state_notify.notified().await; + + // Wait for the first insert. + let event_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + // Insert a row. + database + .insert_values( + database_schema.users_schema().name.clone(), + &["name", "age"], + &[&"user_1", &1], + ) + .await + .unwrap(); + + event_notify.notified().await; + + let redis_client = crate::support::redis::create_redis_connection(redis_cfg) + .await + .unwrap(); + + // Test if it has been inserted in redis + let user_row: String = redis_client + .get(format!( + "{}::::id:1", + database_schema.users_schema().name.name + )) + .await + .unwrap(); + let user_value: Value = serde_json::from_str(&user_row).unwrap(); + assert_eq!( + serde_json::json!({ + "id": 1, + "name": "user_1", + "age": 1 + }), + user_value + ); + + // Wait for the update. + let event_notify = destination + .wait_for_events_count(vec![(EventType::Update, 1)]) + .await; + + // Update the row. + database + .update_values( + database_schema.users_schema().name.clone(), + &["name", "age"], + &[&"user_10", &10], + ) + .await + .unwrap(); + + event_notify.notified().await; + + // Test if it has been updated in redis + let user_row: String = redis_client + .get(format!( + "{}::::id:1", + database_schema.users_schema().name.name + )) + .await + .unwrap(); + let user_value: Value = serde_json::from_str(&user_row).unwrap(); + assert_eq!( + serde_json::json!({ + "id": 1, + "name": "user_10", + "age": 10 + }), + user_value + ); + + // Wait for the update. + let event_notify = destination + .wait_for_events_count(vec![(EventType::Delete, 1)]) + .await; + + // Update the row. 
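+    // Delete the row so the pipeline emits a Delete event and the corresponding Redis key is removed.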
+ database + .delete_values( + database_schema.users_schema().name.clone(), + &["name"], + &["'user_10'"], + "", + ) + .await + .unwrap(); + + event_notify.notified().await; + + pipeline.shutdown_and_wait().await.unwrap(); + + // Test if it has been removed in redis + assert!( + redis_client + .get::(format!( + "{}::::id:1", + database_schema.users_schema().name.name + )) + .await + .is_err() + ); +} diff --git a/etl-destinations/tests/support/mod.rs b/etl-destinations/tests/support/mod.rs index 99cac4bcf..77aab2824 100644 --- a/etl-destinations/tests/support/mod.rs +++ b/etl-destinations/tests/support/mod.rs @@ -1,3 +1,4 @@ pub mod bigquery; pub mod iceberg; pub mod lakekeeper; +pub mod redis; diff --git a/etl-destinations/tests/support/redis.rs b/etl-destinations/tests/support/redis.rs new file mode 100644 index 000000000..9ce866eab --- /dev/null +++ b/etl-destinations/tests/support/redis.rs @@ -0,0 +1,49 @@ +#![cfg(feature = "redis")] + +use etl_destinations::redis::RedisConfig; +use fred::prelude::{ + ClientLike, FredResult, Pool, ReconnectPolicy, Server, ServerConfig, TcpConfig, +}; +use fred::types::Builder; +use fred::types::config::UnresponsiveConfig; +use futures::future::join_all; +use std::time::Duration; + +#[allow(dead_code)] +pub async fn create_redis_connection(config: RedisConfig) -> FredResult { + let pooled_client = Builder::default_centralized() + .with_config(|redis_config| { + redis_config.password = config.password; + redis_config.username = config.username; + redis_config.server = ServerConfig::Centralized { + server: Server::new(config.host, config.port), + }; + }) + .with_connection_config(|config| { + config.internal_command_timeout = Duration::from_secs(5); + config.reconnect_on_auth_error = true; + config.tcp = TcpConfig { + #[cfg(target_os = "linux")] + user_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }; + config.unresponsive = UnresponsiveConfig { + max_timeout: Some(Duration::from_secs(10)), + interval: Duration::from_secs(3), + }; + }) + .with_performance_config(|config| { + config.default_command_timeout = Duration::from_secs(5); + }) + .set_policy(ReconnectPolicy::new_exponential(0, 1, 2000, 5)) + .build_pool(1)?; + + let client_handles = pooled_client.connect_pool(); + pooled_client.wait_for_connect().await?; + + tokio::spawn(async move { + let _results = join_all(client_handles).await; + }); + + Ok(pooled_client) +} diff --git a/etl-examples/Cargo.toml b/etl-examples/Cargo.toml index 447f71d49..af81e2ced 100644 --- a/etl-examples/Cargo.toml +++ b/etl-examples/Cargo.toml @@ -9,7 +9,7 @@ homepage.workspace = true [dependencies] etl = { workspace = true } -etl-destinations = { workspace = true, features = ["bigquery"] } +etl-destinations = { workspace = true, features = ["bigquery", "redis"] } clap = { workspace = true, default-features = true, features = [ "std", @@ -18,3 +18,9 @@ clap = { workspace = true, default-features = true, features = [ tokio = { workspace = true, features = ["macros", "signal"] } tracing = { workspace = true, default-features = true } tracing-subscriber = { workspace = true, default-features = true, features = ["env-filter"] } + +[[bin]] +name = "bigquery" + +[[bin]] +name = "redis" diff --git a/etl-examples/README.md b/etl-examples/README.md index a9c95a3eb..bdca4b4b2 100644 --- a/etl-examples/README.md +++ b/etl-examples/README.md @@ -5,8 +5,16 @@ This crate contains practical examples demonstrating how to use the ETL system f ## Available Examples - **BigQuery Integration**: Demonstrates replicating Postgres 
data to Google BigQuery +- **Redis Integration**: Demonstrates replicating Postgres data to Redis (to benefit from Redis as a cache for auth tokens for example) -## Quick Start +## Prerequisites + +Before running the examples, you'll need to set up a Postgres database with logical replication enabled. + + +## BigQuery + +### Quick Start To quickly try out `etl`, you can run the BigQuery example. First, create a publication in Postgres which includes the tables you want to replicate: @@ -18,7 +26,7 @@ for table table1, table2; Then run the BigQuery example: ```bash -cargo run -p etl-examples -- \ +cargo run --bin bigquery -p etl-examples -- \ --db-host localhost \ --db-port 5432 \ --db-name postgres \ @@ -32,11 +40,7 @@ cargo run -p etl-examples -- \ In the above example, `etl` connects to a Postgres database named `postgres` running on `localhost:5432` with a username `postgres` and password `password`. -## Prerequisites - -Before running the examples, you'll need to set up a Postgres database with logical replication enabled. - -## BigQuery Setup +### BigQuery Setup To run the BigQuery example, you'll need: @@ -45,3 +49,63 @@ To run the BigQuery example, you'll need: 3. A BigQuery dataset created in your project The example will automatically create tables in the specified dataset based on your Postgres schema. + + +## Redis + +### Quick Start + +To quickly try out `etl`, you can run the Redis example. First, create a publication in Postgres which includes the tables you want to replicate: + +```sql +create publication my_publication +for table table1, table2; +``` + +Then run the Redis example: + +```bash +cargo run --bin redis -p etl-examples -- \ + --db-host localhost \ + --db-port 5432 \ + --db-name postgres \ + --db-username postgres \ + --db-password password \ + --publication my_publication +``` + +Usage of the cli: + +```bash +Usage: redis [OPTIONS] --db-name --db-username --publication + +Options: + --db-host + Host on which Postgres is running (e.g., localhost or IP address) (default: 127.0.0.1) [default: 127.0.0.1] + --db-port + Port on which Postgres is running (default: 5432) [default: 5432] + --db-name + Postgres database name to connect to + --db-username + Postgres database user name (must have REPLICATION privileges) + --db-password + Postgres database user password (optional if using trust authentication) + --redis-host + Host on which Redis is running (e.g., localhost or IP address) (default: 127.0.0.1) [default: 127.0.0.1] + --redis-port + Port on which Redis is running (default: 6379) [default: 6379] + --redis-username + Redis database user name + --redis-password + Redis database user password (optional if using trust authentication) + --redis-ttl + Set a TTL (in seconds) for data replicated in Redis (optional) + --publication + Postgres publication name (must be created beforehand with CREATE PUBLICATION) + -h, --help + Print help + -V, --version + Print version +``` + +In the above example, `etl` connects to a Postgres database named `postgres` running on `localhost:5432` with a username `postgres` and password `password` on a local instance of Redis. diff --git a/etl-examples/src/main.rs b/etl-examples/src/bin/bigquery.rs similarity index 99% rename from etl-examples/src/main.rs rename to etl-examples/src/bin/bigquery.rs index 37995a044..f6ab433e1 100644 --- a/etl-examples/src/main.rs +++ b/etl-examples/src/bin/bigquery.rs @@ -1,4 +1,4 @@ -/* +/*! 
BigQuery Example diff --git a/etl-examples/src/bin/redis.rs b/etl-examples/src/bin/redis.rs new file mode 100644 index 000000000..f847eccbe --- /dev/null +++ b/etl-examples/src/bin/redis.rs @@ -0,0 +1,208 @@ +use clap::{Args, Parser}; +use etl::config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig}; +use etl::pipeline::Pipeline; +use etl::store::both::memory::MemoryStore; +use etl_destinations::redis::{RedisConfig, RedisDestination}; +use std::error::Error; +use tokio::signal; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +// Main application arguments combining database and Redis configurations +#[derive(Debug, Parser)] +#[command(name = "redis", version, about, arg_required_else_help = true)] +struct AppArgs { + // Postgres connection parameters + #[clap(flatten)] + db_args: DbArgs, + // Redis destination parameters + #[clap(flatten)] + redis_args: RedisArgs, + /// Postgres publication name (must be created beforehand with CREATE PUBLICATION) + #[arg(long)] + publication: String, +} + +// Postgres database connection configuration +#[derive(Debug, Args)] +struct DbArgs { + /// Host on which Postgres is running (e.g., localhost or IP address) (default: 127.0.0.1) + #[arg(long, default_value = "127.0.0.1")] + db_host: String, + /// Port on which Postgres is running (default: 5432) + #[arg(long, default_value = "5432")] + db_port: u16, + /// Postgres database name to connect to + #[arg(long)] + db_name: String, + /// Postgres database user name (must have REPLICATION privileges) + #[arg(long)] + db_username: String, + /// Postgres database user password (optional if using trust authentication) + #[arg(long)] + db_password: Option, +} + +// Redis destination configuration +#[derive(Debug, Args)] +struct RedisArgs { + /// Host on which Redis is running (e.g., localhost or IP address) (default: 127.0.0.1) + #[arg(long, default_value = "127.0.0.1")] + redis_host: String, + /// Port on which Redis is running (default: 6379) + #[arg(long, default_value = "6379")] + redis_port: u16, + /// Redis database user name + #[arg(long)] + redis_username: Option, + /// Redis database user password (optional if using trust authentication) + #[arg(long)] + redis_password: Option, + /// Set a TTL (in seconds) for data replicated in Redis (optional) + #[arg(long)] + redis_ttl: Option, +} + +impl From for RedisConfig { + fn from(args: RedisArgs) -> Self { + Self { + host: args.redis_host, + port: args.redis_port, + username: args.redis_username, + password: args.redis_password, + ttl: args.redis_ttl, + } + } +} + +// Entry point - handles error reporting and process exit +#[tokio::main] +async fn main() -> Result<(), Box> { + if let Err(e) = main_impl().await { + error!("{e}"); + std::process::exit(1); + } + + Ok(()) +} + +// Initialize structured logging with configurable log levels via RUST_LOG environment variable +fn init_tracing() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "redis=info".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); +} + +// Set default log level if RUST_LOG environment variable is not set +fn set_log_level() { + if std::env::var("RUST_LOG").is_err() { + unsafe { + std::env::set_var("RUST_LOG", "info"); + } + } +} + +// Main implementation function containing all the pipeline setup and execution logic +async fn main_impl() -> Result<(), Box> { + // Set up logging and tracing + set_log_level(); + init_tracing(); + + // Parse 
command line arguments + let args = AppArgs::parse(); + + // Configure Postgres connection settings + // Note: TLS is disabled in this example - enable for production use + let pg_connection_config = PgConnectionConfig { + host: args.db_args.db_host, + port: args.db_args.db_port, + name: args.db_args.db_name, + username: args.db_args.db_username, + password: args.db_args.db_password.map(Into::into), + tls: TlsConfig { + trusted_root_certs: String::new(), + enabled: false, // Set to true and provide certs for production + }, + }; + + // Create in-memory store for tracking table replication states and table schemas + // In production, you might want to use a persistent store like PostgresStore + let store = MemoryStore::new(); + + // Create pipeline configuration with batching and retry settings + let pipeline_config = PipelineConfig { + id: 1, // Using a simple ID for the example + publication_name: "my_publication_accounts".to_string(), + pg_connection: pg_connection_config, + batch: BatchConfig { + max_size: 1000, + max_fill_ms: 5000, + }, + table_error_retry_delay_ms: 10000, + table_error_retry_max_attempts: 5, + max_table_sync_workers: 4, + }; + + info!("Connection to Redis"); + let redis = RedisDestination::new(args.redis_args.into(), store.clone()).await?; + // .filter(|_table_row, table_schema| table_schema.name.name == "accounts") + // If you want to edit data before inserts/updates + // .map_insert(|mut cell, table_schema, column_schema| { + // if table_schema.name.name == "accounts" && column_schema.name == "password" { + // // Do not save password in cache + // cell.clear(); + // } + + // cell + // }) + // .map_update(|mut cell, table_schema, column_schema| { + // if table_schema.name.name == "accounts" && column_schema.name == "password" { + // // Do not save password in cache + // cell.clear(); + // } + + // cell + // }); + info!("Connected to Redis"); + // Create the pipeline instance with all components + let mut pipeline = Pipeline::new(pipeline_config, store, redis); + + info!("Starting Redis CDC pipeline - connecting to Postgres and initializing replication..."); + + // Start the pipeline - this will: + // 1. Connect to Postgres + // 2. Initialize table states based on the publication + // 3. Start apply and table sync workers + // 4. Begin streaming replication data + pipeline.start().await?; + + info!("Pipeline started successfully! Data replication is now active. Press Ctrl+C to stop."); + + // Set up signal handler for graceful shutdown on Ctrl+C + let shutdown_signal = async { + signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + info!("Received Ctrl+C signal, initiating graceful shutdown..."); + }; + + // Wait for either the pipeline to complete naturally or receive a shutdown signal + // The pipeline will run indefinitely unless an error occurs or it's manually stopped + tokio::select! { + result = pipeline.wait() => { + info!("Pipeline completed normally (this usually indicates an error condition)"); + result?; + } + _ = shutdown_signal => { + info!("Gracefully shutting down pipeline and cleaning up resources..."); + } + } + + info!("Pipeline stopped successfully. All resources cleaned up."); + + Ok(()) +} diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index a1f48e3da..1ddc87898 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -61,9 +61,10 @@ pub enum ApplyLoopResult { /// Action that should be taken during the apply loop. 
/// /// An action defines what to do after one iteration of the apply loop. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Default)] pub enum ApplyLoopAction { /// The apply loop can continue on the next element. + #[default] Continue, /// The apply loop should pause processing. Pause, @@ -129,12 +130,6 @@ impl ApplyLoopAction { } } -impl Default for ApplyLoopAction { - fn default() -> Self { - Self::Continue - } -} - /// Hook trait for customizing apply loop behavior. /// /// [`ApplyLoopHook`] allows external components to inject custom logic into diff --git a/scripts/docker-compose.yaml b/scripts/docker-compose.yaml index bc4d07fc0..fa248baa9 100644 --- a/scripts/docker-compose.yaml +++ b/scripts/docker-compose.yaml @@ -8,6 +8,10 @@ x-lakekeeper-env: &lakekeeper-env name: etl-pg-${POSTGRES_VERSION:-17} services: + redis: + image: redis:latest + ports: + - 6379:6379 # Start Postgres postgres: image: postgres:${POSTGRES_VERSION:-17} @@ -33,19 +37,23 @@ services: retries: 5 lakekeeper-postgres: - image: postgres:17 - environment: - POSTGRES_USER: ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} - POSTGRES_PASSWORD: ${LAKEKEEPER_POSTGRES_PASSWORD:-lakekeeper-postgres} - POSTGRES_DB: ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog} - volumes: - - ${LAKEKEEPER_POSTGRES_DATA_VOLUME:-lakekeeper_postgres_data}:/var/lib/postgresql/data - restart: unless-stopped - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} -d ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog}"] - interval: 5s - timeout: 5s - retries: 5 + image: postgres:17 + environment: + POSTGRES_USER: ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} + POSTGRES_PASSWORD: ${LAKEKEEPER_POSTGRES_PASSWORD:-lakekeeper-postgres} + POSTGRES_DB: ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog} + volumes: + - ${LAKEKEEPER_POSTGRES_DATA_VOLUME:-lakekeeper_postgres_data}:/var/lib/postgresql/data + restart: unless-stopped + healthcheck: + test: + [ + "CMD-SHELL", + "pg_isready -U ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} -d ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog}", + ] + interval: 5s + timeout: 5s + retries: 5 # Start MinIO for S3-compatible object storage minio: