Skip to content

Commit 0bdbe31

Browse files
authored
enhance queue (#32)
Signed-off-by: Jeeva Kandasamy <jkandasa@gmail.com>
1 parent cc1ddf0 commit 0bdbe31

File tree

19 files changed

+1772
-78
lines changed

19 files changed

+1772
-78
lines changed

pkg/service/deletion/service.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,18 @@ func (svc *DeletionService) onEventReceive(busData *busTY.BusData) {
9292
}
9393
}
9494

95-
func (svc *DeletionService) processEvent(item interface{}) {
95+
func (svc *DeletionService) processEvent(item interface{}) error {
9696
busData := item.(*busTY.BusData)
9797
event := &eventTY.Event{}
9898
err := busData.LoadData(event)
9999
if err != nil {
100100
svc.logger.Warn("error on convert to target type", zap.Any("topic", busData.Topic), zap.Error(err))
101-
return
101+
return nil
102102
}
103103

104104
// if it is not a deletion event, return from here
105105
if event.Type != eventTY.TypeDeleted {
106-
return
106+
return nil
107107
}
108108

109109
svc.logger.Debug("received an deletion event", zap.Any("event", event))
@@ -116,7 +116,7 @@ func (svc *DeletionService) processEvent(item interface{}) {
116116
err = event.LoadEntity(gateway)
117117
if err != nil {
118118
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
119-
return
119+
return nil
120120
}
121121
svc.deleteNodes(gateway)
122122

@@ -125,7 +125,7 @@ func (svc *DeletionService) processEvent(item interface{}) {
125125
err = event.LoadEntity(node)
126126
if err != nil {
127127
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
128-
return
128+
return nil
129129
}
130130
svc.deleteSources(node)
131131

@@ -134,14 +134,15 @@ func (svc *DeletionService) processEvent(item interface{}) {
134134
err = event.LoadEntity(source)
135135
if err != nil {
136136
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
137-
return
137+
return nil
138138
}
139139
svc.deleteFields(source)
140140

141141
default:
142142
// do not proceed further
143-
return
143+
return nil
144144
}
145+
return nil
145146
}
146147

147148
// deletes nodes

pkg/service/forward_payload/service.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,13 @@ func (svc *ForwardPayloadService) Close() error {
127127
}
128128

129129
// processEvent from the queue
130-
func (svc *ForwardPayloadService) processEvent(item interface{}) {
130+
func (svc *ForwardPayloadService) processEvent(item interface{}) error {
131131
field := item.(*field.Field)
132132

133133
quickID, err := quickIdUtils.GetQuickID(*field)
134134
if err != nil {
135135
svc.logger.Error("unable to get quick id", zap.Error(err), zap.String("gateway", field.GatewayID), zap.String("node", field.NodeID), zap.String("source", field.SourceID), zap.String("field", field.FieldID))
136-
return
136+
return nil
137137
}
138138

139139
// fetch mapped filed for this event
@@ -145,11 +145,11 @@ func (svc *ForwardPayloadService) processEvent(item interface{}) {
145145
response, err := svc.api.ForwardPayload().List(filters, pagination)
146146
if err != nil {
147147
svc.logger.Error("error getting mapping data from database", zap.Error(err))
148-
return
148+
return nil
149149
}
150150

151151
if response.Count == 0 {
152-
return
152+
return nil
153153
}
154154

155155
svc.logger.Debug("Starting data forwarding", zap.Any("data", field))
@@ -167,4 +167,5 @@ func (svc *ForwardPayloadService) processEvent(item interface{}) {
167167
}
168168
}
169169
}
170+
return nil
170171
}

pkg/service/gateway/listener.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (svc *GatewayService) onEvent(event *busTY.BusData) {
6868
}
6969

7070
// processEvent from the queue
71-
func (svc *GatewayService) processEvent(event interface{}) {
71+
func (svc *GatewayService) processEvent(event interface{}) error {
7272
reqEvent := event.(*rsTY.ServiceEvent)
7373
svc.logger.Debug("Processing a request", zap.Any("event", reqEvent))
7474

@@ -92,7 +92,7 @@ func (svc *GatewayService) processEvent(event interface{}) {
9292
if err != nil {
9393
svc.logger.Error("error on stopping a service", zap.Error(err), zap.String("id", reqEvent.ID))
9494
}
95-
return
95+
return nil
9696
}
9797
gwCfg := svc.getGatewayConfig(reqEvent)
9898
if gwCfg != nil {
@@ -129,6 +129,7 @@ func (svc *GatewayService) processEvent(event interface{}) {
129129
default:
130130
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
131131
}
132+
return nil
132133
}
133134

134135
func (svc *GatewayService) getGatewayConfig(reqEvent *rsTY.ServiceEvent) *gwTY.Config {

pkg/service/gateway_msg_processor/service.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (svc *MessageProcessor) Close() error {
131131
}
132132

133133
// processMessage from the queue
134-
func (svc *MessageProcessor) processMessage(item interface{}) {
134+
func (svc *MessageProcessor) processMessage(item interface{}) error {
135135
msg := item.(*msgTY.Message)
136136
svc.logger.Debug("Starting Message Processing", zap.Any("message", msg))
137137

@@ -142,6 +142,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
142142
err := svc.setFieldData(msg)
143143
if err != nil {
144144
svc.logger.Error("error on field data set", zap.Error(err))
145+
return err // Requeue on error
145146
}
146147
// update last seen
147148
svc.updateSourceLastSeen(msg.GatewayID, msg.NodeID, msg.SourceID, msg.Timestamp)
@@ -151,12 +152,14 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
151152
err := svc.requestFieldData(msg)
152153
if err != nil {
153154
svc.logger.Error("error on field data request", zap.Error(err))
155+
return err // Requeue on error
154156
}
155157

156158
case msgTY.TypePresentation: // update source data, like name or other details
157159
err := svc.updateSourceDetail(msg)
158160
if err != nil {
159161
svc.logger.Error("error on source data update", zap.Error(err))
162+
return err // Requeue on error
160163
}
161164
// update last seen
162165
svc.updateSourceLastSeen(msg.GatewayID, msg.NodeID, msg.SourceID, msg.Timestamp)
@@ -172,6 +175,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
172175
err := svc.updateNodeData(msg)
173176
if err != nil {
174177
svc.logger.Error("error on node data update", zap.Error(err))
178+
return err // Requeue on error
175179
}
176180
// node last seen managed in updateNodeData
177181

@@ -194,6 +198,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
194198
}
195199

196200
svc.logger.Debug("message processed", zap.String("timeTaken", time.Since(msg.Timestamp).String()), zap.Any("message", msg))
201+
return nil
197202
}
198203

199204
// update node detail

pkg/service/handler/message_listerner.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (svc *HandlerService) closeMessageListener() {
4747
svc.messageQueue.Close()
4848
}
4949

50-
func (svc *HandlerService) processHandlerMessage(item interface{}) {
50+
func (svc *HandlerService) processHandlerMessage(item interface{}) error {
5151
msg := item.(*handlerTY.MessageWrapper)
5252
start := time.Now()
5353

@@ -56,16 +56,18 @@ func (svc *HandlerService) processHandlerMessage(item interface{}) {
5656
handler := svc.store.Get(msg.ID)
5757
if handler == nil {
5858
svc.logger.Info("handler not available", zap.Any("handlerID", msg.ID), zap.Any("availableHandlers", svc.store.ListIDs()))
59-
return
59+
return nil // Don't requeue if handler not available
6060
}
6161

6262
state := handler.State()
6363

6464
err := handler.Post(msg.Data)
6565
if err != nil {
66-
// if err == handlerTY.ErrReQueue {
67-
// // TODO: requeue and try again
68-
// }
66+
if err == handlerTY.ErrReQueue {
67+
// Requeue the message to try again
68+
svc.logger.Info("requeuing message", zap.Any("handlerID", msg.ID))
69+
return err
70+
}
6971
svc.logger.Warn("error from handler", zap.Any("handlerID", msg.ID), zap.Error(err))
7072
state.Status = types.StatusError
7173
state.Message = err.Error()
@@ -76,4 +78,5 @@ func (svc *HandlerService) processHandlerMessage(item interface{}) {
7678

7779
state.Since = time.Now()
7880
busUtils.SetHandlerState(svc.logger, svc.bus, msg.ID, *state)
81+
return nil
7982
}

pkg/service/handler/service_listener.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,13 @@ func (svc *HandlerService) onServiceEvent(event *busTY.BusData) {
138138
}
139139

140140
// postProcessServiceEvent from the queue
141-
func (svc *HandlerService) postProcessServiceEvent(event interface{}) {
141+
func (svc *HandlerService) postProcessServiceEvent(event interface{}) error {
142142
reqEvent := event.(*rsTY.ServiceEvent)
143143
svc.logger.Debug("processing a request", zap.Any("event", reqEvent))
144144

145145
if reqEvent.Type != rsTY.TypeHandler {
146146
svc.logger.Warn("unsupported event type", zap.Any("event", reqEvent))
147+
return nil
147148
}
148149

149150
switch reqEvent.Command {
@@ -162,7 +163,7 @@ func (svc *HandlerService) postProcessServiceEvent(event interface{}) {
162163
if err != nil {
163164
svc.logger.Error("error on stopping a service", zap.Error(err))
164165
}
165-
return
166+
return nil
166167
}
167168
cfg := svc.getConfig(reqEvent)
168169
if cfg != nil {
@@ -187,6 +188,7 @@ func (svc *HandlerService) postProcessServiceEvent(event interface{}) {
187188
default:
188189
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
189190
}
191+
return nil
190192
}
191193

192194
func (svc *HandlerService) getConfig(reqEvent *rsTY.ServiceEvent) *handlerTY.Config {

pkg/service/resource/service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (svc *ResourceService) onEvent(data *busTY.BusData) {
113113
}
114114

115115
// processEvent from the queue
116-
func (svc *ResourceService) processEvent(item interface{}) {
116+
func (svc *ResourceService) processEvent(item interface{}) error {
117117
request := item.(*rsTY.ServiceEvent)
118118
svc.logger.Debug("processing an event", zap.Any("event", request))
119119
start := time.Now()
@@ -170,6 +170,7 @@ func (svc *ResourceService) processEvent(item interface{}) {
170170
svc.logger.Warn("unknown event type", zap.Any("event", request))
171171
}
172172
svc.logger.Debug("completed a resource service", zap.String("timeTaken", time.Since(start).String()), zap.Any("data", request))
173+
return nil
173174
}
174175

175176
func (svc *ResourceService) postResponse(topic string, response *rsTY.ServiceEvent) error {

pkg/service/scheduler/listener.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (svc *SchedulerService) onServiceEvent(event *busTY.BusData) {
7070
}
7171

7272
// processServiceEvent from the queue
73-
func (svc *SchedulerService) processServiceEvent(event interface{}) {
73+
func (svc *SchedulerService) processServiceEvent(event interface{}) error {
7474
reqEvent := event.(*rsTY.ServiceEvent)
7575
svc.logger.Debug("processing a request", zap.Any("event", reqEvent))
7676

@@ -105,6 +105,7 @@ func (svc *SchedulerService) processServiceEvent(event interface{}) {
105105
default:
106106
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
107107
}
108+
return nil
108109
}
109110

110111
func (svc *SchedulerService) getConfig(reqEvent *rsTY.ServiceEvent) *schedulerTY.Config {

pkg/service/system_jobs/service_listener.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,18 @@ func (svc *SystemJobsService) onEvent(event *busTY.BusData) {
5252
}
5353

5454
// processEvent from the queue
55-
func (svc *SystemJobsService) processEvent(event interface{}) {
55+
func (svc *SystemJobsService) processEvent(event interface{}) error {
5656
reqEvent := event.(*rsTY.ServiceEvent)
5757
svc.logger.Debug("processing a request", zap.Any("event", reqEvent))
5858

5959
if reqEvent.Type != rsTY.TypeSystemJobs {
6060
svc.logger.Warn("unsupported event type", zap.Any("event", reqEvent))
61-
return
61+
return nil
6262
}
6363

6464
if reqEvent.Command != rsTY.CommandReload {
6565
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
66-
return
66+
return nil
6767
}
6868

6969
switch reqEvent.Data {
@@ -77,4 +77,5 @@ func (svc *SystemJobsService) processEvent(event interface{}) {
7777
default:
7878
// NOOP
7979
}
80+
return nil
8081
}

pkg/service/task/listener_events.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ func (svc *TaskService) onEventReceive(busData *busTY.BusData) {
4747
}
4848
}
4949

50-
func (svc *TaskService) processPreEvent(item interface{}) {
50+
func (svc *TaskService) processPreEvent(item interface{}) error {
5151
busData := item.(*busTY.BusData)
5252

5353
event := &eventTY.Event{}
5454
err := busData.LoadData(event)
5555
if err != nil {
5656
svc.logger.Warn("error on convert to target type", zap.Any("topic", busData.Topic), zap.Error(err))
57-
return
57+
return nil
5858
}
5959

6060
var out interface{}
@@ -78,21 +78,21 @@ func (svc *TaskService) processPreEvent(item interface{}) {
7878

7979
default:
8080
// return do not proceed further
81-
return
81+
return nil
8282
}
8383

8484
err = event.LoadEntity(out)
8585
if err != nil {
8686
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
87-
return
87+
return nil
8888
}
8989
event.Entity = out
9090

9191
resourceWrapper := &eventWrapper{Event: event}
9292
err = svc.resourcePreProcessor(resourceWrapper)
9393
if err != nil {
9494
svc.logger.Error("error on executing a resource", zap.Any("resource", resourceWrapper), zap.Error(err))
95-
return
95+
return err
9696
}
9797

9898
if len(resourceWrapper.Tasks) > 0 {
@@ -101,6 +101,7 @@ func (svc *TaskService) processPreEvent(item interface{}) {
101101
svc.logger.Error("failed to post selected tasks on post processor queue")
102102
}
103103
}
104+
return nil
104105
}
105106

106107
func (svc *TaskService) resourcePreProcessor(evntWrapper *eventWrapper) error {
@@ -119,11 +120,11 @@ func (svc *TaskService) resourcePreProcessor(evntWrapper *eventWrapper) error {
119120
return nil
120121
}
121122

122-
func (svc *TaskService) resourcePostProcessor(item interface{}) {
123+
func (svc *TaskService) resourcePostProcessor(item interface{}) error {
123124
evntWrapper, ok := item.(*eventWrapper)
124125
if !ok {
125126
svc.logger.Warn("supplied item is not resourceWrapper", zap.Any("item", item))
126-
return
127+
return nil
127128
}
128129

129130
svc.logger.Debug("resourceWrapper received", zap.String("entityType", evntWrapper.Event.EntityType))
@@ -132,4 +133,5 @@ func (svc *TaskService) resourcePostProcessor(item interface{}) {
132133
task := evntWrapper.Tasks[index]
133134
svc.executeTask(&task, evntWrapper)
134135
}
136+
return nil
135137
}

0 commit comments

Comments
 (0)