Skip to content

Commit ffbdec8

Browse files
committed
enable additional options and log level in bus
Signed-off-by: Jeeva Kandasamy <jkandasa@gmail.com>
1 parent 9baa835 commit ffbdec8

File tree

1 file changed

+38
-30
lines changed

1 file changed

+38
-30
lines changed

plugin/bus/natsio/client.go

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,25 @@ const (
2626
loggerName = "BUS:NATS.IO"
2727
defaultReconnectWait = 5 * time.Second
2828
defaultConnectionTimeout = 10 * time.Second
29-
defaultMaximumReconnect = 100
30-
defaultBufferSize = 1000
29+
defaultMaximumReconnect = 60
30+
defaultBufferSize = 4194304 // 4MB
3131
)
3232

3333
// Config details of the client
3434
type Config struct {
35-
Type string `yaml:"type"`
36-
ServerURL string `yaml:"server_url"`
37-
Token string `yaml:"token"`
38-
Username string `yaml:"username"`
39-
Password string `yaml:"password"`
40-
Insecure bool `yaml:"insecure"`
41-
ConnectionTimeout string `yaml:"connection_timeout"`
42-
BufferSize int `yaml:"buffer_size"`
43-
MaximumReconnect int `yaml:"maximum_reconnect"`
44-
ReconnectWait string `yaml:"reconnect_wait"`
45-
WebsocketOptions *WebsocketOptions `yaml:"websocket_options"`
46-
TopicPrefix string `yaml:"topic_prefix"`
35+
Type string `yaml:"type"`
36+
ServerURL string `yaml:"server_url"`
37+
Token string `yaml:"token"`
38+
Username string `yaml:"username"`
39+
Password string `yaml:"password"`
40+
Insecure bool `yaml:"insecure"`
41+
BufferSize int `yaml:"buffer_size"`
42+
RetryOnFailedConnect bool `yaml:"retry_on_failed_connect"`
43+
ConnectionTimeout string `yaml:"connection_timeout"`
44+
MaximumReconnect int `yaml:"maximum_reconnect"`
45+
ReconnectWait string `yaml:"reconnect_wait"`
46+
WebsocketOptions *WebsocketOptions `yaml:"websocket_options"`
47+
TopicPrefix string `yaml:"topic_prefix"`
4748
}
4849

4950
// WebsocketOptions are config options for a websocket dialer
@@ -110,17 +111,20 @@ func NewClient(ctx context.Context, config cmap.CustomMap) (busPluginTY.Plugin,
110111
}
111112

112113
opts := natsIO.Options{
113-
Url: fakeServerURI.String(),
114-
Secure: false, // will be handled by our custom dialer
115-
Verbose: true,
116-
ReconnectWait: utils.ToDuration(cfg.ReconnectWait, defaultReconnectWait),
117-
AllowReconnect: true,
118-
MaxReconnect: cfg.MaximumReconnect,
119-
ReconnectBufSize: cfg.BufferSize,
120-
ClosedCB: client.callBackClosed,
121-
ReconnectedCB: client.callBackReconnected,
122-
DisconnectedCB: client.callBackDisconnected,
123-
DisconnectedErrCB: client.callBackDisconnectedError,
114+
Url: fakeServerURI.String(),
115+
Secure: false, // will be handled by our custom dialer
116+
Verbose: true,
117+
RetryOnFailedConnect: cfg.RetryOnFailedConnect,
118+
AllowReconnect: true,
119+
ReconnectWait: utils.ToDuration(cfg.ReconnectWait, defaultReconnectWait),
120+
MaxReconnect: cfg.MaximumReconnect,
121+
ReconnectBufSize: cfg.BufferSize,
122+
ConnectedCB: client.callBackConnected,
123+
ClosedCB: client.callBackClosed,
124+
ReconnectedCB: client.callBackReconnected,
125+
DisconnectedCB: client.callBackDisconnected,
126+
DisconnectedErrCB: client.callBackDisconnectedError,
127+
NoCallbacksAfterClientClose: true,
124128
}
125129

126130
// update secure login if enabled
@@ -132,9 +136,9 @@ func NewClient(ctx context.Context, config cmap.CustomMap) (busPluginTY.Plugin,
132136
case cfg.Username != "":
133137
opts.User = cfg.Username
134138
opts.Password = cfg.Password
135-
136139
}
137140

141+
// add custom dialer
138142
customDialer, err := NewCustomDialer(cfg, client.logger)
139143
if err != nil {
140144
return nil, err
@@ -284,21 +288,25 @@ func (c *Client) UnsubscribeAll(topic string) error {
284288
}
285289

286290
// call back functions
291+
func (c *Client) callBackConnected(con *natsIO.Conn) {
292+
c.logger.Info("connected")
293+
}
294+
287295
func (c *Client) callBackDisconnected(con *natsIO.Conn) {
288-
c.logger.Debug("disconnected")
296+
c.logger.Info("disconnected")
289297
}
290298

291299
func (c *Client) callBackReconnected(con *natsIO.Conn) {
292-
c.logger.Debug("reconnected")
300+
c.logger.Info("reconnected")
293301
}
294302

295303
func (c *Client) callBackClosed(con *natsIO.Conn) {
296-
c.logger.Debug("connection closed")
304+
c.logger.Info("connection closed")
297305
}
298306

299307
func (c *Client) callBackDisconnectedError(con *natsIO.Conn, err error) {
300308
if err != nil {
301-
c.logger.Debug("disconnected", zap.String("error", err.Error()))
309+
c.logger.Error("disconnected", zap.String("error", err.Error()))
302310
} else {
303311
c.logger.Debug("disconnected")
304312
}

0 commit comments

Comments
 (0)