Skip to content

Commit 42c11a7

Browse files
committed
fix mqtt reconnect subscription issue
Signed-off-by: Jeeva Kandasamy <jkandasa@gmail.com>
1 parent be788a0 commit 42c11a7

File tree

3 files changed

+24
-21
lines changed

3 files changed

+24
-21
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* `raw` - sends the serial,ethernet messages to mqtt as is
1313
* `serial` to `MQTT`
1414
* `ethernet` to `MQTT`
15+
* `http` to `MQTT`
1516

1617
## Download
1718
### Container images

plugin/device/mqtt/device.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message)
5757
if err != nil {
5858
return nil, err
5959
}
60-
zap.L().Debug("mqtt config", zap.String("id", ID), zap.Any("config", cfg))
60+
zap.L().Debug("mqtt config", zap.Any("adapterName", ID), zap.Any("config", cfg))
6161

6262
// endpoint
6363
endpoint := &Endpoint{
@@ -94,11 +94,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message)
9494
// adding client
9595
endpoint.Client = c
9696

97-
err = endpoint.Subscribe(cfg.Subscribe)
98-
if err != nil {
99-
zap.L().Error("error on subscribe a topic", zap.String("topic", cfg.Subscribe), zap.Error(err))
100-
}
101-
zap.L().Debug("mqtt client connected successfully", zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg))
97+
zap.L().Debug("mqtt client connected successfully", zap.Any("adapterName", ID), zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg))
10298
return endpoint, nil
10399
}
104100

@@ -108,6 +104,12 @@ func (ep *Endpoint) Name() string {
108104

109105
func (ep *Endpoint) onConnectionHandler(c paho.Client) {
110106
zap.L().Debug("mqtt connection success", zap.Any("adapterName", ep.ID))
107+
108+
err := ep.Subscribe(ep.Config.Subscribe)
109+
if err != nil {
110+
zap.L().Error("error on subscribe topics", zap.Any("adapterName", ep.ID), zap.String("topics", ep.Config.Subscribe), zap.Error(err))
111+
}
112+
111113
ep.statusFunc(&model.State{
112114
Status: model.StatusUP,
113115
Message: "",
@@ -116,7 +118,7 @@ func (ep *Endpoint) onConnectionHandler(c paho.Client) {
116118
}
117119

118120
func (ep *Endpoint) onConnectionLostHandler(c paho.Client, err error) {
119-
zap.L().Error("mqtt connection lost", zap.Any("id", ep.ID), zap.Error(err))
121+
zap.L().Error("mqtt connection lost", zap.Any("adapterName", ep.ID), zap.Error(err))
120122
// Report connection lost
121123
if err != nil {
122124
ep.statusFunc(&model.State{
@@ -132,7 +134,7 @@ func (ep *Endpoint) Write(message *model.Message) error {
132134
if message == nil {
133135
return nil
134136
}
135-
zap.L().Debug("about to send a message", zap.String("message", message.ToString()))
137+
zap.L().Debug("about to send a message", zap.Any("adapterName", ep.ID), zap.String("message", message.ToString()))
136138
topic := message.Others.GetString(model.KeyMqttTopic)
137139
qos := byte(ep.Config.QoS)
138140

@@ -171,18 +173,18 @@ func (ep *Endpoint) getCallBack() func(paho.Client, paho.Message) {
171173
}
172174

173175
// Subscribe a topic
174-
func (ep *Endpoint) Subscribe(topic string) error {
175-
if topic == "" {
176+
func (ep *Endpoint) Subscribe(topicsStr string) error {
177+
if topicsStr == "" {
176178
return nil
177179
}
178-
token := ep.Client.Subscribe(topic, 0, ep.getCallBack())
179-
token.WaitTimeout(3 * time.Second)
180-
if token.Error() != nil {
181-
ep.statusFunc(&model.State{
182-
Status: model.StatusError,
183-
Message: token.Error().Error(),
184-
Since: time.Now(),
185-
})
180+
topics := strings.Split(topicsStr, ",")
181+
for _, topic := range topics {
182+
topic = strings.TrimSpace(topic)
183+
token := ep.Client.Subscribe(topic, 0, ep.getCallBack())
184+
token.WaitTimeout(3 * time.Second)
185+
if token.Error() != nil {
186+
return token.Error()
187+
}
186188
}
187-
return token.Error()
189+
return nil
188190
}

plugin/device/plugin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package plugin
33
import (
44
"fmt"
55

6-
"github.com/mycontroller-org/2mqtt/pkg/types"
6+
model "github.com/mycontroller-org/2mqtt/pkg/types"
77
deviceType "github.com/mycontroller-org/2mqtt/plugin/device/types"
88
"github.com/mycontroller-org/server/v2/pkg/types/cmap"
99
"go.uber.org/zap"
@@ -25,7 +25,7 @@ func Register(name string, fn CreatorFn) {
2525

2626
func Create(name, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (p deviceType.Plugin, err error) {
2727
if fn, ok := creators[name]; ok {
28-
p, err = fn("", config, rxFunc, statusFunc)
28+
p, err = fn(ID, config, rxFunc, statusFunc)
2929
} else {
3030
err = fmt.Errorf("device plugin [%s] is not registered", name)
3131
}

0 commit comments

Comments
 (0)