
Commit f3e9126

lazy cluster topology reload
1 parent fd437ce commit f3e9126

File tree

commands_test.go
osscluster.go
osscluster_lazy_reload_test.go

3 files changed: +265 −13 lines


commands_test.go

Lines changed: 15 additions & 5 deletions
@@ -8905,27 +8905,37 @@ var _ = Describe("Commands", func() {
             const key = "latency-monitor-threshold"

             old := client.ConfigGet(ctx, key).Val()
-            client.ConfigSet(ctx, key, "1")
+            // Use a higher threshold (100ms) to avoid capturing normal operations
+            // that could cause flakiness due to timing variations
+            client.ConfigSet(ctx, key, "100")
             defer client.ConfigSet(ctx, key, old[key])

             result, err := client.Latency(ctx).Result()
             Expect(err).NotTo(HaveOccurred())
             Expect(len(result)).Should(Equal(0))

-            err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err()
+            // Use a longer sleep (150ms) to ensure it exceeds the 100ms threshold
+            err = client.Do(ctx, "DEBUG", "SLEEP", 0.15).Err()
             Expect(err).NotTo(HaveOccurred())

             result, err = client.Latency(ctx).Result()
             Expect(err).NotTo(HaveOccurred())
-            Expect(len(result)).Should(Equal(1))
+            Expect(len(result)).Should(BeNumerically(">=", 1))

             // reset latency by event name
-            err = client.LatencyReset(ctx, result[0].Name).Err()
+            eventName := result[0].Name
+            err = client.LatencyReset(ctx, eventName).Err()
             Expect(err).NotTo(HaveOccurred())

+            // Verify the specific event was reset (not that all events are gone)
+            // This avoids flakiness from other operations triggering latency events
             result, err = client.Latency(ctx).Result()
             Expect(err).NotTo(HaveOccurred())
-            Expect(len(result)).Should(Equal(0))
+            for _, event := range result {
+                if event.Name == eventName {
+                    Fail("Event " + eventName + " should have been reset")
+                }
+            }
         })
     })
 })

osscluster.go

Lines changed: 49 additions & 8 deletions
@@ -146,7 +146,8 @@ type ClusterOptions struct {
     // cluster upgrade notifications gracefully and manage connection/pool state
     // transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications.
     // If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it.
-    // The ClusterClient does not directly work with maintnotifications, it is up to the clients in the Nodes map to work with maintnotifications.
+    // The ClusterClient supports SMIGRATING and SMIGRATED notifications for cluster state management.
+    // Individual node clients handle other maintenance notifications (MOVING, MIGRATING, etc.).
     MaintNotificationsConfig *maintnotifications.Config
 }
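For context (not part of this commit): a minimal sketch of how a caller might opt in on a cluster client. It assumes the public import path github.com/redis/go-redis/v9/maintnotifications and that a zero-value maintnotifications.Config means default behavior; the addresses are placeholders.

// Sketch only: a non-nil MaintNotificationsConfig (zero value assumed to mean
// defaults) plus RESP3 opts the ClusterClient in; with it set, NewClusterClient
// also wires SMIGRATED notifications to a lazy cluster state reload (see below).
package main

import (
    "github.com/redis/go-redis/v9"
    "github.com/redis/go-redis/v9/maintnotifications"
)

func main() {
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs:                    []string{"127.0.0.1:7000", "127.0.0.1:7001"}, // placeholder addresses
        Protocol:                 3, // RESP3 is required for push notifications
        MaintNotificationsConfig: &maintnotifications.Config{},
    })
    defer rdb.Close()
}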

@@ -945,8 +946,9 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
 type clusterStateHolder struct {
     load func(ctx context.Context) (*clusterState, error)

-    state     atomic.Value
-    reloading uint32 // atomic
+    state         atomic.Value
+    reloading     uint32 // atomic
+    reloadPending uint32 // atomic - set to 1 when reload is requested during active reload
 }

 func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
@@ -965,17 +967,36 @@ func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error)
 }

 func (c *clusterStateHolder) LazyReload() {
+    // If already reloading, mark that another reload is pending
     if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
+        atomic.StoreUint32(&c.reloadPending, 1)
         return
     }
+
     go func() {
-        defer atomic.StoreUint32(&c.reloading, 0)
+        for {
+            _, err := c.Reload(context.Background())
+            if err != nil {
+                atomic.StoreUint32(&c.reloading, 0)
+                return
+            }

-        _, err := c.Reload(context.Background())
-        if err != nil {
-            return
+            // Clear pending flag after reload completes, before cooldown
+            // This captures notifications that arrived during the reload
+            atomic.StoreUint32(&c.reloadPending, 0)
+
+            // Wait cooldown period
+            time.Sleep(200 * time.Millisecond)
+
+            // Check if another reload was requested during cooldown
+            if atomic.LoadUint32(&c.reloadPending) == 0 {
+                // No pending reload, we're done
+                atomic.StoreUint32(&c.reloading, 0)
+                return
+            }
+
+            // Pending reload requested, loop to reload again
         }
-        time.Sleep(200 * time.Millisecond)
     }()
 }
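The reloading/reloadPending pair is a small debounce: one goroutine owns the reload loop, every other caller only sets a flag, and the 200ms cooldown collapses a burst of requests into at most one follow-up reload. A self-contained sketch of the same pattern, detached from the redis package (all names here are illustrative, not go-redis API):

// Standalone sketch of the CAS + pending-flag + cooldown pattern used by
// LazyReload; the debouncer type and doWork function are illustrative only.
package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type debouncer struct {
    running uint32 // 1 while a worker goroutine owns the loop
    pending uint32 // 1 if another run was requested in the meantime
    work    func() error
}

func (d *debouncer) Trigger() {
    if !atomic.CompareAndSwapUint32(&d.running, 0, 1) {
        // Someone else is already running: remember that more work was requested.
        atomic.StoreUint32(&d.pending, 1)
        return
    }
    go func() {
        for {
            if err := d.work(); err != nil {
                atomic.StoreUint32(&d.running, 0)
                return
            }
            // Requests that arrived while the work ran are treated as covered
            // by the run that just finished, so the flag is cleared here.
            atomic.StoreUint32(&d.pending, 0)

            // Cooldown: collapse bursts of triggers into one follow-up run.
            time.Sleep(200 * time.Millisecond)

            if atomic.LoadUint32(&d.pending) == 0 {
                // Nothing new was requested; release ownership and stop.
                atomic.StoreUint32(&d.running, 0)
                return
            }
            // A trigger arrived during the cooldown: loop and run again.
        }
    }()
}

func main() {
    var runs atomic.Int32
    d := &debouncer{work: func() error {
        runs.Add(1)
        time.Sleep(10 * time.Millisecond)
        return nil
    }}

    // A burst of triggers collapses into at most two runs:
    // the first run plus one follow-up after the cooldown.
    for i := 0; i < 5; i++ {
        d.Trigger()
        time.Sleep(5 * time.Millisecond)
    }
    time.Sleep(500 * time.Millisecond)
    fmt.Println("runs:", runs.Load()) // typically 2
}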

@@ -1038,6 +1059,26 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
         txPipeline: c.processTxPipeline,
     })

+    // Set up SMIGRATED notification handling for cluster state reload
+    // When a node client receives a SMIGRATED notification, it should trigger
+    // cluster state reload on the parent ClusterClient
+    if opt.MaintNotificationsConfig != nil {
+        c.nodes.OnNewNode(func(nodeClient *Client) {
+            manager := nodeClient.GetMaintNotificationsManager()
+            if manager != nil {
+                manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
+                    // Log the migration details for now
+                    if internal.LogLevel.InfoOrAbove() {
+                        internal.Logger.Printf(ctx, "cluster: slots %v migrated to %s, reloading cluster state", slotRanges, hostPort)
+                    }
+                    // Currently we reload the entire cluster state
+                    // In the future, this could be optimized to reload only the specific slots
+                    c.state.LazyReload()
+                })
+            }
+        })
+    }
+
     return c
 }
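The callback currently reloads the whole topology. If the per-slot optimization mentioned in the comment is pursued later, the slotRanges strings would need to be parsed first; the helper below is hypothetical and assumes a "start-end" or single-slot textual form, which this commit does not specify.

// Hypothetical helper, not part of this commit: parses slot-range strings such
// as "100-200" or "42" into [start, end] pairs. The input format is an
// assumption; the SMIGRATED payload format is not defined in this diff.
package main

import (
    "fmt"
    "strconv"
    "strings"
)

func parseSlotRanges(ranges []string) ([][2]int, error) {
    out := make([][2]int, 0, len(ranges))
    for _, r := range ranges {
        start, end, found := strings.Cut(r, "-")
        if !found {
            end = start // single slot, e.g. "42"
        }
        lo, err := strconv.Atoi(strings.TrimSpace(start))
        if err != nil {
            return nil, fmt.Errorf("bad slot range %q: %w", r, err)
        }
        hi, err := strconv.Atoi(strings.TrimSpace(end))
        if err != nil {
            return nil, fmt.Errorf("bad slot range %q: %w", r, err)
        }
        out = append(out, [2]int{lo, hi})
    }
    return out, nil
}

func main() {
    fmt.Println(parseSlotRanges([]string{"0-5460", "12000"}))
}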

osscluster_lazy_reload_test.go

Lines changed: 201 additions & 0 deletions
@@ -0,0 +1,201 @@
+package redis
+
+import (
+    "context"
+    "sync/atomic"
+    "testing"
+    "time"
+)
+
+// TestLazyReloadQueueBehavior tests that LazyReload properly queues reload requests
+func TestLazyReloadQueueBehavior(t *testing.T) {
+    t.Run("SingleReload", func(t *testing.T) {
+        var reloadCount atomic.Int32
+
+        holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+            reloadCount.Add(1)
+            time.Sleep(50 * time.Millisecond) // Simulate reload work
+            return &clusterState{}, nil
+        })
+
+        // Trigger one reload
+        holder.LazyReload()
+
+        // Wait for reload to complete
+        time.Sleep(300 * time.Millisecond)
+
+        if count := reloadCount.Load(); count != 1 {
+            t.Errorf("Expected 1 reload, got %d", count)
+        }
+    })
+
+    t.Run("ConcurrentReloadsDeduplication", func(t *testing.T) {
+        var reloadCount atomic.Int32
+
+        holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+            reloadCount.Add(1)
+            time.Sleep(50 * time.Millisecond) // Simulate reload work
+            return &clusterState{}, nil
+        })
+
+        // Trigger multiple reloads concurrently
+        for i := 0; i < 10; i++ {
+            go holder.LazyReload()
+        }
+
+        // Wait for all to complete
+        time.Sleep(100 * time.Millisecond)
+
+        // Should only reload once (all concurrent calls deduplicated)
+        if count := reloadCount.Load(); count != 1 {
+            t.Errorf("Expected 1 reload (deduplication), got %d", count)
+        }
+    })
+
+    t.Run("PendingReloadDuringCooldown", func(t *testing.T) {
+        var reloadCount atomic.Int32
+
+        holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+            reloadCount.Add(1)
+            time.Sleep(10 * time.Millisecond) // Simulate reload work
+            return &clusterState{}, nil
+        })
+
+        // Trigger first reload
+        holder.LazyReload()
+
+        // Wait for reload to complete but still in cooldown
+        time.Sleep(50 * time.Millisecond)
+
+        // Trigger second reload during cooldown period
+        holder.LazyReload()
+
+        // Wait for second reload to complete
+        time.Sleep(300 * time.Millisecond)
+
+        // Should have reloaded twice (second request queued and executed)
+        if count := reloadCount.Load(); count != 2 {
+            t.Errorf("Expected 2 reloads (queued during cooldown), got %d", count)
+        }
+    })
+
+    t.Run("MultiplePendingReloadsCollapsed", func(t *testing.T) {
+        var reloadCount atomic.Int32
+
+        holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+            reloadCount.Add(1)
+            time.Sleep(10 * time.Millisecond) // Simulate reload work
+            return &clusterState{}, nil
+        })
+
+        // Trigger first reload
+        holder.LazyReload()
+
+        // Wait for reload to start
+        time.Sleep(5 * time.Millisecond)
+
+        // Trigger multiple reloads during active reload + cooldown
+        for i := 0; i < 10; i++ {
+            holder.LazyReload()
+            time.Sleep(5 * time.Millisecond)
+        }
+
+        // Wait for all to complete
+        time.Sleep(400 * time.Millisecond)
+
+        // Should have reloaded exactly twice:
+        // 1. Initial reload
+        // 2. One more reload for all the pending requests (collapsed into one)
+        if count := reloadCount.Load(); count != 2 {
+            t.Errorf("Expected 2 reloads (initial + collapsed pending), got %d", count)
+        }
+    })
+
+    t.Run("ReloadAfterCooldownPeriod", func(t *testing.T) {
+        var reloadCount atomic.Int32
+
+        holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+            reloadCount.Add(1)
+            time.Sleep(10 * time.Millisecond) // Simulate reload work
+            return &clusterState{}, nil
+        })
+
+        // Trigger first reload
+        holder.LazyReload()
+
+        // Wait for reload + cooldown to complete
+        time.Sleep(300 * time.Millisecond)
+
+        // Trigger second reload after cooldown
+        holder.LazyReload()
+
+        // Wait for second reload to complete
+        time.Sleep(300 * time.Millisecond)
+
+        // Should have reloaded twice (separate reload cycles)
+        if count := reloadCount.Load(); count != 2 {
+            t.Errorf("Expected 2 reloads (separate cycles), got %d", count)
+        }
+    })
+
+    t.Run("ErrorDuringReload", func(t *testing.T) {
+        var reloadCount atomic.Int32
+        var shouldFail atomic.Bool
+        shouldFail.Store(true)
+
+        holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+            reloadCount.Add(1)
+            if shouldFail.Load() {
+                return nil, context.DeadlineExceeded
+            }
+            return &clusterState{}, nil
+        })
+
+        // Trigger reload that will fail
+        holder.LazyReload()
+
+        // Wait for failed reload
+        time.Sleep(50 * time.Millisecond)
+
+        // Trigger another reload (should succeed now)
+        shouldFail.Store(false)
+        holder.LazyReload()
+
+        // Wait for successful reload
+        time.Sleep(300 * time.Millisecond)
+
+        // Should have attempted reload twice (first failed, second succeeded)
+        if count := reloadCount.Load(); count != 2 {
+            t.Errorf("Expected 2 reload attempts, got %d", count)
+        }
+    })
+
+    t.Run("CascadingSMigratedScenario", func(t *testing.T) {
+        // Simulate the real-world scenario: multiple SMIGRATED notifications
+        // arriving in quick succession from different node clients
+        var reloadCount atomic.Int32
+
+        holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+            reloadCount.Add(1)
+            time.Sleep(20 * time.Millisecond) // Simulate realistic reload time
+            return &clusterState{}, nil
+        })
+
+        // Simulate 5 SMIGRATED notifications arriving within 100ms
+        for i := 0; i < 5; i++ {
+            go holder.LazyReload()
+            time.Sleep(20 * time.Millisecond)
+        }
+
+        // Wait for all reloads to complete
+        time.Sleep(500 * time.Millisecond)
+
+        // Should reload at most 2 times:
+        // 1. First notification triggers reload
+        // 2. Notifications 2-5 collapse into one pending reload
+        count := reloadCount.Load()
+        if count < 1 || count > 2 {
+            t.Errorf("Expected 1-2 reloads for cascading scenario, got %d", count)
+        }
+    })
+}
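The tests above synchronize with wall-clock sleeps sized around the 200ms cooldown, which keeps them short but timing-sensitive. A possible alternative, sketched under the assumption that it lives in the same package and reuses the imports above, is to let the fake load function signal on a channel so the test waits deterministically:

// Sketch only: channel-based synchronization instead of fixed sleeps.
func TestLazyReloadSignalled(t *testing.T) {
    done := make(chan struct{}, 16) // one signal per invoked reload

    holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
        done <- struct{}{} // signal that a reload ran
        return &clusterState{}, nil
    })

    holder.LazyReload()

    select {
    case <-done:
        // reload observed without guessing at sleep durations
    case <-time.After(5 * time.Second):
        t.Fatal("timed out waiting for reload")
    }
}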
