Skip to content

Commit 6332cfd

Browse files
ndyakov and Copilot authored
feat(maintnotif): lazy cluster topology reload (#3614)
* lazy cluster topology reload
* fix discrepancies between options structs
* Update osscluster_lazy_reload_test.go
* Update osscluster.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent d8f1d60 commit 6332cfd

File tree

5 files changed

+491
-126
lines changed

5 files changed

+491
-126
lines changed

commands_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8922,27 +8922,37 @@ var _ = Describe("Commands", func() {
89228922
const key = "latency-monitor-threshold"
89238923

89248924
old := client.ConfigGet(ctx, key).Val()
8925-
client.ConfigSet(ctx, key, "1")
8925+
// Use a higher threshold (100ms) to avoid capturing normal operations
8926+
// that could cause flakiness due to timing variations
8927+
client.ConfigSet(ctx, key, "100")
89268928
defer client.ConfigSet(ctx, key, old[key])
89278929

89288930
result, err := client.Latency(ctx).Result()
89298931
Expect(err).NotTo(HaveOccurred())
89308932
Expect(len(result)).Should(Equal(0))
89318933

8932-
err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err()
8934+
// Use a longer sleep (150ms) to ensure it exceeds the 100ms threshold
8935+
err = client.Do(ctx, "DEBUG", "SLEEP", 0.15).Err()
89338936
Expect(err).NotTo(HaveOccurred())
89348937

89358938
result, err = client.Latency(ctx).Result()
89368939
Expect(err).NotTo(HaveOccurred())
8937-
Expect(len(result)).Should(Equal(1))
8940+
Expect(len(result)).Should(BeNumerically(">=", 1))
89388941

89398942
// reset latency by event name
8940-
err = client.LatencyReset(ctx, result[0].Name).Err()
8943+
eventName := result[0].Name
8944+
err = client.LatencyReset(ctx, eventName).Err()
89418945
Expect(err).NotTo(HaveOccurred())
89428946

8947+
// Verify the specific event was reset (not that all events are gone)
8948+
// This avoids flakiness from other operations triggering latency events
89438949
result, err = client.Latency(ctx).Result()
89448950
Expect(err).NotTo(HaveOccurred())
8945-
Expect(len(result)).Should(Equal(0))
8951+
for _, event := range result {
8952+
if event.Name == eventName {
8953+
Fail("Event " + eventName + " should have been reset")
8954+
}
8955+
}
89468956
})
89478957
})
89488958
})

osscluster.go

