-
Notifications
You must be signed in to change notification settings - Fork 0
[O2B-1474] Add kafka producer for beam mode change on stable beams #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
graduta
wants to merge
17
commits into
main
from
feature/O2B-1474/add-kafka-producer-for-beam-mode-change-on-SB
Closed
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
04d4db5
Add Kafka producer interface to be used
graduta 9db930c
Add producer dedicated to beam mode change tracks
graduta 8242631
Use the newly added beam mode event producer
graduta 3d94def
Add proto files used by FLP suite
graduta 6623a2c
Update readme
graduta fe23d9e
Add protoc generated classes
graduta b377334
Mark beam producer as volatile as it is updated for a different thread
graduta 5d27220
Map Beam Mode from LHC to enum of protobuf
graduta 1b48e68
Use future for kafka return
graduta 99c461c
Update project structure namings
graduta 78ca1e0
Use final for kafka topic
graduta 4f95473
Fix spelling mistakes
graduta a96b486
Merge branch 'main' of github.com:AliceO2Group/BKP-LHC-Client into fe…
graduta d3827a8
Improve logging for safe beam messages
graduta 92bb339
Add event for each new beam mode received
graduta d9fcf08
Remove dead code
graduta a8fa782
Use adapter for beam mode
graduta File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,26 +1,26 @@ | ||
| # AliDip2BK | ||
| # BKP-LHC Client | ||
|
|
||
| Initial Repository based on work from @iclegrand in repository: https://github.com/iclegrand/AliDip2BK | ||
|
|
||
| Collect selected Info from the CERN DIP system (LHC & ALICE -DCS) and publish them into the Bookkeeping/InfoLogger systems | ||
| The BKP-LHC Client is a java based application which uses the CERN DIP `jar` dependency to consume events from desired tracks. These events are then either: | ||
| - published on O2 Kafka Topics to be consumed further by O2 applications (e.g. ECS) | ||
| - updates the O2 Bookkeeping application via their HTTP endpoints. | ||
|
|
||
| A detailed description for this project is provided by Roberto in this document: | ||
| https://codimd.web.cern.ch/G0TSXqA1R8iPqWw2w2wuew | ||
|
|
||
|
|
||
| This program requires java 11 on a 64 bit system | ||
| (this is a constrain from the DIP library) | ||
|
|
||
| To test the java version run | ||
| ### Requirements | ||
| - java 11 on a 64 bit system (this is a constrain from the DIP library) | ||
graduta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| - to test the java version run | ||
| ``` | ||
| java -version | ||
| ``` | ||
|
|
||
| The run configuration is defined in the AliDip2BK.properties file. | ||
|
|
||
| To run the program : | ||
|
|
||
| sh runAliDip2BK.sh | ||
| ### Configuration | ||
| The run configuration is defined in the `AliDip2BK.properties` file. | ||
|
|
||
| When the the program is stopped, it enters into the shutdown mode and it will | ||
| unsubscribe to the DIP data providers will wait to process the DipData queue | ||
| and saves the state of the fills and runs. | ||
| ### Published Events | ||
| Currently the BKP-LHC-Client publishes on Kafka (topic: "dip.lhc.beam_mode") events for the start and end of stable beams in the format of `Ev_BeamModeEvent`. The proto file's source of truth is within the [Control Repository](https://github.com/AliceO2Group/Control/blob/master/common/protos/events.proto) | ||
graduta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ### Bookkeeping Updates | ||
| - TBC | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| /** | ||
| * @license | ||
| * Copyright CERN and copyright holders of ALICE O2. This software is | ||
| * distributed under the terms of the GNU General Public License v3 (GPL | ||
| * Version 3), copied verbatim in the file "COPYING". | ||
| * | ||
| * See http://alice-o2.web.cern.ch/license for full licensing information. | ||
| * | ||
| * In applying this license CERN does not waive the privileges and immunities | ||
| * granted to it by virtue of its status as an Intergovernmental Organization | ||
| * or submit itself to any jurisdiction. | ||
| */ | ||
|
|
||
| package alice.dip.adapters; | ||
|
|
||
| import alice.dip.enums.BeamModeEnum; | ||
|
|
||
| /** | ||
| * Adapter class to convert between string representations of beam modes and the BeamModeEnum. | ||
| */ | ||
| public class BeamModeProtoAdapter { | ||
|
|
||
| /** | ||
| * Returns the enum constant matching the given string, or UNKNOWN if not found. | ||
| * Accepts both space and underscore separated names, case-insensitive. | ||
| * @param beamMode The beam mode string to convert. | ||
| * @return The corresponding BeamModeEnum constant, or UNKNOWN if not recognized. | ||
| */ | ||
| public static BeamModeEnum fromStringToEnum(String beamMode) { | ||
| if (beamMode == null || beamMode.trim().isEmpty()) { | ||
| return BeamModeEnum.UNKNOWN; | ||
| } | ||
| for (BeamModeEnum value : BeamModeEnum.values()) { | ||
| if (value.label.equalsIgnoreCase(beamMode)) { | ||
| return value; | ||
| } | ||
| } | ||
| return BeamModeEnum.UNKNOWN; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /** | ||
| * @license | ||
| * Copyright CERN and copyright holders of ALICE O2. This software is | ||
| * distributed under the terms of the GNU General Public License v3 (GPL | ||
| * Version 3), copied verbatim in the file "COPYING". | ||
| * | ||
| * See http://alice-o2.web.cern.ch/license for full licensing information. | ||
| * | ||
| * In applying this license CERN does not waive the privileges and immunities | ||
| * granted to it by virtue of its status as an Intergovernmental Organization | ||
| * or submit itself to any jurisdiction. | ||
| */ | ||
|
|
||
| package alice.dip.enums; | ||
|
|
||
| /** | ||
| * Java enum matching the BeamMode values from DIP service and common.proto | ||
| * @enum BeamModeEnum | ||
| */ | ||
| public enum BeamModeEnum { | ||
| UNKNOWN("UNKNOWN"), | ||
| SETUP("SETUP"), | ||
| ABORT("ABORT"), | ||
| INJECTION_PROBE_BEAM("INJECTION PROBE BEAM"), | ||
| INJECTION_SETUP_BEAM("INJECTION SETUP BEAM"), | ||
| INJECTION_PHYSICS_BEAM("INJECTION PHYSICS BEAM"), | ||
| PREPARE_RAMP("PREPARE RAMP"), | ||
| RAMP("RAMP"), | ||
| FLAT_TOP("FLAT TOP"), | ||
| SQUEEZE("SQUEEZE"), | ||
| ADJUST("ADJUST"), | ||
| STABLE_BEAMS("STABLE BEAMS"), | ||
| LOST_BEAMS("LOST BEAMS"), | ||
| UNSTABLE_BEAMS("UNSTABLE BEAMS"), | ||
| BEAM_DUMP_WARNING("BEAM DUMP WARNING"), | ||
| BEAM_DUMP("BEAM DUMP"), | ||
| RAMP_DOWN("RAMP DOWN"), | ||
| CYCLING("CYCLING"), | ||
| RECOVERY("RECOVERY"), | ||
| INJECT_AND_DUMP("INJECT AND DUMP"), | ||
| CIRCULATE_AND_DUMP("CIRCULATE AND DUMP"), | ||
| NO_BEAM("NO BEAM"); | ||
|
|
||
| public final String label; | ||
|
|
||
| private BeamModeEnum(String label) { | ||
| this.label = label; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /** | ||
| * @license | ||
| * Copyright CERN and copyright holders of ALICE O2. This software is | ||
| * distributed under the terms of the GNU General Public License v3 (GPL | ||
| * Version 3), copied verbatim in the file "COPYING". | ||
| * | ||
| * See http://alice-o2.web.cern.ch/license for full licensing information. | ||
| * | ||
| * In applying this license CERN does not waive the privileges and immunities | ||
| * granted to it by virtue of its status as an Intergovernmental Organization | ||
| * or submit itself to any jurisdiction. | ||
| */ | ||
|
|
||
| package alice.dip.kafka; | ||
|
|
||
| import org.apache.kafka.common.serialization.ByteArraySerializer; | ||
| import org.apache.kafka.common.serialization.IntegerSerializer; | ||
|
|
||
| import alice.dip.adapters.BeamModeProtoAdapter; | ||
| import alice.dip.AliDip2BK; | ||
| import alice.dip.enums.BeamModeEnum; | ||
| import alice.dip.kafka.dto.Common; | ||
| import alice.dip.kafka.dto.Events; | ||
| import alice.dip.LhcInfoObj; | ||
|
|
||
| /** | ||
| * Kafka producer for LHC Beam Mode events, serialized using Protocol Buffers. | ||
| */ | ||
| public class BeamModeEventsKafkaProducer extends KafkaProducerInterface<Integer, byte[]> { | ||
| public static final String KAFKA_PRODUCER_TOPIC_DIP = "dip.lhc.beam_mode"; | ||
|
|
||
| /** | ||
| * Constructor to create a BeamModeEventsKafkaProducer | ||
| * @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port | ||
| */ | ||
| public BeamModeEventsKafkaProducer(String bootstrapServers) { | ||
| super(bootstrapServers, KAFKA_PRODUCER_TOPIC_DIP, new IntegerSerializer(), new ByteArraySerializer()); | ||
| AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Initialized producer for topic: " + KAFKA_PRODUCER_TOPIC_DIP); | ||
| } | ||
|
|
||
| /** | ||
| * Given a fill number for partitioning, a LhcInfoObj containing fill information, | ||
| * and a timestamp, creates and sends a proto serialized Beam Mode Event to the Kafka topic. | ||
| * @param fillNumber - fill number to be used for partition to ensure ordering | ||
| * @param fill - LhcInfoObj containing fill information | ||
| * @param timestamp - event timestamp at which the beam mode change event was received from DIP | ||
| */ | ||
| public void sendEvent(Integer fillNumber, LhcInfoObj fill, long timestamp) { | ||
| String beamModeStr = fill.getBeamMode(); | ||
| BeamModeEnum beamMode = BeamModeProtoAdapter.fromStringToEnum(beamModeStr); | ||
|
|
||
| Common.BeamInfo beamInfo = Common.BeamInfo.newBuilder() | ||
| .setStableBeamsStart(fill.getStableBeamStart()) | ||
| .setStableBeamsEnd(fill.getStableBeamStop()) | ||
| .setFillNumber(fill.fillNo) | ||
| .setFillingSchemeName(fill.LHCFillingSchemeName) | ||
| .setBeamMode(Common.BeamMode.valueOf(beamMode.label)) | ||
| .setBeamType(fill.beamType) | ||
graduta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .build(); | ||
|
|
||
| Events.Ev_BeamModeEvent event = Events.Ev_BeamModeEvent.newBuilder() | ||
| .setTimestamp(timestamp) | ||
| .setBeamInfo(beamInfo) | ||
| .build(); | ||
| byte[] value = event.toByteArray(); | ||
|
|
||
| send(fillNumber, value); | ||
| AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Sent Beam Mode event for fill " + fill.fillNo + " with mode " + fill.getBeamMode() + " at timestamp " + timestamp); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.