Skip to content

Commit 8d6c415

Browse files
authored
Add support for raw values (#137)
### Problem Users reported that string values like "on", "off", or "admin_off" were not being received, while numeric values worked fine. The framer expected all payloads to be valid JSON, but many MQTT publishers send raw strings without JSON encoding. ### Description Fixes an issue where the MQTT datasource failed to process raw string values that aren't JSON-encoded, causing data to be silently dropped. ### Solution Added fallback logic in toFrame() method to treat raw bytes as string values when JSON parsing fails. Fixes: #131
1 parent 1ba1ea2 commit 8d6c415

File tree

5 files changed

+205
-3
lines changed

5 files changed

+205
-3
lines changed

pkg/mqtt/framer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,10 @@ func (df *framer) toFrame(messages []Message) (*data.Frame, error) {
108108
df.iterator = jsoniter.ParseBytes(jsoniter.ConfigDefault, message.Value)
109109
err := df.next()
110110
if err != nil {
111-
log.DefaultLogger.Error("error parsing message", "error", err)
112-
continue
111+
// If JSON parsing fails, treat the raw bytes as a string value
112+
log.DefaultLogger.Debug("JSON parsing failed, treating as raw string", "error", err, "value", string(message.Value))
113+
rawValue := string(message.Value)
114+
df.addValue(data.FieldTypeNullableString, &rawValue)
113115
}
114116
df.fields[0].Append(message.Timestamp)
115117
df.extendFields(df.fields[0].Len() - 1)

pkg/mqtt/framer_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010
)
1111

12-
var update = true
12+
var update = false
1313

1414
func Test_framer(t *testing.T) {
1515

@@ -61,6 +61,21 @@ func Test_framer(t *testing.T) {
6161
t.Run("null", func(t *testing.T) {
6262
runTest(t, "null", nil)
6363
})
64+
65+
// Test raw string values (without JSON encoding) - this reproduces the user issue
66+
t.Run("raw string values", func(t *testing.T) {
67+
runRawTest(t, "raw-string", []byte("on"), []byte("off"), []byte("admin_off"))
68+
})
69+
70+
// Test raw numeric values
71+
t.Run("raw numeric values", func(t *testing.T) {
72+
runRawTest(t, "raw-number", []byte("123"), []byte("456.789"))
73+
})
74+
75+
// Test mixed raw values
76+
t.Run("mixed raw values", func(t *testing.T) {
77+
runRawTest(t, "raw-mixed", []byte("25"), []byte("on"), []byte("123.45"))
78+
})
6479
}
6580

6681
func runTest(t *testing.T, name string, values ...any) {
@@ -77,6 +92,20 @@ func runTest(t *testing.T, name string, values ...any) {
7792
experimental.CheckGoldenJSONFrame(t, "testdata", name, frame, update)
7893
}
7994

95+
func runRawTest(t *testing.T, name string, rawValues ...[]byte) {
96+
t.Helper()
97+
f := newFramer()
98+
timestamp := time.Unix(0, 0)
99+
messages := []Message{}
100+
for i, v := range rawValues {
101+
messages = append(messages, Message{Timestamp: timestamp.Add(time.Duration(i) * time.Minute), Value: v})
102+
}
103+
frame, err := f.toFrame(messages)
104+
require.NoError(t, err)
105+
require.NotNil(t, frame)
106+
experimental.CheckGoldenJSONFrame(t, "testdata", name, frame, update)
107+
}
108+
80109
func toJSON(v interface{}) []byte {
81110
b, err := json.Marshal(v)
82111
if err != nil {

pkg/mqtt/testdata/raw-mixed.jsonc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// 🌟 This was machine generated. Do not edit. 🌟
2+
//
3+
// Frame[0]
4+
// Name: mqtt
5+
// Dimensions: 2 Fields by 3 Rows
6+
// +-------------------------------+------------------+
7+
// | Name: Time | Name: Value |
8+
// | Labels: | Labels: |
9+
// | Type: []time.Time | Type: []*float64 |
10+
// +-------------------------------+------------------+
11+
// | 1970-01-01 02:00:00 +0200 EET | 25 |
12+
// | 1970-01-01 02:01:00 +0200 EET | null |
13+
// | 1970-01-01 02:02:00 +0200 EET | 123.45 |
14+
// +-------------------------------+------------------+
15+
//
16+
//
17+
// 🌟 This was machine generated. Do not edit. 🌟
18+
{
19+
"status": 200,
20+
"frames": [
21+
{
22+
"schema": {
23+
"name": "mqtt",
24+
"fields": [
25+
{
26+
"name": "Time",
27+
"type": "time",
28+
"typeInfo": {
29+
"frame": "time.Time"
30+
}
31+
},
32+
{
33+
"name": "Value",
34+
"type": "number",
35+
"typeInfo": {
36+
"frame": "float64",
37+
"nullable": true
38+
}
39+
}
40+
]
41+
},
42+
"data": {
43+
"values": [
44+
[
45+
0,
46+
60000,
47+
120000
48+
],
49+
[
50+
25,
51+
null,
52+
123.45
53+
]
54+
]
55+
}
56+
}
57+
]
58+
}

pkg/mqtt/testdata/raw-number.jsonc

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// 🌟 This was machine generated. Do not edit. 🌟
2+
//
3+
// Frame[0]
4+
// Name: mqtt
5+
// Dimensions: 2 Fields by 2 Rows
6+
// +-------------------------------+------------------+
7+
// | Name: Time | Name: Value |
8+
// | Labels: | Labels: |
9+
// | Type: []time.Time | Type: []*float64 |
10+
// +-------------------------------+------------------+
11+
// | 1970-01-01 02:00:00 +0200 EET | 123 |
12+
// | 1970-01-01 02:01:00 +0200 EET | 456.789 |
13+
// +-------------------------------+------------------+
14+
//
15+
//
16+
// 🌟 This was machine generated. Do not edit. 🌟
17+
{
18+
"status": 200,
19+
"frames": [
20+
{
21+
"schema": {
22+
"name": "mqtt",
23+
"fields": [
24+
{
25+
"name": "Time",
26+
"type": "time",
27+
"typeInfo": {
28+
"frame": "time.Time"
29+
}
30+
},
31+
{
32+
"name": "Value",
33+
"type": "number",
34+
"typeInfo": {
35+
"frame": "float64",
36+
"nullable": true
37+
}
38+
}
39+
]
40+
},
41+
"data": {
42+
"values": [
43+
[
44+
0,
45+
60000
46+
],
47+
[
48+
123,
49+
456.789
50+
]
51+
]
52+
}
53+
}
54+
]
55+
}

pkg/mqtt/testdata/raw-string.jsonc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// 🌟 This was machine generated. Do not edit. 🌟
2+
//
3+
// Frame[0]
4+
// Name: mqtt
5+
// Dimensions: 2 Fields by 3 Rows
6+
// +-------------------------------+-----------------+
7+
// | Name: Time | Name: Value |
8+
// | Labels: | Labels: |
9+
// | Type: []time.Time | Type: []*string |
10+
// +-------------------------------+-----------------+
11+
// | 1970-01-01 02:00:00 +0200 EET | on |
12+
// | 1970-01-01 02:01:00 +0200 EET | off |
13+
// | 1970-01-01 02:02:00 +0200 EET | admin_off |
14+
// +-------------------------------+-----------------+
15+
//
16+
//
17+
// 🌟 This was machine generated. Do not edit. 🌟
18+
{
19+
"status": 200,
20+
"frames": [
21+
{
22+
"schema": {
23+
"name": "mqtt",
24+
"fields": [
25+
{
26+
"name": "Time",
27+
"type": "time",
28+
"typeInfo": {
29+
"frame": "time.Time"
30+
}
31+
},
32+
{
33+
"name": "Value",
34+
"type": "string",
35+
"typeInfo": {
36+
"frame": "string",
37+
"nullable": true
38+
}
39+
}
40+
]
41+
},
42+
"data": {
43+
"values": [
44+
[
45+
0,
46+
60000,
47+
120000
48+
],
49+
[
50+
"on",
51+
"off",
52+
"admin_off"
53+
]
54+
]
55+
}
56+
}
57+
]
58+
}

0 commit comments

Comments
 (0)