Lines changed: 108 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,29 @@ type ClusterOptions struct {
9191
MinRetryBackoff time.Duration
9292
MaxRetryBackoff time.Duration
9393

94-
DialTimeout time.Duration
95-
ReadTimeout time.Duration
96-
WriteTimeout time.Duration
94+
DialTimeout time.Duration
95+
ReadTimeout time.Duration
96+
WriteTimeout time.Duration
97+
98+
// DialerRetries is the maximum number of retry attempts when dialing fails.
99+
//
100+
// default: 5
101+
DialerRetries int
102+
103+
// DialerRetryTimeout is the backoff duration between retry attempts.
104+
//
105+
// default: 100 milliseconds
106+
DialerRetryTimeout time.Duration
107+
97108
ContextTimeoutEnabled bool
98109

99-
PoolFIFO bool
100-
PoolSize int // applies per cluster node and not for the whole cluster
110+
PoolFIFO bool
111+
PoolSize int // applies per cluster node and not for the whole cluster
112+
113+
// MaxConcurrentDials is the maximum number of concurrent connection creation goroutines.
114+
// If <= 0, defaults to PoolSize. If > PoolSize, it will be capped at PoolSize.
115+
MaxConcurrentDials int
116+
101117
PoolTimeout time.Duration
102118
MinIdleConns int
103119
MaxIdleConns int
@@ -157,7 +173,8 @@ type ClusterOptions struct {
157173
// cluster upgrade notifications gracefully and manage connection/pool state
158174
// transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications.
159175
// If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it.
160-
// The ClusterClient does not directly work with maintnotifications, it is up to the clients in the Nodes map to work with maintnotifications.
176+
// The ClusterClient supports SMIGRATING and SMIGRATED notifications for cluster state management.
177+
// Individual node clients handle other maintenance notifications (MOVING, MIGRATING, etc.).
161178
MaintNotificationsConfig *maintnotifications.Config
162179
// ShardPicker is used to pick a shard when the request_policy is
163180
// ReqDefault and the command has no keys.
@@ -176,9 +193,24 @@ func (opt *ClusterOptions) init() {
176193
opt.ReadOnly = true
177194
}
178195

196+
if opt.DialTimeout == 0 {
197+
opt.DialTimeout = 5 * time.Second
198+
}
199+
if opt.DialerRetries == 0 {
200+
opt.DialerRetries = 5
201+
}
202+
if opt.DialerRetryTimeout == 0 {
203+
opt.DialerRetryTimeout = 100 * time.Millisecond
204+
}
205+
179206
if opt.PoolSize == 0 {
180207
opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
181208
}
209+
if opt.MaxConcurrentDials <= 0 {
210+
opt.MaxConcurrentDials = opt.PoolSize
211+
} else if opt.MaxConcurrentDials > opt.PoolSize {
212+
opt.MaxConcurrentDials = opt.PoolSize
213+
}
182214
if opt.ReadBufferSize == 0 {
183215
opt.ReadBufferSize = proto.DefaultBufferSize
184216
}
@@ -320,10 +352,13 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
320352
o.MinRetryBackoff = q.duration("min_retry_backoff")
321353
o.MaxRetryBackoff = q.duration("max_retry_backoff")
322354
o.DialTimeout = q.duration("dial_timeout")
355+
o.DialerRetries = q.int("dialer_retries")
356+
o.DialerRetryTimeout = q.duration("dialer_retry_timeout")
323357
o.ReadTimeout = q.duration("read_timeout")
324358
o.WriteTimeout = q.duration("write_timeout")
325359
o.PoolFIFO = q.bool("pool_fifo")
326360
o.PoolSize = q.int("pool_size")
361+
o.MaxConcurrentDials = q.int("max_concurrent_dials")
327362
o.MinIdleConns = q.int("min_idle_conns")
328363
o.MaxIdleConns = q.int("max_idle_conns")
329364
o.MaxActiveConns = q.int("max_active_conns")
@@ -379,21 +414,25 @@ func (opt *ClusterOptions) clientOptions() *Options {
379414
MinRetryBackoff: opt.MinRetryBackoff,
380415
MaxRetryBackoff: opt.MaxRetryBackoff,
381416

382-
DialTimeout: opt.DialTimeout,
383-
ReadTimeout: opt.ReadTimeout,
384-
WriteTimeout: opt.WriteTimeout,
417+
DialTimeout: opt.DialTimeout,
418+
DialerRetries: opt.DialerRetries,
419+
DialerRetryTimeout: opt.DialerRetryTimeout,
420+
ReadTimeout: opt.ReadTimeout,
421+
WriteTimeout: opt.WriteTimeout,
422+
385423
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
386424

387-
PoolFIFO: opt.PoolFIFO,
388-
PoolSize: opt.PoolSize,
389-
PoolTimeout: opt.PoolTimeout,
390-
MinIdleConns: opt.MinIdleConns,
391-
MaxIdleConns: opt.MaxIdleConns,
392-
MaxActiveConns: opt.MaxActiveConns,
393-
ConnMaxIdleTime: opt.ConnMaxIdleTime,
394-
ConnMaxLifetime: opt.ConnMaxLifetime,
395-
ReadBufferSize: opt.ReadBufferSize,
396-
WriteBufferSize: opt.WriteBufferSize,
425+
PoolFIFO: opt.PoolFIFO,
426+
PoolSize: opt.PoolSize,
427+
MaxConcurrentDials: opt.MaxConcurrentDials,
428+
PoolTimeout: opt.PoolTimeout,
429+
MinIdleConns: opt.MinIdleConns,
430+
MaxIdleConns: opt.MaxIdleConns,
431+
MaxActiveConns: opt.MaxActiveConns,
432+
ConnMaxIdleTime: opt.ConnMaxIdleTime,
433+
ConnMaxLifetime: opt.ConnMaxLifetime,
434+
ReadBufferSize: opt.ReadBufferSize,
435+
WriteBufferSize: opt.WriteBufferSize,
397436
DisableIdentity: opt.DisableIdentity,
398437
DisableIndentity: opt.DisableIdentity,
399438
IdentitySuffix: opt.IdentitySuffix,
@@ -984,9 +1023,11 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
9841023
//------------------------------------------------------------------------------
9851024

9861025
type clusterStateHolder struct {
987-
load func(ctx context.Context) (*clusterState, error)
988-
state atomic.Value
989-
reloading uint32 // atomic
1026+
load func(ctx context.Context) (*clusterState, error)
1027+
1028+
state atomic.Value
1029+
reloading uint32 // atomic
1030+
reloadPending uint32 // atomic - set to 1 when reload is requested during active reload
9901031
}
9911032

9921033
func newClusterStateHolder(load func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
@@ -1005,17 +1046,37 @@ func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error)
10051046
}
10061047

10071048
func (c *clusterStateHolder) LazyReload() {
1049+
// If already reloading, mark that another reload is pending
10081050
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
1051+
atomic.StoreUint32(&c.reloadPending, 1)
10091052
return
10101053
}
1054+
10111055
go func() {
1012-
defer atomic.StoreUint32(&c.reloading, 0)
1056+
for {
1057+
_, err := c.Reload(context.Background())
1058+
if err != nil {
1059+
atomic.StoreUint32(&c.reloadPending, 0)
1060+
atomic.StoreUint32(&c.reloading, 0)
1061+
return
1062+
}
10131063

1014-
_, err := c.Reload(context.Background())
1015-
if err != nil {
1016-
return
1064+
// Clear pending flag after reload completes, before cooldown
1065+
// This captures notifications that arrived during the reload
1066+
atomic.StoreUint32(&c.reloadPending, 0)
1067+
1068+
// Wait cooldown period
1069+
time.Sleep(200 * time.Millisecond)
1070+
1071+
// Check if another reload was requested during cooldown
1072+
if atomic.LoadUint32(&c.reloadPending) == 0 {
1073+
// No pending reload, we're done
1074+
atomic.StoreUint32(&c.reloading, 0)
1075+
return
1076+
}
1077+
1078+
// Pending reload requested, loop to reload again
10171079
}
1018-
time.Sleep(200 * time.Millisecond)
10191080
}()
10201081
}
10211082

@@ -1079,6 +1140,26 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
10791140
txPipeline: c.processTxPipeline,
10801141
})
10811142

1143+
// Set up SMIGRATED notification handling for cluster state reload
1144+
// When a node client receives a SMIGRATED notification, it should trigger
1145+
// cluster state reload on the parent ClusterClient
1146+
if opt.MaintNotificationsConfig != nil {
1147+
c.nodes.OnNewNode(func(nodeClient *Client) {
1148+
manager := nodeClient.GetMaintNotificationsManager()
1149+
if manager != nil {
1150+
manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
1151+
// Log the migration details for now
1152+
if internal.LogLevel.InfoOrAbove() {
1153+
internal.Logger.Printf(ctx, "cluster: slots %v migrated to %s, reloading cluster state", slotRanges, hostPort)
1154+
}
1155+
// Currently we reload the entire cluster state
1156+
// In the future, this could be optimized to reload only the specific slots
1157+
c.state.LazyReload()
1158+
})
1159+
}
1160+
})
1161+
}
1162+
10821163
return c
10831164
}
10841165

0 commit comments

Comments (0)