diff --git a/core/integration/dcs/plugin.go b/core/integration/dcs/plugin.go index 9a5b253e..05b6fe74 100644 --- a/core/integration/dcs/plugin.go +++ b/core/integration/dcs/plugin.go @@ -287,12 +287,33 @@ func (p *Plugin) Init(instanceId string) error { in := &dcspb.SubscriptionRequest{ InstanceId: instanceId, } - evStream, err := p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{}) - if err != nil { - return fmt.Errorf("failed to subscribe to DCS service on %s, possible network issue or DCS gateway malfunction", viper.GetString("dcsServiceEndpoint")) - } + + // Always start the goroutine, even if initial subscription fails go func() { + var evStream dcspb.Configurator_SubscribeClient + var err error + for { + // Try to establish subscription if we don't have one + if evStream == nil { + log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). + Debug("attempting to subscribe to DCS service") + + evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{}) + if err != nil { + log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). + WithError(err). + Warnf("failed to subscribe to DCS service, possible network issue or DCS gateway malfunction") + time.Sleep(3 * time.Second) + continue + } else { + log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). + WithField("level", infologger.IL_Support). + Info("successfully subscribed to DCS service") + } + } + + // Process events from the stream for { if evStream == nil { break @@ -300,12 +321,14 @@ func (p *Plugin) Init(instanceId string) error { ev, streamErr := evStream.Recv() if streamErr == io.EOF { log.Info("unexpected EOF from DCS service, possible DCS gateway malfunction") + evStream = nil break } if streamErr != nil { log.WithError(streamErr). Error("stream error or bad event from DCS service, dropping stream") + evStream = nil time.Sleep(3 * time.Second) break } @@ -330,20 +353,10 @@ func (p *Plugin) Init(instanceId string) error { Debug("received DCS event") } + // If we reach here, the stream was dropped and evStream is nil + // The loop will continue and try to reestablish the subscription log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). Info("DCS stream dropped, attempting reconnect") - - evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{}) - if err != nil { - log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). - WithError(err). - Warnf("failed to resubscribe to DCS service, possible network issue or DCS gateway malfunction") - time.Sleep(3 * time.Second) - } else { - log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). - WithField("level", infologger.IL_Support). - Info("successfully resubscribed to DCS service") - } } }() }