Skip to content

Commit 1ba1ea2

Browse files
authored
Add streaming key to the requests (#135)
### What is this? In this PR each request has a unique streaming key with `orgId` to identify the organizations. So different organizations will have separate streams. On the frontend we generate and send that key and on the backend we use it to store the topic and use orgID provided. If the orgID is different than the plugin configuration this will return an error and subscribing to the stream will be failed. The idea was taken from - https://github.com/grafana/falconlogscale-datasource/blob/02a48c3224b139423a190583939a80cb80b8b7da/src/streaming.ts#L9 - https://github.com/grafana/falconlogscale-datasource/blob/02a48c3224b139423a190583939a80cb80b8b7da/pkg/plugin/streaming.go#L17-L62 ### How to test? - In mqtt repository - Run `mage -v` - Run `yarn dev` - Run `yarn broker` - In grafana repository - Run grafana as usual - Create an mqtt datasource - Run query for `millisecond/1000` and see in developer tools it's sending a streaming key with orgID.
1 parent 70c7f8a commit 1ba1ea2

26 files changed

+1267
-269
lines changed

cspell.config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
"Millis",
3636
"paho",
3737
"jsoniter",
38-
"subresource"
38+
"subresource",
39+
"streamingkey"
3940
]
4041
}

pkg/mqtt/client.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ func (c *client) GetTopic(reqPath string) (*Topic, bool) {
118118
}
119119

