Skip to content

Commit 8b7fab2

Browse files
authored
Add error source and use context logger (#139)
This PR adds error source to errors and also changes the logger to context logger. Fixes grafana/oss-big-tent-squad#78
1 parent 8d6c415 commit 8b7fab2

File tree

9 files changed

+67
-71
lines changed

9 files changed

+67
-71
lines changed

pkg/mqtt/client.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package mqtt
22

33
import (
4+
"context"
45
"crypto/tls"
56
"crypto/x509"
67
"fmt"
@@ -10,14 +11,15 @@ import (
1011
"time"
1112

1213
paho "github.com/eclipse/paho.mqtt.golang"
14+
"github.com/grafana/grafana-plugin-sdk-go/backend"
1315
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1416
)
1517

1618
type Client interface {
1719
GetTopic(string) (*Topic, bool)
1820
IsConnected() bool
19-
Subscribe(string) *Topic
20-
Unsubscribe(string)
21+
Subscribe(string, log.Logger) *Topic
22+
Unsubscribe(string, log.Logger)
2123
Dispose()
2224
}
2325

@@ -37,7 +39,8 @@ type client struct {
3739
topics TopicMap
3840
}
3941

40-
func NewClient(o Options) (Client, error) {
42+
func NewClient(ctx context.Context, o Options) (Client, error) {
43+
logger := log.DefaultLogger.FromContext(ctx)
4144
opts := paho.NewClientOptions()
4245

4346
opts.AddBroker(o.URI)
@@ -63,7 +66,7 @@ func NewClient(o Options) (Client, error) {
6366
if o.TLSClientCert != "" || o.TLSClientKey != "" {
6467
cert, err := tls.X509KeyPair([]byte(o.TLSClientCert), []byte(o.TLSClientKey))
6568
if err != nil {
66-
return nil, fmt.Errorf("failed to setup TLSClientCert: %w", err)
69+
return nil, backend.DownstreamErrorf("failed to setup TLSClientCert: %w", err)
6770
}
6871

6972
tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
@@ -82,17 +85,17 @@ func NewClient(o Options) (Client, error) {
8285
opts.SetCleanSession(false)
8386
opts.SetMaxReconnectInterval(10 * time.Second)
8487
opts.SetConnectionLostHandler(func(c paho.Client, err error) {
85-
log.DefaultLogger.Error("MQTT Connection lost", "error", err)
88+
logger.Warn("MQTT Connection lost", "error", err)
8689
})
8790
opts.SetReconnectingHandler(func(c paho.Client, options *paho.ClientOptions) {
88-
log.DefaultLogger.Debug("MQTT Reconnecting")
91+
logger.Debug("MQTT Reconnecting")
8992
})
9093

91-
log.DefaultLogger.Info("MQTT Connecting", "clientID", clientID)
94+
logger.Info("MQTT Connecting", "clientID", clientID)
9295

9396
pahoClient := paho.NewClient(opts)
9497
if token := pahoClient.Connect(); token.Wait() && token.Error() != nil {
95-
return nil, fmt.Errorf("error connecting to MQTT broker: %s", token.Error())
98+
return nil, backend.DownstreamErrorf("error connecting to MQTT broker: %s", token.Error())
9699
}
97100

98101
return &client{
@@ -117,20 +120,20 @@ func (c *client) GetTopic(reqPath string) (*Topic, bool) {
117120
return c.topics.Load(reqPath)
118121
}
119122

120-
func (c *client) Subscribe(reqPath string) *Topic {
123+
func (c *client) Subscribe(reqPath string, logger log.Logger) *Topic {
121124
// Check if there's already a topic with this exact key (reqPath)
122125
if existingTopic, ok := c.topics.Load(reqPath); ok {
123126
return existingTopic
124127
}
125128

126129
chunks := strings.Split(reqPath, "/")
127130
if len(chunks) < 2 {
128-
log.DefaultLogger.Error("Invalid path", "path", reqPath)
131+
logger.Error("Invalid path", "path", reqPath)
129132
return nil
130133
}
131134
interval, err := time.ParseDuration(chunks[0])
132135
if err != nil {
133-
log.DefaultLogger.Error("Invalid interval", "path", reqPath, "interval", chunks[0])
136+
logger.Error("Invalid interval", "path", reqPath, "interval", chunks[0])
134137
return nil
135138
}
136139

@@ -145,27 +148,27 @@ func (c *client) Subscribe(reqPath string) *Topic {
145148
Interval: interval,
146149
}
147150

148-
topic, err := decodeTopic(t.Path)
151+
topic, err := decodeTopic(t.Path, logger)
149152
if err != nil {
150-
log.DefaultLogger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", err)
153+
logger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", backend.DownstreamError(err))
151154
return nil
152155
}
153156

154-
log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topic)
157+
logger.Debug("Subscribing to MQTT topic", "topic", topic)
155158

156159
if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) {
157160
// by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic
158161
// and don't need to regex it against + and #.
159162
c.HandleMessage(topicPath, []byte(m.Payload()))
160163
}); token.Wait() && token.Error() != nil {
161-
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
164+
logger.Error("Error subscribing to MQTT topic", "topic", topic, "error", backend.DownstreamError(token.Error()))
162165
}
163166
// Store the topic using reqPath as the key (which includes streaming key)
164167
c.topics.Map.Store(reqPath, t)
165168
return t
166169
}
167170

168-
func (c *client) Unsubscribe(reqPath string) {
171+
func (c *client) Unsubscribe(reqPath string, logger log.Logger) {
169172
t, ok := c.GetTopic(reqPath)
170173
if !ok {
171174
return
@@ -178,16 +181,16 @@ func (c *client) Unsubscribe(reqPath string) {
178181
return
179182
}
180183

181-
log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path)
184+
logger.Debug("Unsubscribing from MQTT topic", "topic", t.Path)
182185

183-
topic, err := decodeTopic(t.Path)
186+
topic, err := decodeTopic(t.Path, logger)
184187
if err != nil {
185-
log.DefaultLogger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", err)
188+
logger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", backend.DownstreamError(err))
186189
return
187190
}
188191

189192
if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
190-
log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error())
193+
logger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", backend.DownstreamError(token.Error()))
191194
}
192195
}
193196

pkg/mqtt/framer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type framer struct {
1717
fieldMap map[string]int
1818
}
1919

20-
func (df *framer) next() error {
20+
func (df *framer) next(logger log.Logger) error {
2121
switch df.iterator.WhatIsNext() {
2222
case jsoniter.StringValue:
2323
v := df.iterator.ReadString()
@@ -29,7 +29,7 @@ func (df *framer) next() error {
2929
v := df.iterator.ReadBool()
3030
df.addValue(data.FieldTypeNullableBool, &v)
3131
case jsoniter.NilValue:
32-
df.addNil()
32+
df.addNil(logger)
3333
df.iterator.ReadNil()
3434
case jsoniter.ArrayValue:
3535
df.addValue(data.FieldTypeJSON, json.RawMessage(df.iterator.SkipAndReturnBytes()))
@@ -42,7 +42,7 @@ func (df *framer) next() error {
4242
for fname := df.iterator.ReadObject(); fname != ""; fname = df.iterator.ReadObject() {
4343
if size == 0 {
4444
df.path = append(df.path, fname)
45-
if err := df.next(); err != nil {
45+
if err := df.next(logger); err != nil {
4646
return err
4747
}
4848
}
@@ -61,12 +61,12 @@ func (df *framer) key() string {
6161
return strings.Join(df.path, "")
6262
}
6363

64-
func (df *framer) addNil() {
64+
func (df *framer) addNil(logger log.Logger) {
6565
if idx, ok := df.fieldMap[df.key()]; ok {
6666
df.fields[idx].Set(0, nil)
6767
return
6868
}
69-
log.DefaultLogger.Debug("nil value for unknown field", "key", df.key())
69+
logger.Debug("nil value for unknown field", "key", df.key())
7070
}
7171

7272
func (df *framer) addValue(fieldType data.FieldType, v interface{}) {
@@ -96,7 +96,7 @@ func newFramer() *framer {
9696
return df
9797
}
9898

99-
func (df *framer) toFrame(messages []Message) (*data.Frame, error) {
99+
func (df *framer) toFrame(messages []Message, logger log.Logger) (*data.Frame, error) {
100100
// clear the data in the fields
101101
for _, field := range df.fields {
102102
for i := field.Len() - 1; i >= 0; i-- {
@@ -106,10 +106,10 @@ func (df *framer) toFrame(messages []Message) (*data.Frame, error) {
106106

107107
for _, message := range messages {
108108
df.iterator = jsoniter.ParseBytes(jsoniter.ConfigDefault, message.Value)
109-
err := df.next()
109+
err := df.next(logger)
110110
if err != nil {
111111
// If JSON parsing fails, treat the raw bytes as a string value
112-
log.DefaultLogger.Debug("JSON parsing failed, treating as raw string", "error", err, "value", string(message.Value))
112+
logger.Debug("JSON parsing failed, treating as raw string", "error", err, "value", string(message.Value))
113113
rawValue := string(message.Value)
114114
df.addValue(data.FieldTypeNullableString, &rawValue)
115115
}

pkg/mqtt/framer_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
89
"github.com/grafana/grafana-plugin-sdk-go/experimental"
910
"github.com/stretchr/testify/require"
1011
)
@@ -86,7 +87,7 @@ func runTest(t *testing.T, name string, values ...any) {
8687
for i, v := range values {
8788
messages = append(messages, Message{Timestamp: timestamp.Add(time.Duration(i) * time.Minute), Value: toJSON(v)})
8889
}
89-
frame, err := f.toFrame(messages)
90+
frame, err := f.toFrame(messages, log.DefaultLogger)
9091
require.NoError(t, err)
9192
require.NotNil(t, frame)
9293
experimental.CheckGoldenJSONFrame(t, "testdata", name, frame, update)
@@ -100,7 +101,7 @@ func runRawTest(t *testing.T, name string, rawValues ...[]byte) {
100101
for i, v := range rawValues {
101102
messages = append(messages, Message{Timestamp: timestamp.Add(time.Duration(i) * time.Minute), Value: v})
102103
}
103-
frame, err := f.toFrame(messages)
104+
frame, err := f.toFrame(messages, log.DefaultLogger)
104105
require.NoError(t, err)
105106
require.NotNil(t, frame)
106107
experimental.CheckGoldenJSONFrame(t, "testdata", name, frame, update)

pkg/mqtt/topic.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ func (t *Topic) Key() string {
3333
}
3434

3535
// ToDataFrame converts the topic to a data frame.
36-
func (t *Topic) ToDataFrame() (*data.Frame, error) {
36+
func (t *Topic) ToDataFrame(logger log.Logger) (*data.Frame, error) {
3737
if t.framer == nil {
3838
t.framer = newFramer()
3939
}
40-
return t.framer.toFrame(t.Messages)
40+
return t.framer.toFrame(t.Messages, logger)
4141
}
4242

4343
// TopicMap is a thread-safe map of topics
@@ -58,7 +58,7 @@ func (tm *TopicMap) Load(key string) (*Topic, bool) {
5858

5959
// AddMessage adds a message to the topic for the given path.
6060
func (tm *TopicMap) AddMessage(path string, message Message) {
61-
tm.Map.Range(func(key, t any) bool {
61+
tm.Range(func(key, t any) bool {
6262
topic, ok := t.(*Topic)
6363
if !ok {
6464
return false
@@ -75,7 +75,7 @@ func (tm *TopicMap) AddMessage(path string, message Message) {
7575
func (tm *TopicMap) HasSubscription(path string) bool {
7676
found := false
7777

78-
tm.Map.Range(func(key, t any) bool {
78+
tm.Range(func(key, t any) bool {
7979
topic, ok := t.(*Topic)
8080
if !ok {
8181
return true // this shouldn't happen, but continue iterating
@@ -110,10 +110,10 @@ func (tm *TopicMap) Delete(key string) {
110110
//
111111
// To comply with these restrictions, the topic is encoded using URL-safe base64
112112
// encoding. (RFC 4648; 5. Base 64 Encoding with URL and Filename Safe Alphabet)
113-
func decodeTopic(topicPath string) (string, error) {
113+
func decodeTopic(topicPath string, logger log.Logger) (string, error) {
114114
chunks := strings.Split(topicPath, "/")
115115
topic := chunks[0]
116-
log.DefaultLogger.Debug("Decoding MQTT topic name", "encodedTopic", topic)
116+
logger.Debug("Decoding MQTT topic name", "encodedTopic", topic)
117117
decoded, err := base64.RawURLEncoding.DecodeString(topic)
118118

119119
if err != nil {

pkg/plugin/datasource.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ var (
2121
)
2222

2323
// NewMQTTDatasource creates a new datasource instance.
24-
func NewMQTTInstance(_ context.Context, s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
24+
func NewMQTTInstance(ctx context.Context, s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
2525
settings, err := getDatasourceSettings(s)
2626
if err != nil {
2727
return nil, err
2828
}
2929

30-
client, err := mqtt.NewClient(*settings)
30+
client, err := mqtt.NewClient(ctx, *settings)
3131
if err != nil {
3232
return nil, err
3333
}

pkg/plugin/datasource_test.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
"github.com/grafana/grafana-plugin-sdk-go/backend"
8+
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
89
"github.com/grafana/mqtt-datasource/pkg/mqtt"
910
"github.com/grafana/mqtt-datasource/pkg/plugin"
1011
"github.com/stretchr/testify/require"
@@ -13,8 +14,7 @@ import (
1314
func TestCheckHealthHandler(t *testing.T) {
1415
t.Run("HealthStatusOK when can connect", func(t *testing.T) {
1516
ds := plugin.NewMQTTDatasource(&fakeMQTTClient{
16-
connected: true,
17-
subscribed: false,
17+
connected: true,
1818
}, "xyz")
1919

2020
res, _ := ds.CheckHealth(
@@ -28,8 +28,7 @@ func TestCheckHealthHandler(t *testing.T) {
2828

2929
t.Run("HealthStatusError when disconnected", func(t *testing.T) {
3030
ds := plugin.NewMQTTDatasource(&fakeMQTTClient{
31-
connected: false,
32-
subscribed: false,
31+
connected: false,
3332
}, "xyz")
3433

3534
res, _ := ds.CheckHealth(
@@ -43,8 +42,7 @@ func TestCheckHealthHandler(t *testing.T) {
4342
}
4443

4544
type fakeMQTTClient struct {
46-
connected bool
47-
subscribed bool
45+
connected bool
4846
}
4947

5048
func (c *fakeMQTTClient) GetTopic(_ string) (*mqtt.Topic, bool) {
@@ -55,10 +53,6 @@ func (c *fakeMQTTClient) IsConnected() bool {
5553
return c.connected
5654
}
5755

58-
func (c *fakeMQTTClient) IsSubscribed(_ string) bool {
59-
return c.subscribed
60-
}
61-
62-
func (c *fakeMQTTClient) Subscribe(_ string) *mqtt.Topic { return nil }
63-
func (c *fakeMQTTClient) Unsubscribe(_ string) {}
64-
func (c *fakeMQTTClient) Dispose() {}
56+
func (c *fakeMQTTClient) Subscribe(_ string, _ log.Logger) *mqtt.Topic { return nil }
57+
func (c *fakeMQTTClient) Unsubscribe(_ string, _ log.Logger) {}
58+
func (c *fakeMQTTClient) Dispose() {}

pkg/plugin/query.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package plugin
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76
"path"
87

98
"github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -30,13 +29,11 @@ func (ds *MQTTDatasource) query(query backend.DataQuery) backend.DataResponse {
3029
)
3130

3231
if err := json.Unmarshal(query.JSON, &t); err != nil {
33-
response.Error = err
34-
return response
32+
return backend.ErrorResponseWithErrorSource(backend.DownstreamErrorf("failed to unmarshal query: %w", err))
3533
}
3634

3735
if t.Path == "" {
38-
response.Error = fmt.Errorf("topic path is required")
39-
return response
36+
return backend.ErrorResponseWithErrorSource(backend.DownstreamErrorf("topic path is required"))
4037
}
4138

4239
t.Interval = query.Interval

0 commit comments

Comments
 (0)