diff --git a/README.md b/README.md index 3f9f2f3b..cc5e3f03 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,8 @@ RedisShake is a powerful tool for Redis data transformation and migration, offer 6. **Advanced Data Processing**: Enables custom [script-based data transformation](https://tair-opensource.github.io/RedisShake/zh/filter/function.html) and easy-to-use [data filter rules](https://tair-opensource.github.io/RedisShake/zh/filter/filter.html). +7. **New Features**: Supported acyclic two-way synchronization, enabling bidirectional data migration between Redis instances. But the two-way synchronization feature only supports sync_reader mode, not rdb_reader and scan_reader mode. You must confirm that the redis mode is consistent with the cluster-enable configuration, otherwise the two-way synchronization feature will not work. + ## How to Get RedisShake 1. Download from [Releases](https://github.com/tair-opensource/RedisShake/releases). @@ -67,6 +69,21 @@ address = "127.0.0.1:6380" block_key_prefix = ["temp:", "cache:"] ``` +2. Using the acyclic two-way synchronization feature, you need to add the following configuration to the `shake.toml` file: +```toml +cluster = false # Set to true if the source is a Redis cluster +address = "127.0.0.1:9001" # For clusters, specify the address of any cluster node; use the master or slave address in master-slave mode +sync_rdb = true # Set to false if RDB synchronization is not required +sync_aof = true # Set to false if AOF synchronization is not required +bisync = true # set to true for Data Synchronization Between Different Two Redis (Preventing Infinite Sync Loops) +#prefix = "bsy&88@*" # as prefix of marking keys + + +[redis_writer] +cluster = true # set to true if target is a redis cluster +address = "127.0.0.1:8001" # when cluster is true, set address to one of the cluster node +``` + 2. 
Run RedisShake: ```shell ./redis-shake shake.toml diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index b72ac35e..ab665a11 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -3,9 +3,11 @@ package main import ( "RedisShake/internal/client" "context" + "github.com/cespare/xxhash/v2" _ "net/http/pprof" "os" "os/signal" + "strconv" "strings" "sync/atomic" "syscall" @@ -66,6 +68,8 @@ func main() { // create reader var theReader reader.Reader + var isolation entry.Isolation + switch { case v.IsSet("sync_reader"): opts := new(reader.SyncReaderOptions) @@ -74,6 +78,12 @@ func main() { if err != nil { log.Panicf("failed to read the SyncReader config entry. err: %v", err) } + isolation.OriginAddr = opts.Address + isolation.OriginUser = opts.Username + isolation.OriginIsCluster = opts.Cluster + isolation.BySync = opts.BiSync + isolation.Prefix = opts.Prefix + if opts.Cluster { log.Infof("create SyncClusterReader") log.Infof("* address (should be the address of one node in the Redis cluster): %s", opts.Address) @@ -146,6 +156,7 @@ func main() { if err != nil { log.Panicf("failed to read the FileWriter config entry. err: %v", err) } + isolation.TargetAddr = opts.Filepath theWriter = writer.NewFileWriter(ctx, opts) case v.IsSet("redis_writer"): opts := new(writer.RedisWriterOptions) @@ -154,6 +165,10 @@ func main() { if err != nil { log.Panicf("failed to read the RedisStandaloneWriter config entry. err: %v", err) } + isolation.TargetUser = opts.Username + isolation.TargetAddr = opts.Address + isolation.TargetIsCLuster = opts.Cluster + if opts.OffReply && config.Opt.Advanced.RDBRestoreCommandBehavior == "panic" { log.Panicf("the RDBRestoreCommandBehavior can't be 'panic' when the server not reply to commands") } @@ -230,17 +245,144 @@ func main() { entries := luaRuntime.RunFunction(e) log.Debugf("function after: %v", entries) + var is_rep_key = false // Is it a loopback command? 
+ // write for _, theEntry := range entries { theEntry.Parse() - theWriter.Write(theEntry) + if isolation.BySync { + argv := theEntry.Argv + + if len(theEntry.Keys) == 0 && argv[0] != "FLUSHALL" && argv[0] != "FLUSHDB" { + continue + } + + command := strings.Join(argv, " ") + timestamp := time.Now().Unix() + bisyncCommand := []string{ + "SETEX", // SETEX + isolation.Prefix + ":" + strconv.FormatUint(xxhash.Sum64String(command), 10), // new key + "60", // ttl + isolation.OriginAddr + "@" + strconv.FormatInt(timestamp, 10), // value + } + + if isolation.TargetIsCLuster && isolation.OriginIsCluster { + clusterReader, ok := theReader.(*reader.SyncClusterReader) + if !ok { + log.Panicf("Type assertion failed: reader is not *syncClusterReader") + } + clusterWrite, ok := theWriter.(*writer.RedisClusterWriter) + if !ok { + log.Panicf("Type assertion failed: writer is not *RedisClusterWriter") + } + + value, err := clusterReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result() + //If the source key does not begin with the prefix and the command hash does not exist, immediately mark it in the target cluster. 
+ if err != nil && value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) { + if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" { + log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr) + } else { + log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr) + } + + seconds, err := strconv.ParseInt(bisyncCommand[2], 10, 64) + err = clusterWrite.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err() + if err != nil { + log.Panicf("SETEX failed: %w", err) + } + } else { + is_rep_key = true + } + } else if !isolation.TargetIsCLuster && !isolation.OriginIsCluster { + standaloneReader, ok := theReader.(*reader.SyncStandaloneReader) + if !ok { + log.Panicf("Type assertion failed: reader is not *SyncStandaloneReader") + } + standalonWriter, ok := theWriter.(*writer.RedisStandaloneWriter) + if !ok { + log.Panicf("Type assertion failed: writer is not *RedisStandaloneWriter") + } + + value, _ := standaloneReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result() + if value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) { + if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" { + log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr) + } else { + log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr) + } + seconds, err := strconv.ParseInt(bisyncCommand[2], 10, 64) + err = standalonWriter.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err() + if err != nil { + log.Panicf("SETEX failed: %w", err) + } + } else { + is_rep_key = true + } + } else if isolation.TargetIsCLuster && !isolation.OriginIsCluster { + + standaloneReader, 
ok := theReader.(*reader.SyncStandaloneReader) + if !ok { + log.Panicf("Type assertion failed: reader is not *SyncStandaloneReader") + } + + clusterWrite, ok := theWriter.(*writer.RedisClusterWriter) + if !ok { + log.Panicf("Type assertion failed: writer is not *RedisClusterWriter") + } + + value, err := standaloneReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result() + if err != nil && value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) { + if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" { + log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr) + } else { + log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr) + } + seconds, err := strconv.ParseInt(bisyncCommand[2], 10, 64) + err = clusterWrite.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err() + if err != nil { + log.Panicf("SETEX failed: %w", err) + } + } else { + is_rep_key = true + } + + } else { + clusterReader, ok := theReader.(*reader.SyncClusterReader) + if !ok { + log.Panicf("Type assertion failed: reader is not *syncClusterReader") + } + standalonWriter, ok := theWriter.(*writer.RedisStandaloneWriter) + if !ok { + log.Panicf("Type assertion failed: writer is not *RedisStandaloneWriter") + } + value, _ := clusterReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result() + if value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) { + if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" { + log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr) + } else { + log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr) + } + // immediately set the command of BISYNC to the target cluster + seconds, 
err := strconv.ParseInt(bisyncCommand[2], 10, 64) + err = standalonWriter.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err() + if err != nil { + log.Panicf("SETEX failed: %w", err) + } + } else { + is_rep_key = true + } + } + } - // update writer status - if config.Opt.Advanced.StatusPort != 0 { - status.AddWriteCount(theEntry.CmdName) + if !is_rep_key { + theWriter.Write(theEntry) + // update writer status + if config.Opt.Advanced.StatusPort != 0 { + status.AddWriteCount(theEntry.CmdName) + } + // update log entry count + atomic.AddUint64(&logEntryCount.WriteCount, 1) } - // update log entry count - atomic.AddUint64(&logEntryCount.WriteCount, 1) } } readerDone <- true diff --git a/docs/src/zh/guide/architecture.md b/docs/src/zh/guide/architecture.md index 8326b073..371af8d9 100644 --- a/docs/src/zh/guide/architecture.md +++ b/docs/src/zh/guide/architecture.md @@ -18,6 +18,9 @@ Cluster Reader 即集群读入类,其根据源端分片数量创建同等数量的 Standalone Reader,每个 Standalone Reader 开启一个协程(Goroutinue)并行的从每个源端分片进行读入,并将数据存入相应的管道(Reader Channel)交付给下一环节处理。 +### 双向无环数据同步实现 +采用目标 Redis 标记法,当 Redis-A 向 Redis-B 同步数据【set name jay】时,首先检查源Redis-A是否存在标记,如果不存在则将命令进行 Hash 为MD5码 【8we34o4ew9msd03i83sfs01syer】,向目标 Redis-B 写入标记【 setex prefix:8we34o4ew9msd03i83sfs01syer 120 源Redis地址@时间戳】。 + ### Main Main 即主函数,其根据 Reader Channel 数量开启多个协程,并行的对管道中数据分别执行 Parse、Filter、Function 操作,再调用 Cluster Writer 的 Write 方法,将数据分发给写入端。 diff --git a/go.mod b/go.mod index dcb810fc..f58f5181 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-stack/stack v1.8.1 github.com/gofrs/flock v0.8.1 github.com/mcuadros/go-defaults v1.2.0 + github.com/redis/go-redis/v9 v9.12.0 github.com/rs/zerolog v1.28.0 github.com/spf13/viper v1.18.1 github.com/stretchr/testify v1.8.4 @@ -16,8 +17,10 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/davecgh/go-spew 
v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect diff --git a/go.sum b/go.sum index 19b469d3..6e6e0ffd 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,9 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -5,6 +11,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod 
h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -47,6 +55,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.12.0 h1:XlVPGlflh4nxfhsNXPA8Qp6EmEfTo0rp8oaBzPipXnU= +github.com/redis/go-redis/v9 v9.12.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/internal/client/func.go b/internal/client/func.go index 1957e153..0b93beae 100644 --- a/internal/client/func.go +++ b/internal/client/func.go @@ -1,11 +1,14 @@ package client import ( - "bytes" - "strings" - "RedisShake/internal/client/proto" "RedisShake/internal/log" + "bytes" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "strings" ) func EncodeArgv(argv []string, buf *bytes.Buffer) { @@ -26,3 +29,49 @@ func (r *Redis) IsCluster() bool { reply := r.DoWithStringReply("INFO", "Cluster") return strings.Contains(reply, "cluster_enabled:1") } + +// createTLSConfig 根据配置创建 TLS 配置 +func CreateTLSConfig(keyFilePath, CACertFilePath, certFilePath string) (*tls.Config, error) { + if keyFilePath == "" && CACertFilePath == "" && certFilePath == "" { + return &tls.Config{ + InsecureSkipVerify: true, // 如果没有配置,默认不验证 + }, nil + } + + config := &tls.Config{ + MinVersion: tls.VersionTLS12, // 设置最低 TLS 版本 + } + + // 加载 CA 证书(如果配置) + if CACertFilePath != "" { + caCert, err := 
ioutil.ReadFile(CACertFilePath) + if err != nil { + return nil, err + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to append CA cert") + } + config.RootCAs = caCertPool + } + + // 加载客户端证书和私钥(如果配置,用于双向认证) + if certFilePath != "" && keyFilePath != "" { + cert, err := tls.LoadX509KeyPair(certFilePath, keyFilePath) + if err != nil { + return nil, err + } + config.Certificates = []tls.Certificate{cert} + } + + // 如果配置了CA证书,则启用服务器证书验证 + if CACertFilePath != "" { + config.InsecureSkipVerify = false + } else { + config.InsecureSkipVerify = true + log.Warnf("No CA certificate provided, using insecure TLS connection") + } + + return config, nil +} diff --git a/internal/entry/isolation.go b/internal/entry/isolation.go new file mode 100644 index 00000000..fd02e9f7 --- /dev/null +++ b/internal/entry/isolation.go @@ -0,0 +1,12 @@ +package entry + +type Isolation struct { + OriginAddr string + OriginUser string "Default" + TargetAddr string + TargetUser string "Default" + Prefix string + OriginIsCluster bool + TargetIsCLuster bool + BySync bool +} diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 9e0f6df7..f9e8c4d1 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -1,17 +1,20 @@ package reader import ( - "context" - "fmt" - + "RedisShake/internal/client" "RedisShake/internal/entry" "RedisShake/internal/log" "RedisShake/internal/utils" + "context" + "crypto/tls" + "fmt" + "github.com/redis/go-redis/v9" ) -type syncClusterReader struct { - readers []Reader - statusId int +type SyncClusterReader struct { + readers []Reader + statusId int + OriginClient *redis.ClusterClient } func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader { @@ -20,16 +23,32 @@ func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader { for _, address := range addresses { log.Debugf("%s", address) 
} - rd := &syncClusterReader{} + rd := &SyncClusterReader{} for _, address := range addresses { theOpts := *opts theOpts.Address = address rd.readers = append(rd.readers, NewSyncStandaloneReader(ctx, &theOpts)) } + + var tlsConfig *tls.Config + if opts.Tls { + tlsConfig, err := client.CreateTLSConfig(opts.TlsConfig.KeyFilePath, opts.TlsConfig.CACertFilePath, opts.TlsConfig.CertFilePath) + _ = tlsConfig + if err != nil { + log.Panicf("failed to load Tls config.") + } + } + rd.OriginClient = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{opts.Address}, + Username: opts.Username, + Password: opts.Password, + TLSConfig: tlsConfig, + }) + return rd } -func (rd *syncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry { +func (rd *SyncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry { chs := make([]chan *entry.Entry, 0) for _, r := range rd.readers { ch := r.StartRead(ctx) @@ -38,7 +57,7 @@ func (rd *syncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry return chs } -func (rd *syncClusterReader) Status() interface{} { +func (rd *SyncClusterReader) Status() interface{} { stat := make([]interface{}, 0) for _, r := range rd.readers { stat = append(stat, r.Status()) @@ -46,13 +65,13 @@ func (rd *syncClusterReader) Status() interface{} { return stat } -func (rd *syncClusterReader) StatusString() string { +func (rd *SyncClusterReader) StatusString() string { rd.statusId += 1 rd.statusId %= len(rd.readers) return fmt.Sprintf("src-%d, %s", rd.statusId, rd.readers[rd.statusId].StatusString()) } -func (rd *syncClusterReader) StatusConsistent() bool { +func (rd *SyncClusterReader) StatusConsistent() bool { for _, r := range rd.readers { if !r.StatusConsistent() { return false diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 4d872a47..4576a2db 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -4,9 +4,11 @@ 
import ( "bufio" "bytes" "context" + "crypto/tls" "encoding/json" "errors" "fmt" + "github.com/redis/go-redis/v9" "io" "os" "path/filepath" @@ -40,6 +42,8 @@ type SyncReaderOptions struct { PreferReplica bool `mapstructure:"prefer_replica" default:"false"` TryDiskless bool `mapstructure:"try_diskless" default:"false"` Sentinel client.SentinelOptions `mapstructure:"sentinel"` + BiSync bool `mapstructure:"bisync" default:"true"` + Prefix string `mapstructure:"prefix" default:"bsy&88@*"` } const RDB_EOF_MARKER_LEN = 40 @@ -95,10 +99,10 @@ func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) { return json.Marshal(aliasStat(s)) } -type syncStandaloneReader struct { +type SyncStandaloneReader struct { ctx context.Context opts *SyncReaderOptions - client *client.Redis + Client *client.Redis ch chan *entry.Entry DbId int @@ -107,23 +111,40 @@ type syncStandaloneReader struct { // version info isDiskless bool + + OriginClient *redis.Client } func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader { - r := new(syncStandaloneReader) + r := new(SyncStandaloneReader) r.opts = opts - r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.TlsConfig, opts.PreferReplica) + r.Client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.TlsConfig, opts.PreferReplica) r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) r.stat.Address = opts.Address r.stat.Status = kHandShake r.stat.Dir = utils.GetAbsPath(r.stat.Name) utils.CreateEmptyDir(r.stat.Dir) + var tlsConfig *tls.Config + if opts.Tls { + tlsConfig, err := client.CreateTLSConfig(opts.TlsConfig.KeyFilePath, opts.TlsConfig.CACertFilePath, opts.TlsConfig.CertFilePath) + _ = tlsConfig + if err != nil { + log.Panicf("failed to load Tls config.") + } + } + r.OriginClient = redis.NewClient(&redis.Options{ + Addr: opts.Address, + Username: opts.Username, + Password: opts.Password, + TLSConfig: tlsConfig, + 
}) + return r } -func (r *syncStandaloneReader) supportPSync() bool { - reply := r.client.DoWithStringReply("info", "server") +func (r *SyncStandaloneReader) supportPSync() bool { + reply := r.Client.DoWithStringReply("info", "server") for _, line := range strings.Split(reply, "\n") { if strings.HasPrefix(line, "redis_version:") { version := strings.Split(line, ":")[1] @@ -142,7 +163,7 @@ func (r *syncStandaloneReader) supportPSync() bool { return true } -func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry { +func (r *SyncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry { if r.supportPSync() { // Redis version >= 2.8 return r.StartReadWithPSync(ctx) } else { // Redis version < 2.8 @@ -151,7 +172,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entr } // StartReadWithPSync is used in Redis version >= 2.8 -func (r *syncStandaloneReader) StartReadWithPSync(ctx context.Context) []chan *entry.Entry { +func (r *SyncStandaloneReader) StartReadWithPSync(ctx context.Context) []chan *entry.Entry { r.ctx = ctx r.ch = make(chan *entry.Entry, 1024) go func() { @@ -175,7 +196,7 @@ func (r *syncStandaloneReader) StartReadWithPSync(ctx context.Context) []chan *e } // StartReadWithSync is only used in Redis version < 2.8 -func (r *syncStandaloneReader) StartReadWithSync(ctx context.Context) []chan *entry.Entry { +func (r *SyncStandaloneReader) StartReadWithSync(ctx context.Context) []chan *entry.Entry { r.ctx = ctx r.ch = make(chan *entry.Entry, 1024) go func() { @@ -196,18 +217,18 @@ func (r *syncStandaloneReader) StartReadWithSync(ctx context.Context) []chan *en return []chan *entry.Entry{r.ch} } -func (r *syncStandaloneReader) sendReplconfListenPort() { +func (r *SyncStandaloneReader) sendReplconfListenPort() { // use status_port as redis-shake port argv := []interface{}{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)} - r.client.Send(argv...) 
- _, err := r.client.Receive() + r.Client.Send(argv...) + _, err := r.Client.Receive() if err != nil { log.Warnf("[%s] send replconf command to redis server failed. error=[%v]", r.stat.Name, err) } } // When BGSAVE is triggered by the source Redis itself, synchronization is blocked, so need to check it -func (r *syncStandaloneReader) checkBgsaveInProgress() { +func (r *SyncStandaloneReader) checkBgsaveInProgress() { for { select { case <-r.ctx.Done(): @@ -215,8 +236,8 @@ func (r *syncStandaloneReader) checkBgsaveInProgress() { runtime.Goexit() // stop goroutine default: argv := []interface{}{"INFO", "persistence"} - r.client.Send(argv...) - receiveString := r.client.ReceiveString() + r.Client.Send(argv...) + receiveString := r.Client.ReceiveString() if strings.Contains(receiveString, "rdb_bgsave_in_progress:1") || strings.Contains(receiveString, "aof_rewrite_in_progress:1") { log.Warnf("[%s] source db is doing bgsave, waiting for a while.", r.stat.Name) } else { @@ -228,10 +249,10 @@ func (r *syncStandaloneReader) checkBgsaveInProgress() { } } -func (r *syncStandaloneReader) sendPSync() { +func (r *SyncStandaloneReader) sendPSync() { if r.opts.TryDiskless { argv := []interface{}{"REPLCONF", "CAPA", "EOF"} - reply := r.client.DoWithStringReply(argv...) + reply := r.Client.DoWithStringReply(argv...) if reply != "OK" { log.Warnf("[%s] send replconf capa eof to redis server failed. reply=[%v]", r.stat.Name, reply) } else { @@ -244,7 +265,7 @@ func (r *syncStandaloneReader) sendPSync() { if config.Opt.Advanced.AwsPSync != "" { argv = []interface{}{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} } - r.client.Send(argv...) + r.Client.Send(argv...) 
// format: \n\n\n+\r\n for { @@ -254,19 +275,19 @@ func (r *syncStandaloneReader) sendPSync() { runtime.Goexit() // stop goroutine default: } - peakByte, err := r.client.Peek() + peakByte, err := r.Client.Peek() if err != nil { log.Panicf(err.Error()) } if peakByte != '\n' { break } - _, err = r.client.ReadByte() + _, err = r.Client.ReadByte() if err != nil { log.Panicf("[%s] pop byte failed. error=[%v]", r.stat.Name, err) } } - reply := r.client.ReceiveString() + reply := r.Client.ReceiveString() masterOffset, err := strconv.Atoi(strings.Split(reply, " ")[2]) if err != nil { log.Panicf(err.Error()) @@ -274,10 +295,10 @@ func (r *syncStandaloneReader) sendPSync() { r.stat.AofReceivedOffset = int64(masterOffset) } -func (r *syncStandaloneReader) sendSync() { +func (r *SyncStandaloneReader) sendSync() { if r.opts.TryDiskless { argv := []interface{}{"REPLCONF", "CAPA", "EOF"} - reply := r.client.DoWithStringReply(argv...) + reply := r.Client.DoWithStringReply(argv...) if reply != "OK" { log.Warnf("[%s] send replconf capa eof to redis server failed. reply=[%v]", r.stat.Name, reply) } @@ -288,7 +309,7 @@ func (r *syncStandaloneReader) sendSync() { if config.Opt.Advanced.AwsPSync != "" { argv = []interface{}{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} } - r.client.Send(argv...) + r.Client.Send(argv...) // format: \n\n\n+\r\n for { @@ -298,28 +319,28 @@ func (r *syncStandaloneReader) sendSync() { runtime.Goexit() // stop goroutine default: } - peekByte, err := r.client.Peek() + peekByte, err := r.Client.Peek() if err != nil { log.Panicf(err.Error()) } if peekByte != '\n' { break } - _, err = r.client.ReadByte() + _, err = r.Client.ReadByte() if err != nil { log.Panicf("[%s] pop byte failed. 
error=[%v]", r.stat.Name, err) } } } -func (r *syncStandaloneReader) receiveRDB() string { +func (r *SyncStandaloneReader) receiveRDB() string { log.Debugf("[%s] source db is doing bgsave.", r.stat.Name) r.stat.Status = kWaitBgsave timeStart := time.Now() // format: \n\n\n$\r\n // if source support repl-diskless-sync: \n\n\n$EOF:<40 characters EOF marker>\r\nstream data for { - b, err := r.client.ReadByte() + b, err := r.Client.ReadByte() if err != nil { log.Panicf(err.Error()) } @@ -332,7 +353,7 @@ func (r *syncStandaloneReader) receiveRDB() string { break } log.Debugf("[%s] source db bgsave finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds()) - marker, err := r.client.ReadString('\n') + marker, err := r.Client.ReadString('\n') if err != nil { log.Panicf(err.Error()) } @@ -366,7 +387,7 @@ func (r *syncStandaloneReader) receiveRDB() string { return rdbFilePath } -func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Writer) { +func (r *SyncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Writer) { const bufSize int64 = 32 * 1024 * 1024 // 32MB buf := make([]byte, bufSize) @@ -380,7 +401,7 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write for { copy(buf, lastBytes) // copy previous tail bytes to head of buf - nread, err := r.client.Read(buf[len(lastBytes):]) + nread, err := r.Client.Read(buf[len(lastBytes):]) if err != nil { log.Panicf(err.Error()) } @@ -412,7 +433,7 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write } } -func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Writer) { +func (r *SyncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Writer) { length, err := strconv.ParseInt(marker, 10, 64) if err != nil { log.Panicf(err.Error()) @@ -428,7 +449,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr if remainder < readOnce { readOnce = remainder } - n, err := 
r.client.Read(buf[:readOnce]) + n, err := r.Client.Read(buf[:readOnce]) if err != nil { log.Panicf(err.Error()) } @@ -442,7 +463,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr } } -func (r *syncStandaloneReader) receiveAOF() { +func (r *SyncStandaloneReader) receiveAOF() { log.Debugf("[%s] start receiving aof data, and save to file", r.stat.Name) aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset) defer aofWriter.Close() @@ -452,7 +473,7 @@ func (r *syncStandaloneReader) receiveAOF() { case <-r.ctx.Done(): return default: - n, err := r.client.Read(buf) + n, err := r.Client.Read(buf) if err != nil { log.Panicf(err.Error()) } @@ -463,7 +484,7 @@ func (r *syncStandaloneReader) receiveAOF() { } } -func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { +func (r *SyncStandaloneReader) sendRDB(rdbFilePath string) { // start parse rdb log.Debugf("[%s] start sending RDB to target", r.stat.Name) r.stat.Status = kSyncRdb @@ -478,7 +499,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { log.Debugf("[%s] delete RDB file", r.stat.Name) } -func (r *syncStandaloneReader) sendAOF(offset int64) { +func (r *SyncStandaloneReader) sendAOF(offset int64) { aofReader := rotate.NewAOFReader(r.ctx, r.stat.Name, r.stat.Dir, offset) defer aofReader.Close() protoReader := proto.NewReader(bufio.NewReader(aofReader)) @@ -540,7 +561,7 @@ func (r *syncStandaloneReader) sendAOF(offset int64) { } // sendReplconfAck sends replconf ack to master to maintain heartbeat between redis-shake and source redis. 
-func (r *syncStandaloneReader) sendReplconfAck() { +func (r *SyncStandaloneReader) sendReplconfAck() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { @@ -549,17 +570,17 @@ func (r *syncStandaloneReader) sendReplconfAck() { return case <-ticker.C: if r.stat.AofReceivedOffset != 0 { - r.client.Send("replconf", "ack", strconv.FormatInt(r.stat.AofReceivedOffset, 10)) + r.Client.Send("replconf", "ack", strconv.FormatInt(r.stat.AofReceivedOffset, 10)) } } } } -func (r *syncStandaloneReader) Status() interface{} { +func (r *SyncStandaloneReader) Status() interface{} { return r.stat } -func (r *syncStandaloneReader) StatusString() string { +func (r *SyncStandaloneReader) StatusString() string { if r.stat.Status == kSyncRdb { return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes)) } @@ -575,7 +596,7 @@ func (r *syncStandaloneReader) StatusString() string { return string(r.stat.Status) } -func (r *syncStandaloneReader) StatusConsistent() bool { +func (r *SyncStandaloneReader) StatusConsistent() bool { return r.stat.AofReceivedOffset != 0 && r.stat.AofReceivedOffset == r.stat.AofSentOffset && len(r.ch) == 0 diff --git a/internal/reader/sync_standalone_reader_test.go b/internal/reader/sync_standalone_reader_test.go index 3485f092..9cd44bba 100644 --- a/internal/reader/sync_standalone_reader_test.go +++ b/internal/reader/sync_standalone_reader_test.go @@ -6,6 +6,7 @@ import ( "testing" "RedisShake/internal/log" + "github.com/stretchr/testify/require" ) @@ -55,7 +56,7 @@ func Test_syncStandaloneReader_Status(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := &syncStandaloneReader{ + r := &SyncStandaloneReader{ ctx: tt.fields.ctx, opts: tt.fields.opts, } diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index 40f193f5..c7359781 100644 --- a/internal/writer/redis_cluster_writer.go +++ 
b/internal/writer/redis_cluster_writer.go @@ -2,8 +2,11 @@ package writer import ( "context" + "crypto/tls" + "github.com/redis/go-redis/v9" "sync" + "RedisShake/internal/client" "RedisShake/internal/entry" "RedisShake/internal/log" "RedisShake/internal/utils" @@ -12,19 +15,36 @@ import ( const KeySlots = 16384 type RedisClusterWriter struct { - addresses []string - writers []Writer - router [KeySlots]Writer - ch chan *entry.Entry - chWg sync.WaitGroup - stat []interface{} + addresses []string + writers []Writer + router [KeySlots]Writer + ch chan *entry.Entry + chWg sync.WaitGroup + stat []interface{} + TargetClient *redis.ClusterClient } func NewRedisClusterWriter(ctx context.Context, opts *RedisWriterOptions) Writer { - rw := new(RedisClusterWriter) + rw := &RedisClusterWriter{} rw.loadClusterNodes(ctx, opts) rw.ch = make(chan *entry.Entry, 1024) log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses) + + var tlsConfig *tls.Config + if opts.Tls { + tlsConfig, err := client.CreateTLSConfig(opts.TlsConfig.KeyFilePath, opts.TlsConfig.CACertFilePath, opts.TlsConfig.CertFilePath) + _ = tlsConfig + if err != nil { + log.Panicf("failed to load Tls config.") + } + } + rw.TargetClient = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{opts.Address}, + Username: opts.Username, + TLSConfig: tlsConfig, + Password: opts.Password, + }) + return rw } @@ -34,6 +54,9 @@ func (r *RedisClusterWriter) Close() { for _, writer := range r.writers { writer.Close() } + if r.TargetClient != nil { + r.TargetClient.Close() + } } func (r *RedisClusterWriter) loadClusterNodes(ctx context.Context, opts *RedisWriterOptions) { diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index b8070947..3e3db5e8 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -2,8 +2,10 @@ package writer import ( "context" + "crypto/tls" "errors" "fmt" + 
"github.com/redis/go-redis/v9" "go.uber.org/ratelimit" "strconv" "strings" @@ -29,9 +31,9 @@ type RedisWriterOptions struct { Sentinel client.SentinelOptions `mapstructure:"sentinel"` } -type redisStandaloneWriter struct { +type RedisStandaloneWriter struct { address string - client *client.Redis + Client *client.Redis DbId int chWaitReply chan *entry.Entry @@ -45,27 +47,44 @@ type redisStandaloneWriter struct { UnansweredBytes int64 `json:"unanswered_bytes"` UnansweredEntries int64 `json:"unanswered_entries"` } + TargetClient *redis.Client } func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Writer { - rw := new(redisStandaloneWriter) + rw := new(RedisStandaloneWriter) rw.address = opts.Address rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) - rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.TlsConfig, false) + rw.Client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.TlsConfig, false) rw.ch = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit) if opts.OffReply { log.Infof("turn off the reply of write") rw.offReply = true - rw.client.Send("CLIENT", "REPLY", "OFF") + rw.Client.Send("CLIENT", "REPLY", "OFF") } else { rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit*2) rw.chWaitWg.Add(1) go rw.processReply() } + + var tlsConfig *tls.Config + if opts.Tls { + tlsConfig, err := client.CreateTLSConfig(opts.TlsConfig.KeyFilePath, opts.TlsConfig.CACertFilePath, opts.TlsConfig.CertFilePath) + _ = tlsConfig + if err != nil { + log.Panicf("failed to load Tls config.") + } + } + rw.TargetClient = redis.NewClient(&redis.Options{ + Addr: opts.Address, + Username: opts.Username, + Password: opts.Password, + TLSConfig: tlsConfig, + }) + return rw } -func (w *redisStandaloneWriter) Close() { +func (w *RedisStandaloneWriter) Close() { if !w.offReply { close(w.ch) w.chWg.Wait() @@ -74,20 +93,20 @@ func (w 
*redisStandaloneWriter) Close() { } } -func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry { +func (w *RedisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry { w.chWg = sync.WaitGroup{} w.chWg.Add(1) go w.processWrite(ctx) return w.ch } -func (w *redisStandaloneWriter) Write(e *entry.Entry) { +func (w *RedisStandaloneWriter) Write(e *entry.Entry) { w.ch <- e } -func (w *redisStandaloneWriter) switchDbTo(newDbId int) { +func (w *RedisStandaloneWriter) switchDbTo(newDbId int) { log.Debugf("[%s] switch db to [%d]", w.stat.Name, newDbId) - w.client.Send("select", strconv.Itoa(newDbId)) + w.Client.Send("select", strconv.Itoa(newDbId)) w.DbId = newDbId if !w.offReply { w.chWaitReply <- &entry.Entry{ @@ -97,7 +116,7 @@ func (w *redisStandaloneWriter) switchDbTo(newDbId int) { } } -func (w *redisStandaloneWriter) processWrite(ctx context.Context) { +func (w *RedisStandaloneWriter) processWrite(ctx context.Context) { ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() @@ -109,11 +128,11 @@ func (w *redisStandaloneWriter) processWrite(ctx context.Context) { case <-ctx.Done(): // do nothing until w.ch is closed case <-ticker.C: - w.client.Flush() + w.Client.Flush() case e, ok := <-w.ch: if !ok { // clean up and exit - w.client.Flush() + w.Client.Flush() w.chWg.Done() return } @@ -132,20 +151,20 @@ func (w *redisStandaloneWriter) processWrite(ctx context.Context) { select { case w.chWaitReply <- e: default: - w.client.Flush() + w.Client.Flush() w.chWaitReply <- e } atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) atomic.AddInt64(&w.stat.UnansweredEntries, 1) } - w.client.SendBytesBuff(bytes) + w.Client.SendBytesBuff(bytes) } } } -func (w *redisStandaloneWriter) processReply() { +func (w *RedisStandaloneWriter) processReply() { for e := range w.chWaitReply { - reply, err := w.client.Receive() + reply, err := w.Client.Receive() log.Debugf("[%s] receive reply. 
reply=[%v], cmd=[%s]", w.stat.Name, reply, e.String()) // It's good to skip the nil error since some write commands will return the null reply. For example, @@ -170,14 +189,14 @@ func (w *redisStandaloneWriter) processReply() { w.chWaitWg.Done() } -func (w *redisStandaloneWriter) Status() interface{} { +func (w *RedisStandaloneWriter) Status() interface{} { return w.stat } -func (w *redisStandaloneWriter) StatusString() string { +func (w *RedisStandaloneWriter) StatusString() string { return fmt.Sprintf("[%s]: unanswered_entries=%d", w.stat.Name, atomic.LoadInt64(&w.stat.UnansweredEntries)) } -func (w *redisStandaloneWriter) StatusConsistent() bool { +func (w *RedisStandaloneWriter) StatusConsistent() bool { return atomic.LoadInt64(&w.stat.UnansweredBytes) == 0 && atomic.LoadInt64(&w.stat.UnansweredEntries) == 0 } diff --git a/shake.toml b/shake.toml index 957082ab..bd01b253 100644 --- a/shake.toml +++ b/shake.toml @@ -1,6 +1,6 @@ [sync_reader] cluster = false # Set to true if the source is a Redis cluster -address = "127.0.0.1:6379" # For clusters, specify the address of any cluster node; use the master or slave address in master-slave mode +address = "127.0.0.1:9001" # For clusters, specify the address of any cluster node; use the master or slave address in master-slave mode username = "" # Keep empty if ACL is not in use password = "" # Keep empty if no authentication is required tls = false # Set to true to enable TLS if needed @@ -8,6 +8,8 @@ sync_rdb = true # Set to false if RDB synchronization is not required sync_aof = true # Set to false if AOF synchronization is not required prefer_replica = false # Set to true to sync from a replica node try_diskless = false # Set to true for diskless sync if the source has repl-diskless-sync=yes +bisync = true # Set to true to enable bidirectional data synchronization between two Redis instances (prevents infinite sync loops) +#prefix = "bsy&88@*" # Prefix used to mark keys during bidirectional synchronization #[scan_reader] #cluster = false # set to true if source is a
redis cluster @@ -31,8 +33,8 @@ try_diskless = false # Set to true for diskless sync if the source has rep # timestamp = 0 # subsecond [redis_writer] -cluster = false # set to true if target is a redis cluster -address = "127.0.0.1:6380" # when cluster is true, set address to one of the cluster node +cluster = true # set to true if target is a redis cluster +address = "127.0.0.1:8001" # when cluster is true, set address to one of the cluster node username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false