120120
func (c *client) Subscribe(reqPath string) *Topic {
121+
// Check if there's already a topic with this exact key (reqPath)
122+
if existingTopic, ok := c.topics.Load(reqPath); ok {
123+
return existingTopic
124+
}
125+
121126
chunks := strings.Split(reqPath, "/")
122127
if len(chunks) < 2 {
123128
log.DefaultLogger.Error("Invalid path", "path", reqPath)
@@ -129,14 +134,16 @@ func (c *client) Subscribe(reqPath string) *Topic {
129134
return nil
130135
}
131136

137+
// For MQTT subscription, we only need the actual topic path (without streaming key)
138+
// The streaming key is used for topic uniqueness in storage, but MQTT only cares about the topic path
132139
topicPath := path.Join(chunks[1:]...)
140+
141+
// Create topic with the reqPath as the key for storage
142+
// The actual topic components will be parsed when needed
133143
t := &Topic{
134144
Path: topicPath,
135145
Interval: interval,
136146
}
137-
if t, ok := c.topics.Load(topicPath); ok {
138-
return t
139-
}
140147

141148
topic, err := decodeTopic(t.Path)
142149
if err != nil {
@@ -153,7 +160,8 @@ func (c *client) Subscribe(reqPath string) *Topic {
153160
}); token.Wait() && token.Error() != nil {
154161
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
155162
}
156-
c.topics.Store(t)
163+
// Store the topic using reqPath as the key (which includes streaming key)
164+
c.topics.Map.Store(reqPath, t)
157165
return t
158166
}
159167

pkg/mqtt/client_test.go

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package mqtt
2+
3+
import (
4+
"path"
5+
"strings"
6+
"testing"
7+
"time"
8+
)
9+
10+
// Mock client that implements our Client interface directly
11+
type mockClient struct {
12+
topics TopicMap
13+
subscriptions map[string]bool
14+
connected bool
15+
}
16+
17+
func (m *mockClient) GetTopic(reqPath string) (*Topic, bool) {
18+
return m.topics.Load(reqPath)
19+
}
20+
21+
func (m *mockClient) IsConnected() bool {
22+
return m.connected
23+
}
24+
25+
func (m *mockClient) Subscribe(reqPath string) *Topic {
26+
// Check if already exists
27+
if existingTopic, ok := m.topics.Load(reqPath); ok {
28+
return existingTopic
29+
}
30+
31+
chunks := strings.Split(reqPath, "/")
32+
if len(chunks) < 2 {
33+
return nil
34+
}
35+
interval, err := time.ParseDuration(chunks[0])
36+
if err != nil {
37+
return nil
38+
}
39+
40+
topicPath := path.Join(chunks[1:]...)
41+
t := &Topic{
42+
Path: topicPath,
43+
Interval: interval,
44+
Messages: []Message{},
45+
}
46+
47+
// Track MQTT subscription (simplified for testing)
48+
if m.subscriptions == nil {
49+
m.subscriptions = make(map[string]bool)
50+
}
51+
// For testing, assume we decode the topic properly
52+
m.subscriptions["test/topic"] = true
53+
54+
// Store with reqPath as key
55+
m.topics.Map.Store(reqPath, t)
56+
return t
57+
}
58+
59+
func (m *mockClient) Unsubscribe(reqPath string) {
60+
m.topics.Delete(reqPath)
61+
}
62+
63+
func (m *mockClient) Dispose() {
64+
// Clear all topics and subscriptions
65+
m.topics = TopicMap{}
66+
m.subscriptions = make(map[string]bool)
67+
}
68+
69+
func (m *mockClient) HandleMessage(topicPath string, payload []byte) {
70+
message := Message{
71+
Timestamp: time.Now(),
72+
Value: payload,
73+
}
74+
m.topics.AddMessage(topicPath, message)
75+
}
76+
77+
func newMockClient() *mockClient {
78+
return &mockClient{
79+
topics: TopicMap{},
80+
subscriptions: make(map[string]bool),
81+
connected: true,
82+
}
83+
}
84+
85+
func TestClient_Subscribe_WithStreamingKey(t *testing.T) {
86+
c := newMockClient()
87+
88+
tests := []struct {
89+
name string
90+
reqPath string
91+
expectTopic bool
92+
expectedPath string
93+
}{
94+
{
95+
name: "subscribe with streaming key",
96+
reqPath: "1s/dGVzdC90b3BpYw/user1/hash123/org456",
97+
expectTopic: true,
98+
expectedPath: "dGVzdC90b3BpYw/user1/hash123/org456",
99+
},
100+
{
101+
name: "subscribe without streaming key",
102+
reqPath: "5s/dGVzdC90b3BpYw",
103+
expectTopic: true,
104+
expectedPath: "dGVzdC90b3BpYw",
105+
},
106+
{
107+
name: "invalid path - no interval",
108+
reqPath: "invalid",
109+
expectTopic: false,
110+
},
111+
{
112+
name: "invalid interval",
113+
reqPath: "invalid-interval/dGVzdC90b3BpYw",
114+
expectTopic: false,
115+
},
116+
}
117+
118+
for _, tt := range tests {
119+
t.Run(tt.name, func(t *testing.T) {
120+
topic := c.Subscribe(tt.reqPath)
121+
122+
if tt.expectTopic {
123+
if topic == nil {
124+
t.Errorf("Expected topic to be created, but got nil")
125+
return
126+
}
127+
128+
// Verify topic is stored with the correct key
129+
storedTopic, found := c.topics.Load(tt.reqPath)
130+
if !found {
131+
t.Errorf("Expected topic to be stored with key %s", tt.reqPath)
132+
}
133+
134+
if storedTopic.Path != tt.expectedPath {
135+
t.Errorf("Expected topic path %s, got %s", tt.expectedPath, storedTopic.Path)
136+
}
137+
} else {
138+
if topic != nil {
139+
t.Errorf("Expected nil topic for invalid input, but got %v", topic)
140+
}
141+
}
142+
})
143+
}
144+
}
145+
146+
func TestClient_Subscribe_Deduplication(t *testing.T) {
147+
c := newMockClient()
148+
149+
reqPath := "1s/dGVzdC90b3BpYw/user1/hash123/org456"
150+
151+
// Subscribe first time
152+
topic1 := c.Subscribe(reqPath)
153+
if topic1 == nil {
154+
t.Fatal("Expected topic to be created")
155+
}
156+
157+
// Subscribe second time - should return same topic
158+
topic2 := c.Subscribe(reqPath)
159+
if topic2 == nil {
160+
t.Fatal("Expected topic to be returned")
161+
}
162+
163+
if topic1 != topic2 {
164+
t.Error("Expected same topic instance for duplicate subscription")
165+
}
166+
167+
// Verify only one topic is stored
168+
count := 0
169+
c.topics.Map.Range(func(key, value interface{}) bool {
170+
count++
171+
return true
172+
})
173+
174+
if count != 1 {
175+
t.Errorf("Expected 1 stored topic, got %d", count)
176+
}
177+
}
178+
179+
func TestClient_Subscribe_MultipleStreamingKeys(t *testing.T) {
180+
c := newMockClient()
181+
182+
// Same MQTT topic, same interval, different streaming keys
183+
reqPath1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456"
184+
reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456"
185+
reqPath3 := "1s/dGVzdC90b3BpYw/user1/hash123/org789"
186+
187+
topic1 := c.Subscribe(reqPath1)
188+
topic2 := c.Subscribe(reqPath2)
189+
topic3 := c.Subscribe(reqPath3)
190+
191+
if topic1 == nil || topic2 == nil || topic3 == nil {
192+
t.Fatal("Expected all topics to be created")
193+
}
194+
195+
// All should be different instances
196+
if topic1 == topic2 || topic1 == topic3 || topic2 == topic3 {
197+
t.Error("Expected different topic instances for different streaming keys")
198+
}
199+
200+
// Verify all three topics are stored separately
201+
count := 0
202+
c.topics.Map.Range(func(key, value interface{}) bool {
203+
count++
204+
return true
205+
})
206+
207+
if count != 3 {
208+
t.Errorf("Expected 3 stored topics, got %d", count)
209+
}
210+
211+
// Verify each can be retrieved by its specific key
212+
storedTopic1, found1 := c.GetTopic(reqPath1)
213+
storedTopic2, found2 := c.GetTopic(reqPath2)
214+
storedTopic3, found3 := c.GetTopic(reqPath3)
215+
216+
if !found1 || !found2 || !found3 {
217+
t.Error("Expected all topics to be retrievable by their keys")
218+
}
219+
220+
if storedTopic1 == storedTopic2 || storedTopic1 == storedTopic3 || storedTopic2 == storedTopic3 {
221+
t.Error("Expected retrieved topics to be different instances")
222+
}
223+
}
224+
225+
func TestClient_GetTopic(t *testing.T) {
226+
c := newMockClient()
227+
228+
reqPath := "2s/dGVzdC90b3BpYw/streaming/key/123"
229+
230+
// Topic doesn't exist yet
231+
_, found := c.GetTopic(reqPath)
232+
if found {
233+
t.Error("Expected topic not to be found initially")
234+
}
235+
236+
// Create topic
237+
topic := c.Subscribe(reqPath)
238+
if topic == nil {
239+
t.Fatal("Expected topic to be created")
240+
}
241+
242+
// Now it should be found
243+
retrievedTopic, found := c.GetTopic(reqPath)
244+
if !found {
245+
t.Error("Expected topic to be found after subscription")
246+
}
247+
248+
if retrievedTopic != topic {
249+
t.Error("Expected retrieved topic to be the same instance")
250+
}
251+
}
252+
253+
func TestClient_MessageHandling_WithStreamingKeys(t *testing.T) {
254+
c := newMockClient()
255+
256+
// Create topics with same MQTT path but different streaming keys
257+
reqPath1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456"
258+
reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456"
259+
260+
topic1 := c.Subscribe(reqPath1)
261+
topic2 := c.Subscribe(reqPath2)
262+
263+
if topic1 == nil || topic2 == nil {
264+
t.Fatal("Expected both topics to be created")
265+
}
266+
267+
// Simulate MQTT message arrival
268+
mqttTopicPath := "dGVzdC90b3BpYw/user1/hash123/org456" // This is what HandleMessage receives
269+
c.HandleMessage(mqttTopicPath, []byte("test message"))
270+
271+
// Check that only the matching topic received the message
272+
updatedTopic1, _ := c.GetTopic(reqPath1)
273+
updatedTopic2, _ := c.GetTopic(reqPath2)
274+
275+
if len(updatedTopic1.Messages) != 1 {
276+
t.Errorf("Expected 1 message in topic1, got %d", len(updatedTopic1.Messages))
277+
}
278+
if len(updatedTopic2.Messages) != 0 {
279+
t.Errorf("Expected 0 messages in topic2, got %d", len(updatedTopic2.Messages))
280+
}
281+
}

pkg/mqtt/testdata/array.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// | Labels: | Labels: |
99
// | Type: []time.Time | Type: []json.RawMessage |
1010
// +-------------------------------+-------------------------+
11-
// | 1969-12-31 19:00:00 -0500 EST | [1,2,3] |
11+
// | 1970-01-01 02:00:00 +0200 EET | [1,2,3] |
1212
// +-------------------------------+-------------------------+
1313
//
1414
//

pkg/mqtt/testdata/bool.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// | Labels: | Labels: |
99
// | Type: []time.Time | Type: []*bool |
1010
// +-------------------------------+---------------+
11-
// | 1969-12-31 19:00:00 -0500 EST | true |
11+
// | 1970-01-01 02:00:00 +0200 EET | true |
1212
// +-------------------------------+---------------+
1313
//
1414
//

pkg/mqtt/testdata/float.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// | Labels: | Labels: |
99
// | Type: []time.Time | Type: []*float64 |
1010
// +-------------------------------+------------------+
11-
// | 1969-12-31 19:00:00 -0500 EST | 123.456 |
11+
// | 1970-01-01 02:00:00 +0200 EET | 123.456 |
1212
// +-------------------------------+------------------+
1313
//
1414
//

pkg/mqtt/testdata/int.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// | Labels: | Labels: |
99
// | Type: []time.Time | Type: []*float64 |
1010
// +-------------------------------+------------------+
11-
// | 1969-12-31 19:00:00 -0500 EST | 123 |
11+
// | 1970-01-01 02:00:00 +0200 EET | 123 |
1212
// +-------------------------------+------------------+
1313
//
1414
//

pkg/mqtt/testdata/nested-object.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// | Labels: | Labels: | Labels: |
99
// | Type: []time.Time | Type: []*float64 | Type: []json.RawMessage |
1010
// +-------------------------------+------------------+-------------------------+
11-
// | 1969-12-31 19:00:00 -0500 EST | 1 | {"c":[1,2,3]} |
11+
// | 1970-01-01 02:00:00 +0200 EET | 1 | {"c":[1,2,3]} |
1212
// +-------------------------------+------------------+-------------------------+
1313
//
1414
//

pkg/mqtt/testdata/null.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// | Labels: |
99
// | Type: []time.Time |
1010
// +-------------------------------+
11-
// | 1969-12-31 19:00:00 -0500 EST |
11+
// | 1970-01-01 02:00:00 +0200 EET |
1212
// +-------------------------------+
1313
//
1414
//

pkg/mqtt/testdata/object-changing-type.jsonc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
// | Labels: | Labels: | Labels: |
99
// | Type: []time.Time | Type: []*float64 | Type: []*float64 |
1010
// +-------------------------------+------------------+------------------+
11-
// | 1969-12-31 19:00:00 -0500 EST | 1 | 2 |
12-
// | 1969-12-31 19:01:00 -0500 EST | null | null |
13-
// | 1969-12-31 19:02:00 -0500 EST | 3 | 4 |
11+
// | 1970-01-01 02:00:00 +0200 EET | 1 | 2 |
12+
// | 1970-01-01 02:01:00 +0200 EET | null | null |
13+
// | 1970-01-01 02:02:00 +0200 EET | 3 | 4 |
1414
// +-------------------------------+------------------+------------------+
1515
//
1616
//

0 commit comments

Comments
 (0)