Skip to content

Commit ee0e561

Browse files
feat: Add Debezium-style Snapshot for Initial Data Capture (#15)
* feat: Add Debezium-style Snapshot for Initial Data Capture * chore: fix security gates on pipeline * chore: fix lint * chore: fix lint
1 parent d8efcb6 commit ee0e561

File tree

12 files changed

+669
-22
lines changed

12 files changed

+669
-22
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,4 @@ jobs:
4848
actions: read
4949
contents: read
5050
security-events: write
51+
secrets: inherit

.golangci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ linters:
4040

4141
issues:
4242
exclude-rules:
43+
- path: example/
44+
linters:
45+
- funlen
4346
- path: (.+)_test.go
4447
linters:
4548
- funlen

README.md

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,75 @@ go-pq-cdc-elasticsearch streams documents from PostgreSql and writes to Elastics
66

77
### Contents
88

9-
* [Usage](#usage)
10-
* [Examples](#examples)
11-
* [Availability](#availability)
12-
* [Configuration](#configuration)
13-
* [API](#api)
14-
* [Exposed Metrics](#exposed-metrics)
15-
* [Compatibility](#compatibility)
16-
* [Breaking Changes](#breaking-changes)
9+
- [go-pq-cdc-elasticsearch ](#go-pq-cdc-elasticsearch---)
10+
- [Contents](#contents)
11+
- [📸 Snapshot Feature](#-snapshot-feature)
12+
- [Snapshot Modes](#snapshot-modes)
13+
- [How It Works](#how-it-works)
14+
- [Identifying Snapshot vs CDC Messages](#identifying-snapshot-vs-cdc-messages)
15+
- [Usage](#usage)
16+
- [Examples](#examples)
17+
- [Availability](#availability)
18+
- [Configuration](#configuration)
19+
- [API](#api)
20+
- [Exposed Metrics](#exposed-metrics)
21+
- [Snapshot Metrics](#snapshot-metrics)
22+
- [Compatibility](#compatibility)
23+
- [Elasticsearch Version Compatibility](#elasticsearch-version-compatibility)
24+
- [Breaking Changes](#breaking-changes)
25+
26+
## 📸 Snapshot Feature
27+
28+
**Capture existing data before starting CDC!** The snapshot feature enables initial data synchronization, ensuring Elasticsearch receives both historical and real-time data.
29+
30+
**Key Highlights:**
31+
32+
- **Zero Data Loss**: Consistent point-in-time snapshot using PostgreSQL's `pg_export_snapshot()`
33+
- **Chunk-Based Processing**: Memory-efficient processing of large tables
34+
- **Multi-Instance Support**: Parallel processing across multiple instances for faster snapshots
35+
- **Crash Recovery**: Automatic resume from failures with chunk-level tracking
36+
- **No Duplicates**: Seamless transition from snapshot to CDC mode
37+
- **Flexible Modes**: Choose between `initial`, `never`, or `snapshot_only` based on your needs
38+
39+
### Snapshot Modes
40+
41+
| Mode | Description | Use Case |
42+
|-----------------|--------------------------------------------------------------------------------------------------|-------------------------------------------------|
43+
| `initial` | Takes snapshot only if no previous snapshot exists, then starts CDC | First-time setup with existing data |
44+
| `never` | Skips snapshot entirely, starts CDC immediately | New tables or when historical data is not needed |
45+
| `snapshot_only` | Takes snapshot and exits (no CDC, no replication slot required) | One-time data migration or backfill |
46+
47+
### How It Works
48+
49+
1. **Snapshot Phase**: Captures existing data in chunks for memory efficiency
50+
2. **Consistent Point**: Uses PostgreSQL's `pg_export_snapshot()` to ensure data consistency
51+
3. **CDC Phase**: Seamlessly transitions to real-time change data capture
52+
4. **No Gaps**: Ensures all changes during snapshot are captured via CDC
53+
54+
### Identifying Snapshot vs CDC Messages
55+
56+
Your handler function can distinguish between snapshot and CDC messages:
57+
58+
```go
59+
func Handler(msg cdc.Message) []elasticsearch.Action {
60+
// Check if this is a snapshot message (historical data)
61+
if msg.Type.IsSnapshot() {
62+
// Handle snapshot data - index to Elasticsearch
63+
id := strconv.Itoa(int(msg.NewData["id"].(int32)))
64+
data, _ := json.Marshal(msg.NewData)
65+
return []elasticsearch.Action{
66+
elasticsearch.NewIndexAction([]byte(id), data, nil),
67+
}
68+
}
69+
70+
// Handle real-time CDC operations
71+
if msg.Type.IsInsert() { /* ... */ }
72+
if msg.Type.IsUpdate() { /* ... */ }
73+
if msg.Type.IsDelete() { /* ... */ }
74+
}
75+
```
76+
77+
For detailed configuration and usage, see the [snapshot example](./example/snapshot).
1778

1879
### Usage
1980

@@ -125,6 +186,7 @@ func Handler(msg cdc.Message) []elasticsearch.Action {
125186
### Examples
126187

127188
* [Simple](./example/simple)
189+
* [Snapshot](./example/snapshot)
128190

129191
### Availability
130192

@@ -162,6 +224,13 @@ This setup ensures continuous data synchronization and minimal downtime in captu
162224
| `cdc.slot.createIfNotExists` | bool | no | - | Create replication slot if not exists. Otherwise, return `replication slot is not exists` error. | |
163225
| `cdc.slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
164226
| `cdc.slot.slotActivityCheckerInterval` | int | yes | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). |
227+
| `cdc.snapshot.enabled` | bool | no | false | Enable initial snapshot feature | When enabled, captures existing data before starting CDC. |
228+
| `cdc.snapshot.mode` | string | no | never | Snapshot mode: `initial`, `never`, or `snapshot_only` | **initial:** Take snapshot only if no previous snapshot exists, then start CDC. <br> **never:** Skip snapshot, start CDC immediately. <br> **snapshot_only:** Take snapshot and exit (no CDC). |
229+
| `cdc.snapshot.chunkSize` | int64 | no | 8000 | Number of rows per chunk during snapshot | Adjust based on table size. Larger chunks = fewer chunks but more memory per chunk. |
230+
| `cdc.snapshot.claimTimeout` | time.Duration | no | 30s | Timeout to reclaim stale chunks | If a worker doesn't send a heartbeat for this duration, its chunk is reclaimed by another worker. |
231+
| `cdc.snapshot.heartbeatInterval` | time.Duration | no | 5s | Interval for worker heartbeat updates | Workers send heartbeat every N seconds to indicate they're processing a chunk. |
232+
| `cdc.snapshot.instanceId` | string | no | auto | Custom instance identifier (optional) | Auto-generated as hostname-pid if not specified. Useful for tracking workers in multi-instance scenarios. |
233+
| `cdc.snapshot.tables` | []Table | no* | - | Tables to snapshot (required for `snapshot_only` mode, optional for `initial` mode) | **snapshot_only:** Must be specified here (independent from publication). <br> **initial:** If specified, must be a subset of publication tables. If not specified, all publication tables are snapshotted. |
165234
| `elasticsearch.username` | string | no (yes, if the auth enabled) | - | The username for authenticating to Elasticsearch. | Required only when Elasticsearch authentication is enabled. |
166235
| `elasticsearch.password` | string | no (yes, if the auth enabled) | - | The password associated with the elasticsearch.username for authenticating to Elasticsearch. | Required only when Elasticsearch authentication is enabled. |
167236
| `elasticsearch.tableIndexMapping` | map[string]string | yes | - | Mapping of PostgreSQL table events to Elasticsearch indices | Maps table names to Elasticsearch indices. |
@@ -198,6 +267,17 @@ the `/metrics` endpoint.
198267
| go_pq_cdc_elasticsearch_index_total | Total number of index operations. | slot_name, host, index_name | Counter |
199268
| go_pq_cdc_elasticsearch_delete_total | Total number of delete operations. | slot_name, host, index_name | Counter |
200269

270+
### Snapshot Metrics
271+
272+
| Metric Name | Description | Labels | Value Type |
273+
|--------------------------------------------------------------|-------------------------------------------------------------------------------------------------------|-----------------------------|--------------|
274+
| go_pq_cdc_snapshot_in_progress | Indicates whether snapshot is currently in progress (1 for active, 0 for inactive). | slot_name, host | Gauge |
275+
| go_pq_cdc_snapshot_total_tables | Total number of tables to snapshot. | slot_name, host | Gauge |
276+
| go_pq_cdc_snapshot_total_chunks | Total number of chunks to process across all tables. | slot_name, host | Gauge |
277+
| go_pq_cdc_snapshot_completed_chunks | Number of chunks completed in snapshot. | slot_name, host | Gauge |
278+
| go_pq_cdc_snapshot_total_rows | Total number of rows read during snapshot. | slot_name, host | Counter |
279+
| go_pq_cdc_snapshot_duration_seconds | Duration of the last snapshot operation in seconds. | slot_name, host | Gauge |
280+
201281
You can also use all cdc related metrics explained [here](https://github.com/Trendyol/go-pq-cdc#exposed-metrics).
202282
All cdc related metrics are automatically injected. It means you don't need to do anything.
203283

config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ type RejectionLog struct {
3030
}
3131

3232
type Config struct {
33-
CDC config.Config
3433
Elasticsearch Elasticsearch
34+
CDC config.Config
3535
}
3636

3737
func (c *Config) SetDefault() {

connector.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,19 @@ import (
2424

2525
// Connector is the public handle of the PostgreSQL-to-Elasticsearch pipeline.
type Connector interface {
	// Start runs the connector using ctx.
	Start(ctx context.Context)

	// WaitUntilReady blocks until the connector has signalled readiness or
	// ctx is done; in the latter case the context error is returned.
	WaitUntilReady(ctx context.Context) error

	// Close shuts the connector down.
	Close()
}
2930

3031
type connector struct {
31-
partitionCache sync.Map
32-
handler Handler
3332
responseHandler elasticsearch.ResponseHandler
34-
cfg *config.Config
35-
esClient *es.Client
3633
cdc cdc.Connector
3734
bulk bulk.Indexer
35+
handler Handler
36+
cfg *config.Config
37+
esClient *es.Client
38+
readyCh chan struct{}
39+
partitionCache sync.Map
3840
metrics []prometheus.Collector
3941
}
4042

@@ -44,6 +46,7 @@ func NewConnector(ctx context.Context, cfg config.Config, handler Handler, optio
4446
esConnector := &connector{
4547
cfg: &cfg,
4648
handler: handler,
49+
readyCh: make(chan struct{}, 1),
4750
}
4851

4952
Options(options).Apply(esConnector)
@@ -77,18 +80,48 @@ func NewConnector(ctx context.Context, cfg config.Config, handler Handler, optio
7780
}
7881

7982
func (c *connector) Start(ctx context.Context) {
83+
// Snapshot-only mode: different flow since upstream CDC exits immediately
84+
if c.cfg.CDC.IsSnapshotOnlyMode() {
85+
logger.Info("starting snapshot-only mode")
86+
logger.Info("bulk process started")
87+
c.bulk.StartBulk()
88+
89+
// Signal ready immediately since there's no CDC to wait for
90+
c.readyCh <- struct{}{}
91+
92+
// Start CDC synchronously - it will execute snapshot and return
93+
c.cdc.Start(ctx)
94+
logger.Info("snapshot-only mode completed")
95+
return
96+
}
97+
98+
// Normal CDC mode: async flow
8099
go func() {
81100
logger.Info("waiting for connector start...")
82101
if err := c.cdc.WaitUntilReady(ctx); err != nil {
83102
panic(err)
84103
}
85104
logger.Info("bulk process started")
86105
c.bulk.StartBulk()
106+
c.readyCh <- struct{}{}
87107
}()
88108
c.cdc.Start(ctx)
89109
}
90110

111+
func (c *connector) WaitUntilReady(ctx context.Context) error {
112+
select {
113+
case <-c.readyCh:
114+
return nil
115+
case <-ctx.Done():
116+
return ctx.Err()
117+
}
118+
}
119+
91120
func (c *connector) Close() {
121+
if !isClosed(c.readyCh) {
122+
close(c.readyCh)
123+
}
124+
92125
c.cdc.Close()
93126
c.bulk.Close()
94127
}
@@ -102,6 +135,8 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
102135
msg = NewUpdateMessage(c.esClient, m)
103136
case *format.Delete:
104137
msg = NewDeleteMessage(c.esClient, m)
138+
case *format.Snapshot:
139+
msg = NewSnapshotMessage(c.esClient, m)
105140
default:
106141
return
107142
}
@@ -212,3 +247,13 @@ func (c *connector) findParentTable(tableNamespace, tableName string) string {
212247

213248
return ""
214249
}
250+
251+
// isClosed reports whether ch has been closed, without blocking.
//
// A channel's closed state can only be observed by receiving from it, so this
// helper has a side effect: any values currently buffered in ch are received
// and discarded. A successful receive of a real value (ok == true) does not
// mean the channel is closed, so draining continues until the channel either
// reports closed (true) or has nothing left to receive (false).
func isClosed[T any](ch <-chan T) bool {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return true
			}
			// A buffered value, not a close notification: keep draining.
		default:
			// Nothing to receive and not closed.
			return false
		}
	}
}

0 commit comments

Comments
 (0)