Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
type Client interface {
GetTopic(string) (*Topic, bool)
IsConnected() bool
Subscribe(string, log.Logger) *Topic
Unsubscribe(string, log.Logger)
Subscribe(string, log.Logger) (*Topic, error)
Unsubscribe(string, log.Logger) error
Dispose()
}

Expand Down Expand Up @@ -120,21 +120,19 @@ func (c *client) GetTopic(reqPath string) (*Topic, bool) {
return c.topics.Load(reqPath)
}

func (c *client) Subscribe(reqPath string, logger log.Logger) *Topic {
func (c *client) Subscribe(reqPath string, logger log.Logger) (*Topic, error) {
// Check if there's already a topic with this exact key (reqPath)
if existingTopic, ok := c.topics.Load(reqPath); ok {
return existingTopic
return existingTopic, nil
}

chunks := strings.Split(reqPath, "/")
if len(chunks) < 2 {
logger.Error("Invalid path", "path", reqPath)
return nil
return nil, backend.DownstreamErrorf("invalid path: %s", reqPath)
}
interval, err := time.ParseDuration(chunks[0])
if err != nil {
logger.Error("Invalid interval", "path", reqPath, "interval", chunks[0])
return nil
return nil, backend.DownstreamErrorf("invalid interval %s: %s", chunks[0], err)
}

// For MQTT subscription, we only need the actual topic path (without streaming key)
Expand All @@ -150,8 +148,7 @@ func (c *client) Subscribe(reqPath string, logger log.Logger) *Topic {

topic, err := decodeTopic(t.Path, logger)
if err != nil {
logger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", backend.DownstreamError(err))
return nil
return nil, backend.DownstreamErrorf("error decoding MQTT topic name %s: %s", t.Path, err)
}

logger.Debug("Subscribing to MQTT topic", "topic", topic)
Expand All @@ -161,37 +158,38 @@ func (c *client) Subscribe(reqPath string, logger log.Logger) *Topic {
// and don't need to regex it against + and #.
c.HandleMessage(topicPath, []byte(m.Payload()))
}); token.Wait() && token.Error() != nil {
logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", backend.DownstreamError(token.Error()))
return nil, backend.DownstreamErrorf("error subscribing to MQTT topic %s: %s", topic, token.Error())
}
// Store the topic using reqPath as the key (which includes streaming key)
c.topics.Map.Store(reqPath, t)
return t
return t, nil
}

func (c *client) Unsubscribe(reqPath string, logger log.Logger) {
func (c *client) Unsubscribe(reqPath string, logger log.Logger) error {
t, ok := c.GetTopic(reqPath)
if !ok {
return
return nil // No error if topic doesn't exist
}
c.topics.Delete(t.Key())

if exists := c.topics.HasSubscription(t.Path); exists {
// There are still other subscriptions to this path,
// so we shouldn't unsubscribe yet.
return
return nil
}

logger.Debug("Unsubscribing from MQTT topic", "topic", t.Path)

topic, err := decodeTopic(t.Path, logger)
if err != nil {
logger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", backend.DownstreamError(err))
return
return backend.DownstreamErrorf("error decoding MQTT topic name %s: %s", t.Path, err)
}

if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
logger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", backend.DownstreamError(token.Error()))
return backend.DownstreamErrorf("error unsubscribing from MQTT topic %s: %s", t.Path, token.Error())
}

return nil
}

func (c *client) Dispose() {
Expand Down
60 changes: 45 additions & 15 deletions pkg/mqtt/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"strings"
"testing"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend/log"
)

// Mock client that implements our Client interface directly
Expand All @@ -22,19 +24,19 @@ func (m *mockClient) IsConnected() bool {
return m.connected
}

func (m *mockClient) Subscribe(reqPath string) *Topic {
func (m *mockClient) Subscribe(reqPath string, logger log.Logger) (*Topic, error) {
// Check if already exists
if existingTopic, ok := m.topics.Load(reqPath); ok {
return existingTopic
return existingTopic, nil
}

chunks := strings.Split(reqPath, "/")
if len(chunks) < 2 {
return nil
return nil, nil
}
interval, err := time.ParseDuration(chunks[0])
if err != nil {
return nil
return nil, err
}

topicPath := path.Join(chunks[1:]...)
Expand All @@ -53,11 +55,12 @@ func (m *mockClient) Subscribe(reqPath string) *Topic {

// Store with reqPath as key
m.topics.Map.Store(reqPath, t)
return t
return t, nil
}

func (m *mockClient) Unsubscribe(reqPath string) {
func (m *mockClient) Unsubscribe(reqPath string, logger log.Logger) error {
m.topics.Delete(reqPath)
return nil
}

func (m *mockClient) Dispose() {
Expand Down Expand Up @@ -117,7 +120,10 @@ func TestClient_Subscribe_WithStreamingKey(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
topic := c.Subscribe(tt.reqPath)
topic, err := c.Subscribe(tt.reqPath, log.DefaultLogger)
if err != nil && tt.expectTopic {
t.Fatalf("Subscribe failed: %v", err)
}

if tt.expectTopic {
if topic == nil {
Expand Down Expand Up @@ -149,13 +155,19 @@ func TestClient_Subscribe_Deduplication(t *testing.T) {
reqPath := "1s/dGVzdC90b3BpYw/user1/hash123/org456"

// Subscribe first time
topic1 := c.Subscribe(reqPath)
topic1, err := c.Subscribe(reqPath, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
if topic1 == nil {
t.Fatal("Expected topic to be created")
}

// Subscribe second time - should return same topic
topic2 := c.Subscribe(reqPath)
topic2, err := c.Subscribe(reqPath, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
if topic2 == nil {
t.Fatal("Expected topic to be returned")
}
Expand Down Expand Up @@ -184,9 +196,18 @@ func TestClient_Subscribe_MultipleStreamingKeys(t *testing.T) {
reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456"
reqPath3 := "1s/dGVzdC90b3BpYw/user1/hash123/org789"

topic1 := c.Subscribe(reqPath1)
topic2 := c.Subscribe(reqPath2)
topic3 := c.Subscribe(reqPath3)
topic1, err := c.Subscribe(reqPath1, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
topic2, err := c.Subscribe(reqPath2, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
topic3, err := c.Subscribe(reqPath3, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}

if topic1 == nil || topic2 == nil || topic3 == nil {
t.Fatal("Expected all topics to be created")
Expand Down Expand Up @@ -234,7 +255,10 @@ func TestClient_GetTopic(t *testing.T) {
}

// Create topic
topic := c.Subscribe(reqPath)
topic, err := c.Subscribe(reqPath, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
if topic == nil {
t.Fatal("Expected topic to be created")
}
Expand All @@ -257,8 +281,14 @@ func TestClient_MessageHandling_WithStreamingKeys(t *testing.T) {
reqPath1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456"
reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456"

topic1 := c.Subscribe(reqPath1)
topic2 := c.Subscribe(reqPath2)
topic1, err := c.Subscribe(reqPath1, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
topic2, err := c.Subscribe(reqPath2, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}

if topic1 == nil || topic2 == nil {
t.Fatal("Expected both topics to be created")
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ func (c *fakeMQTTClient) IsConnected() bool {
return c.connected
}

func (c *fakeMQTTClient) Subscribe(_ string, _ log.Logger) *mqtt.Topic { return nil }
func (c *fakeMQTTClient) Unsubscribe(_ string, _ log.Logger) {}
func (c *fakeMQTTClient) Dispose() {}
func (c *fakeMQTTClient) Subscribe(_ string, _ log.Logger) (*mqtt.Topic, error) { return nil, nil }
func (c *fakeMQTTClient) Unsubscribe(_ string, _ log.Logger) error { return nil }
func (c *fakeMQTTClient) Dispose() {}
35 changes: 26 additions & 9 deletions pkg/plugin/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/mqtt-datasource/pkg/mqtt"
)

Expand Down Expand Up @@ -118,9 +119,18 @@ func TestStreamingKeyIntegration_ClientSubscription(t *testing.T) {
topicKey3 := "1s/dGVzdC90b3BpYw/user1/hash123/org789"

// Subscribe to all three
topic1 := client.Subscribe(topicKey1)
topic2 := client.Subscribe(topicKey2)
topic3 := client.Subscribe(topicKey3)
topic1, err := client.Subscribe(topicKey1, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
topic2, err := client.Subscribe(topicKey2, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
topic3, err := client.Subscribe(topicKey3, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}

// Verify all topics were created
if topic1 == nil || topic2 == nil || topic3 == nil {
Expand Down Expand Up @@ -163,8 +173,14 @@ func TestStreamingKeyIntegration_MessageIsolation(t *testing.T) {
topicKey1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456"
topicKey2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456"

topic1 := client.Subscribe(topicKey1)
topic2 := client.Subscribe(topicKey2)
topic1, err := client.Subscribe(topicKey1, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
topic2, err := client.Subscribe(topicKey2, log.DefaultLogger)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}

if topic1 == nil || topic2 == nil {
t.Fatal("Expected both topics to be created")
Expand Down Expand Up @@ -219,10 +235,10 @@ func (m *mockMQTTClient) IsConnected() bool {
return true
}

func (m *mockMQTTClient) Subscribe(reqPath string) *mqtt.Topic {
func (m *mockMQTTClient) Subscribe(reqPath string, logger log.Logger) (*mqtt.Topic, error) {
// Check if already exists
if topic, exists := m.topics[reqPath]; exists {
return topic
return topic, nil
}

// Parse the reqPath (simplified version)
Expand All @@ -239,11 +255,12 @@ func (m *mockMQTTClient) Subscribe(reqPath string) *mqtt.Topic {
// Simulate MQTT subscription (would normally decode the topic)
m.subscriptions["test/topic"] = true

return topic
return topic, nil
}

func (m *mockMQTTClient) Unsubscribe(reqPath string) {
func (m *mockMQTTClient) Unsubscribe(reqPath string, logger log.Logger) error {
delete(m.topics, reqPath)
return nil
}

func (m *mockMQTTClient) Dispose() {
Expand Down
11 changes: 9 additions & 2 deletions pkg/plugin/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ func (ds *MQTTDatasource) RunStream(ctx context.Context, req *backend.RunStreamR
return backend.DownstreamErrorf("invalid interval: %s", chunks[0])
}

ds.Client.Subscribe(topicKey, logger)
defer ds.Client.Unsubscribe(topicKey, logger)
_, err = ds.Client.Subscribe(topicKey, logger)
if err != nil {
return err
}
defer func() {
if unsubErr := ds.Client.Unsubscribe(topicKey, logger); unsubErr != nil {
logger.Error("Failed to unsubscribe from MQTT topic", "topicKey", topicKey, "error", unsubErr)
}
}()

ticker := time.NewTicker(interval)

Expand Down
Loading