From 785c140f01a9229460096c2ff953937f2f65d5b8 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 10 Oct 2025 15:18:27 +0200 Subject: [PATCH 1/3] Add a Kafka reader A necessary building block for OCTRL-1049 --- common/event/reader.go | 132 ++++++++++++++++++++++++++++++++++++ common/event/reader_test.go | 48 +++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 common/event/reader.go create mode 100644 common/event/reader_test.go diff --git a/common/event/reader.go b/common/event/reader.go new file mode 100644 index 00000000..0b348caf --- /dev/null +++ b/common/event/reader.go @@ -0,0 +1,132 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Piotr Konopka + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package event + +import ( + "context" + "fmt" + "github.com/AliceO2Group/Control/common/event/topic" + "github.com/AliceO2Group/Control/common/logger/infologger" + pb "github.com/AliceO2Group/Control/common/protos" + "github.com/segmentio/kafka-go" + "github.com/spf13/viper" + "google.golang.org/protobuf/proto" + "sync" +) + +// Reader interface provides methods to read events. +type Reader interface { + Next(ctx context.Context) (*pb.Event, error) + Close() error +} + +// DummyReader is an implementation of Reader that returns no events. +type DummyReader struct{} + +// Next returns the next event or nil if there are no more events. +func (*DummyReader) Next(context.Context) (*pb.Event, error) { return nil, nil } + +// Close closes the DummyReader. +func (*DummyReader) Close() error { return nil } + +// KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events. +// Consumption mode is chosen at creation time: +// - latestOnly=false: consume everything (from stored offsets or beginning depending on group state) +// - latestOnly=true: seek to latest offsets on start and only receive messages produced after start +type KafkaReader struct { + *kafka.Reader + mu sync.Mutex + topic string +} + +// NewReaderWithTopic creates a KafkaReader for the provided topic and starts it. +// If latestOnly is true the reader attempts to seek to the latest offsets on start so that +// only new messages (produced after creation) are consumed. +func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader { + cfg := kafka.ReaderConfig{ + Brokers: viper.GetStringSlice("kafkaEndpoints"), + Topic: string(topic), + GroupID: groupID, + MinBytes: 1, + MaxBytes: 10e7, + } + + rk := &KafkaReader{ + Reader: kafka.NewReader(cfg), + topic: string(topic), + } + + if latestOnly { + // best-effort: set offset to last so we don't replay older messages + if err := rk.SetOffset(kafka.LastOffset); err != nil { + log.WithField(infologger.Level, infologger.IL_Devel). + Warnf("failed to set offset to last offset: %v", err) + } + } + + return rk +} + +// Next blocks until the next event is available or ctx is cancelled. It returns an error when the reader is closed +// (io.EOF) or the context is cancelled. The caller is responsible for providing a cancellable ctx. +func (r *KafkaReader) Next(ctx context.Context) (*pb.Event, error) { + if r == nil { + return nil, fmt.Errorf("nil reader") + } + + msg, err := r.ReadMessage(ctx) + if err != nil { + return nil, err + } + + event, err := kafkaMessageToEvent(msg) + if err != nil { + return nil, err + } + + return event, nil +} + +// Close stops the reader. +func (r *KafkaReader) Close() error { + if r == nil { + return nil + } + // Close the underlying kafka reader which will cause ReadMessage to return an error + err := r.Reader.Close() + if err != nil { + log.WithField(infologger.Level, infologger.IL_Devel). + Errorf("failed to close kafka reader: %v", err) + } + return err +} + +func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) { + var evt pb.Event + if err := proto.Unmarshal(m.Value, &evt); err != nil { + return nil, fmt.Errorf("failed to unmarshal kafka message: %w", err) + } + return &evt, nil +} diff --git a/common/event/reader_test.go b/common/event/reader_test.go new file mode 100644 index 00000000..751606fd --- /dev/null +++ b/common/event/reader_test.go @@ -0,0 +1,48 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Piotr Konopka + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package event + +import ( + pb "github.com/AliceO2Group/Control/common/protos" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/segmentio/kafka-go" + "google.golang.org/protobuf/proto" +) + +var _ = Describe("Reader", func() { + When("converting kafka message to event", func() { + It("unmarshals protobuf payload correctly", func() { + e := &pb.Event{Payload: &pb.Event_CoreStartEvent{CoreStartEvent: &pb.Ev_MetaEvent_CoreStart{FrameworkId: "z"}}} + b, err := proto.Marshal(e) + Expect(err).To(BeNil()) + + m := kafka.Message{Value: b} + evt, err := kafkaMessageToEvent(m) + Expect(err).To(BeNil()) + Expect(evt.GetCoreStartEvent().FrameworkId).To(Equal("z")) + }) + }) +}) From 5aa8f2fed3bce859070c3d05b64b747ef78cacf3 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 10 Oct 2025 15:20:56 +0200 Subject: [PATCH 2/3] New LHC plugin to follow LHC updates available in Kafka The updates come from ALICE LHC DIP client. Work towards OCTRL-1049. Eventually, this plugin will also allow to get rid of BKP.RetrieveFillInfo call and consume Fill information without going through BKP. --- README.md | 1 + cmd/o2-aliecs-core/main.go | 5 + core/integration/README.md | 5 + core/integration/lhc/event/lhcevent.go | 56 +++++++ core/integration/lhc/plugin.go | 200 +++++++++++++++++++++++++ 5 files changed, 267 insertions(+) create mode 100644 core/integration/lhc/event/lhcevent.go create mode 100644 core/integration/lhc/plugin.go diff --git a/README.md b/README.md index 3c06e2b4..c1326df5 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,7 @@ There are two ways of interacting with AliECS: * [ECS2DCS2ECS mock server](/core/integration/README.md#ecs2dcs2ecs-mock-server) * [DD Scheduler](/core/integration/README.md#dd-scheduler) * [Kafka (legacy)](/core/integration/README.md#kafka-legacy) + * [LHC](/core/integration/README.md) * [ODC](/core/integration/README.md#odc) * [Test plugin](/core/integration/README.md#test-plugin) * [Trigger](/core/integration/README.md#trigger) diff --git a/cmd/o2-aliecs-core/main.go b/cmd/o2-aliecs-core/main.go index e19eadec..1da25fdd 100644 --- a/cmd/o2-aliecs-core/main.go +++ b/cmd/o2-aliecs-core/main.go @@ -35,6 +35,7 @@ import ( "github.com/AliceO2Group/Control/core/integration/dcs" "github.com/AliceO2Group/Control/core/integration/ddsched" "github.com/AliceO2Group/Control/core/integration/kafka" + "github.com/AliceO2Group/Control/core/integration/lhc" "github.com/AliceO2Group/Control/core/integration/odc" "github.com/AliceO2Group/Control/core/integration/testplugin" "github.com/AliceO2Group/Control/core/integration/trg" @@ -64,6 +65,10 @@ func init() { "kafka", "kafkaEndpoint", kafka.NewPlugin) + integration.RegisterPlugin( + "lhc", + "kafkaEndpoints", + lhc.NewPlugin) integration.RegisterPlugin( "odc", "odcEndpoint", diff --git a/core/integration/README.md b/core/integration/README.md index 2b56fd8a..eda781e8 100644 --- a/core/integration/README.md +++ b/core/integration/README.md @@ -177,6 +177,11 @@ DD scheduler plugin informs the Data Distribution software about the pool of FLP See [Legacy events: Kafka plugin](/docs/kafka.md#legacy-events-kafka-plugin) +# LHC plugin + +This plugin listens to Kafka messages coming from the LHC DIP Client and pushes any relevant internal notifications to the AliECS core. +Its main purpose is to provide basic information about ongoing LHC activity (e.g. fill information) to affected parties and allow AliECS to react upon them (e.g. by automatically stopping a physics run when stable beams are over). + ## ODC ODC plugin communicates with the [Online Device Control (ODC)](https://github.com/FairRootGroup/ODC) instance of the ALICE experiment, which controls the event processing farm used in data taking and offline processing. diff --git a/core/integration/lhc/event/lhcevent.go b/core/integration/lhc/event/lhcevent.go new file mode 100644 index 00000000..ecaa3cc6 --- /dev/null +++ b/core/integration/lhc/event/lhcevent.go @@ -0,0 +1,56 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Piotr Konopka + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package event + +import ( + "github.com/AliceO2Group/Control/common/event" + commonpb "github.com/AliceO2Group/Control/common/protos" +) + +// BeamInfo mirrors (a subset of) the information described in the proto draft. +type BeamInfo struct { + StableBeamsStart int64 `json:"stableBeamsStart,omitempty"` + StableBeamsEnd int64 `json:"stableBeamsEnd,omitempty"` + FillNumber int32 `json:"fillNumber,omitempty"` + FillingSchemeName string `json:"fillingSchemeName,omitempty"` + BeamType string `json:"beamType,omitempty"` + BeamMode commonpb.BeamMode `json:"beamMode,omitempty"` +} + +type LhcStateChangeEvent struct { + event.IntegratedServiceEventBase + BeamInfo BeamInfo +} + +func (e *LhcStateChangeEvent) GetName() string { + return "LHC_STATE_CHANGE_EVENT" +} + +func (e *LhcStateChangeEvent) GetBeamInfo() BeamInfo { + if e == nil { + return BeamInfo{} + } + return e.BeamInfo +} diff --git a/core/integration/lhc/plugin.go b/core/integration/lhc/plugin.go new file mode 100644 index 00000000..bb427655 --- /dev/null +++ b/core/integration/lhc/plugin.go @@ -0,0 +1,200 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Piotr Konopka + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package lhc + +import ( + "context" + "encoding/json" + "errors" + "github.com/AliceO2Group/Control/common/event/topic" + "github.com/AliceO2Group/Control/common/logger/infologger" + pb "github.com/AliceO2Group/Control/common/protos" + "io" + "strings" + "sync" + "time" + + cmnevent "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/logger" + "github.com/AliceO2Group/Control/common/utils/uid" + "github.com/AliceO2Group/Control/core/environment" + "github.com/AliceO2Group/Control/core/integration" + lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" +) + +var log = logger.New(logrus.StandardLogger(), "lhcclient") +var dipClientTopic topic.Topic = "dip.lhc.beam_mode" + +// Plugin implements integration.Plugin and listens for LHC updates. +type Plugin struct { + endpoint string + ctx context.Context + //cancel context.CancelFunc + //wg sync.WaitGroup + mu sync.Mutex + currentState *pb.BeamInfo + reader cmnevent.Reader +} + +func NewPlugin(endpoint string) integration.Plugin { + + return &Plugin{endpoint: endpoint, mu: sync.Mutex{}, currentState: &pb.BeamInfo{BeamMode: pb.BeamMode_UNKNOWN}} +} + +func (p *Plugin) Init(_ string) error { + + // use a background context for reader loop; Destroy will Close the reader + p.ctx = context.Background() + + p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "", true) + + if p.reader == nil { + return errors.New("could not create a kafka reader for LHC plugin") + } + go p.readAndInjectLhcUpdates() + + log.Debug("LHC plugin initialized (client started)") + return nil +} + +func (p *Plugin) GetName() string { return "lhc" } +func (p *Plugin) GetPrettyName() string { return "LHC (DIP/Kafka client)" } +func (p *Plugin) GetEndpoint() string { + return strings.Join(viper.GetStringSlice("kafkaEndpoints"), ",") +} + +func (p *Plugin) GetConnectionState() string { + if p == nil || p.reader == nil { + return "UNKNOWN" + } + return "READY" // Unfortunately, kafka.Reader does not provide any GetStatus method +} + +func (p *Plugin) GetData(_ []any) string { + p.mu.Lock() + defer p.mu.Unlock() + if p.currentState == nil { + return "" + } + + outMap := make(map[string]interface{}) + outMap["BeamMode"] = p.currentState.BeamMode.String() + outMap["BeamType"] = p.currentState.BeamType + outMap["FillingSchemeName"] = p.currentState.FillingSchemeName + outMap["FillNumber"] = p.currentState.FillNumber + outMap["StableBeamsEnd"] = p.currentState.StableBeamsEnd + outMap["StableBeamsStart"] = p.currentState.StableBeamsStart + + b, _ := json.Marshal(outMap) + return string(b) +} + +func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string { + // there is nothing sensible we could provide here, LHC client is not environment-specific + return nil +} + +func (p *Plugin) GetEnvironmentsShortData(envIds []uid.ID) map[uid.ID]string { + return p.GetEnvironmentsData(envIds) +} + +func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack map[string]interface{}) { + return make(map[string]interface{}) +} +func (p *Plugin) CallStack(_ interface{}) (stack map[string]interface{}) { + return make(map[string]interface{}) +} + +func (p *Plugin) Destroy() error { + if p == nil { + return nil + } + p.mu.Lock() + defer p.mu.Unlock() + + if p.reader != nil { + err := p.reader.Close() + if err != nil { + return err + } + } + return nil +} + +func (p *Plugin) readAndInjectLhcUpdates() { + for { + msg, err := p.reader.Next(p.ctx) + if errors.Is(err, io.EOF) { + log.WithField(infologger.Level, infologger.IL_Support). + Debug("received an EOF from Kafka reader, likely cancellation was requested, breaking") + break + } + if err != nil { + log.WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Error("error while reading from Kafka") + // in case of errors, we throttle the loop to mitigate the risk a log spam if error persists + time.Sleep(time.Second * 1) + continue + } + if msg == nil { + log.WithField(infologger.Level, infologger.IL_Devel). + Warn("received an empty message with no error. it's unexpected, but continuing") + continue + } + + if bmEvt := msg.GetBeamModeEvent(); bmEvt != nil && bmEvt.GetBeamInfo() != nil { + beamInfo := bmEvt.GetBeamInfo() + log.WithField(infologger.Level, infologger.IL_Devel). + Debugf("new LHC update received: BeamMode=%s, FillNumber=%d, FillingScheme=%s, StableBeamsStart=%d, StableBeamsEnd=%d, BeamType=%s", + beamInfo.GetBeamMode().String(), beamInfo.GetFillNumber(), beamInfo.GetFillingSchemeName(), + beamInfo.GetStableBeamsStart(), beamInfo.GetStableBeamsEnd(), beamInfo.GetBeamType()) + // update plugin state + p.mu.Lock() + p.currentState = beamInfo + p.mu.Unlock() + + // convert to internal LHC event and notify environment manager + go func(beamInfo *pb.BeamInfo) { + envMan := environment.ManagerInstance() + + ev := &lhcevent.LhcStateChangeEvent{ + IntegratedServiceEventBase: cmnevent.IntegratedServiceEventBase{ServiceName: "LHC"}, + BeamInfo: lhcevent.BeamInfo{ + BeamMode: beamInfo.GetBeamMode(), + StableBeamsStart: beamInfo.GetStableBeamsStart(), + StableBeamsEnd: beamInfo.GetStableBeamsEnd(), + FillNumber: beamInfo.GetFillNumber(), + FillingSchemeName: beamInfo.GetFillingSchemeName(), + BeamType: beamInfo.GetBeamType(), + }, + } + envMan.NotifyIntegratedServiceEvent(ev) + }(beamInfo) + } + } +} From 8e67e58a6283ea5383a77c71138e1b8eec21a2b7 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 10 Oct 2025 15:22:14 +0200 Subject: [PATCH 3/3] Stop ongoing runs upon beam dump if requested A new "stop_at_beam_dump" flag allows to request an automatic STOP_ACTIVITY when we recognize that LHC dumped beams. Closes OCTRL-1049 --- core/environment/manager.go | 61 +++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/core/environment/manager.go b/core/environment/manager.go index d6a81b01..789677bb 100644 --- a/core/environment/manager.go +++ b/core/environment/manager.go @@ -42,6 +42,7 @@ import ( "github.com/AliceO2Group/Control/common/system" "github.com/AliceO2Group/Control/common/utils" "github.com/AliceO2Group/Control/common/utils/uid" + lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event" event2 "github.com/AliceO2Group/Control/core/integration/odc/event" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -1065,6 +1066,66 @@ func (envs *Manager) handleIntegratedServiceEvent(evt event.IntegratedServiceEve } } } + } else if evt.GetServiceName() == "LHC" { + envs.handleLhcEvents(evt) + } +} + +func (envs *Manager) handleLhcEvents(evt event.IntegratedServiceEvent) { + + lhcEvent, ok := evt.(*lhcevent.LhcStateChangeEvent) + if !ok { + return + } + + // stop all relevant environments when beams are dumped + beamMode := lhcEvent.GetBeamInfo().BeamMode + beamsDumped := beamMode == evpb.BeamMode_BEAM_DUMP || beamMode == evpb.BeamMode_LOST_BEAMS || beamMode == evpb.BeamMode_NO_BEAM + if !beamsDumped { + return + } + + for envId, env := range envs.m { + shouldStopAtBeamDump, _ := strconv.ParseBool(env.GetKV("", "stop_at_beam_dump")) + if shouldStopAtBeamDump && env.CurrentState() == "RUNNING" { + if currentTransition := env.CurrentTransition(); currentTransition != "" { + log.WithPrefix("scheduler"). + WithField(infologger.Level, infologger.IL_Support). + WithField("partition", envId.String()). + WithField("run", env.currentRunNumber). + Infof("run was supposed to be stopped at beam dump, but transition '%s' is already in progress, skipping (probably the operator was faster)", currentTransition) + continue + } + + go func(env *Environment) { + log.WithPrefix("scheduler"). + WithField(infologger.Level, infologger.IL_Ops). + WithField("partition", envId.String()). + WithField("run", env.currentRunNumber). + Info("stopping the run due to beam dump") + + err := env.TryTransition(NewStopActivityTransition(envs.taskman)) + if err != nil { + log.WithPrefix("scheduler"). + WithField("partition", envId.String()). + WithField("run", env.currentRunNumber). + WithError(err). + Error("could not stop the run upon beam dump") + + if env.CurrentState() != "ERROR" { + err = env.TryTransition(NewGoErrorTransition(envs.taskman)) + if err != nil { + log.WithPrefix("scheduler"). + WithField("partition", envId.String()). + WithField("run", env.currentRunNumber). + WithError(err). + Error("environment GO_ERROR transition failed after a beam dump event, forcing") + env.setState("ERROR") + } + } + } + }(env) + } } }