diff --git a/cspell.config.json b/cspell.config.json index 13e6d40..b91194f 100644 --- a/cspell.config.json +++ b/cspell.config.json @@ -36,6 +36,7 @@ "paho", "jsoniter", "subresource", - "streamingkey" + "streamingkey", + "unsub" ] } diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index ed45b12..3c63d83 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -18,8 +18,8 @@ import ( type Client interface { GetTopic(string) (*Topic, bool) IsConnected() bool - Subscribe(string, log.Logger) *Topic - Unsubscribe(string, log.Logger) + Subscribe(string, log.Logger) (*Topic, error) + Unsubscribe(string, log.Logger) error Dispose() } @@ -120,21 +120,19 @@ func (c *client) GetTopic(reqPath string) (*Topic, bool) { return c.topics.Load(reqPath) } -func (c *client) Subscribe(reqPath string, logger log.Logger) *Topic { +func (c *client) Subscribe(reqPath string, logger log.Logger) (*Topic, error) { // Check if there's already a topic with this exact key (reqPath) if existingTopic, ok := c.topics.Load(reqPath); ok { - return existingTopic + return existingTopic, nil } chunks := strings.Split(reqPath, "/") if len(chunks) < 2 { - logger.Error("Invalid path", "path", reqPath) - return nil + return nil, backend.DownstreamErrorf("invalid path: %s", reqPath) } interval, err := time.ParseDuration(chunks[0]) if err != nil { - logger.Error("Invalid interval", "path", reqPath, "interval", chunks[0]) - return nil + return nil, backend.DownstreamErrorf("invalid interval %s: %s", chunks[0], err) } // For MQTT subscription, we only need the actual topic path (without streaming key) @@ -150,8 +148,7 @@ func (c *client) Subscribe(reqPath string, logger log.Logger) *Topic { topic, err := decodeTopic(t.Path, logger) if err != nil { - logger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", backend.DownstreamError(err)) - return nil + return nil, backend.DownstreamErrorf("error decoding MQTT topic name %s: %s", t.Path, err) } logger.Debug("Subscribing to MQTT topic", "topic", topic) @@ -161,37 +158,38 @@ func (c *client) Subscribe(reqPath string, logger log.Logger) *Topic { // and don't need to regex it against + and #. c.HandleMessage(topicPath, []byte(m.Payload())) }); token.Wait() && token.Error() != nil { - logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", backend.DownstreamError(token.Error())) + return nil, backend.DownstreamErrorf("error subscribing to MQTT topic %s: %s", topic, token.Error()) } // Store the topic using reqPath as the key (which includes streaming key) c.topics.Map.Store(reqPath, t) - return t + return t, nil } -func (c *client) Unsubscribe(reqPath string, logger log.Logger) { +func (c *client) Unsubscribe(reqPath string, logger log.Logger) error { t, ok := c.GetTopic(reqPath) if !ok { - return + return nil // No error if topic doesn't exist } c.topics.Delete(t.Key()) if exists := c.topics.HasSubscription(t.Path); exists { // There are still other subscriptions to this path, // so we shouldn't unsubscribe yet. - return + return nil } logger.Debug("Unsubscribing from MQTT topic", "topic", t.Path) topic, err := decodeTopic(t.Path, logger) if err != nil { - logger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", backend.DownstreamError(err)) - return + return backend.DownstreamErrorf("error decoding MQTT topic name %s: %s", t.Path, err) } if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil { - logger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", backend.DownstreamError(token.Error())) + return backend.DownstreamErrorf("error unsubscribing from MQTT topic %s: %s", t.Path, token.Error()) } + + return nil } func (c *client) Dispose() { diff --git a/pkg/mqtt/client_test.go b/pkg/mqtt/client_test.go index 3fa18d2..d55c272 100644 --- a/pkg/mqtt/client_test.go +++ b/pkg/mqtt/client_test.go @@ -5,6 +5,8 @@ import ( "strings" "testing" "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend/log" ) // Mock client that implements our Client interface directly @@ -22,19 +24,19 @@ func (m *mockClient) IsConnected() bool { return m.connected } -func (m *mockClient) Subscribe(reqPath string) *Topic { +func (m *mockClient) Subscribe(reqPath string, logger log.Logger) (*Topic, error) { // Check if already exists if existingTopic, ok := m.topics.Load(reqPath); ok { - return existingTopic + return existingTopic, nil } chunks := strings.Split(reqPath, "/") if len(chunks) < 2 { - return nil + return nil, nil } interval, err := time.ParseDuration(chunks[0]) if err != nil { - return nil + return nil, err } topicPath := path.Join(chunks[1:]...) @@ -53,11 +55,12 @@ func (m *mockClient) Subscribe(reqPath string) *Topic { // Store with reqPath as key m.topics.Map.Store(reqPath, t) - return t + return t, nil } -func (m *mockClient) Unsubscribe(reqPath string) { +func (m *mockClient) Unsubscribe(reqPath string, logger log.Logger) error { m.topics.Delete(reqPath) + return nil } func (m *mockClient) Dispose() { @@ -117,7 +120,10 @@ func TestClient_Subscribe_WithStreamingKey(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topic := c.Subscribe(tt.reqPath) + topic, err := c.Subscribe(tt.reqPath, log.DefaultLogger) + if err != nil && tt.expectTopic { + t.Fatalf("Subscribe failed: %v", err) + } if tt.expectTopic { if topic == nil { @@ -149,13 +155,19 @@ func TestClient_Subscribe_Deduplication(t *testing.T) { reqPath := "1s/dGVzdC90b3BpYw/user1/hash123/org456" // Subscribe first time - topic1 := c.Subscribe(reqPath) + topic1, err := c.Subscribe(reqPath, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } if topic1 == nil { t.Fatal("Expected topic to be created") } // Subscribe second time - should return same topic - topic2 := c.Subscribe(reqPath) + topic2, err := c.Subscribe(reqPath, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } if topic2 == nil { t.Fatal("Expected topic to be returned") } @@ -184,9 +196,18 @@ func TestClient_Subscribe_MultipleStreamingKeys(t *testing.T) { reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456" reqPath3 := "1s/dGVzdC90b3BpYw/user1/hash123/org789" - topic1 := c.Subscribe(reqPath1) - topic2 := c.Subscribe(reqPath2) - topic3 := c.Subscribe(reqPath3) + topic1, err := c.Subscribe(reqPath1, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + topic2, err := c.Subscribe(reqPath2, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + topic3, err := c.Subscribe(reqPath3, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } if topic1 == nil || topic2 == nil || topic3 == nil { t.Fatal("Expected all topics to be created") @@ -234,7 +255,10 @@ func TestClient_GetTopic(t *testing.T) { } // Create topic - topic := c.Subscribe(reqPath) + topic, err := c.Subscribe(reqPath, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } if topic == nil { t.Fatal("Expected topic to be created") } @@ -257,8 +281,14 @@ func TestClient_MessageHandling_WithStreamingKeys(t *testing.T) { reqPath1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456" reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456" - topic1 := c.Subscribe(reqPath1) - topic2 := c.Subscribe(reqPath2) + topic1, err := c.Subscribe(reqPath1, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + topic2, err := c.Subscribe(reqPath2, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } if topic1 == nil || topic2 == nil { t.Fatal("Expected both topics to be created") diff --git a/pkg/plugin/datasource_test.go b/pkg/plugin/datasource_test.go index 329c6e7..6c9fa57 100644 --- a/pkg/plugin/datasource_test.go +++ b/pkg/plugin/datasource_test.go @@ -53,6 +53,6 @@ func (c *fakeMQTTClient) IsConnected() bool { return c.connected } -func (c *fakeMQTTClient) Subscribe(_ string, _ log.Logger) *mqtt.Topic { return nil } -func (c *fakeMQTTClient) Unsubscribe(_ string, _ log.Logger) {} -func (c *fakeMQTTClient) Dispose() {} +func (c *fakeMQTTClient) Subscribe(_ string, _ log.Logger) (*mqtt.Topic, error) { return nil, nil } +func (c *fakeMQTTClient) Unsubscribe(_ string, _ log.Logger) error { return nil } +func (c *fakeMQTTClient) Dispose() {} diff --git a/pkg/plugin/integration_test.go b/pkg/plugin/integration_test.go index c475978..fa2aeff 100644 --- a/pkg/plugin/integration_test.go +++ b/pkg/plugin/integration_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/mqtt-datasource/pkg/mqtt" ) @@ -118,9 +119,18 @@ func TestStreamingKeyIntegration_ClientSubscription(t *testing.T) { topicKey3 := "1s/dGVzdC90b3BpYw/user1/hash123/org789" // Subscribe to all three - topic1 := client.Subscribe(topicKey1) - topic2 := client.Subscribe(topicKey2) - topic3 := client.Subscribe(topicKey3) + topic1, err := client.Subscribe(topicKey1, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + topic2, err := client.Subscribe(topicKey2, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + topic3, err := client.Subscribe(topicKey3, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } // Verify all topics were created if topic1 == nil || topic2 == nil || topic3 == nil { @@ -163,8 +173,14 @@ func TestStreamingKeyIntegration_MessageIsolation(t *testing.T) { topicKey1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456" topicKey2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456" - topic1 := client.Subscribe(topicKey1) - topic2 := client.Subscribe(topicKey2) + topic1, err := client.Subscribe(topicKey1, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + topic2, err := client.Subscribe(topicKey2, log.DefaultLogger) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } if topic1 == nil || topic2 == nil { t.Fatal("Expected both topics to be created") @@ -219,10 +235,10 @@ func (m *mockMQTTClient) IsConnected() bool { return true } -func (m *mockMQTTClient) Subscribe(reqPath string) *mqtt.Topic { +func (m *mockMQTTClient) Subscribe(reqPath string, logger log.Logger) (*mqtt.Topic, error) { // Check if already exists if topic, exists := m.topics[reqPath]; exists { - return topic + return topic, nil } // Parse the reqPath (simplified version) @@ -239,11 +255,12 @@ func (m *mockMQTTClient) Subscribe(reqPath string) *mqtt.Topic { // Simulate MQTT subscription (would normally decode the topic) m.subscriptions["test/topic"] = true - return topic + return topic, nil } -func (m *mockMQTTClient) Unsubscribe(reqPath string) { +func (m *mockMQTTClient) Unsubscribe(reqPath string, logger log.Logger) error { delete(m.topics, reqPath) + return nil } func (m *mockMQTTClient) Dispose() { diff --git a/pkg/plugin/stream.go b/pkg/plugin/stream.go index d916aca..62fe2f7 100644 --- a/pkg/plugin/stream.go +++ b/pkg/plugin/stream.go @@ -30,8 +30,15 @@ func (ds *MQTTDatasource) RunStream(ctx context.Context, req *backend.RunStreamR return backend.DownstreamErrorf("invalid interval: %s", chunks[0]) } - ds.Client.Subscribe(topicKey, logger) - defer ds.Client.Unsubscribe(topicKey, logger) + _, err = ds.Client.Subscribe(topicKey, logger) + if err != nil { + return err + } + defer func() { + if unsubErr := ds.Client.Unsubscribe(topicKey, logger); unsubErr != nil { + logger.Error("Failed to unsubscribe from MQTT topic", "topicKey", topicKey, "error", unsubErr) + } + }() ticker := time.NewTicker(interval)