Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
# AliDip2BK
# BKP-LHC Client

Initial Repository based on work from @iclegrand in repository: https://github.com/iclegrand/AliDip2BK

Collect selected Info from the CERN DIP system (LHC & ALICE -DCS) and publish them into the Bookkeeping/InfoLogger systems
The BKP-LHC Client is a java based application which uses the CERN DIP `jar` dependency to consume events from desired tracks. These events are then either:
- published on O2 Kafka Topics to be consumed further by O2 applications (e.g. ECS)
- updates the O2 Bookkeeping application via their HTTP endpoints.

A detailed description for this project is provided by Roberto in this document:
https://codimd.web.cern.ch/G0TSXqA1R8iPqWw2w2wuew


This program requires java 11 on a 64 bit system
(this is a constrain from the DIP library)

To test the java version run
### Requirements
- java 11 on a 64 bit system (this is a constrain from the DIP library)
- to test the java version run
```
java -version
```

The run configuration is defined in the AliDip2BK.properties file.

To run the program :

sh runAliDip2BK.sh
### Configuration
The run configuration is defined in the `AliDip2BK.properties` file.

When the the program is stopped, it enters into the shutdown mode and it will
unsubscribe to the DIP data providers will wait to process the DipData queue
and saves the state of the fills and runs.
### Published Events
Currently the BKP-LHC-Client publishes on Kafka (topic: "dip.lhc.beam_mode") events for the start and end of stable beams in the format of `Ev_BeamModeEvent`. The proto file's source of truth is within the [Control Repository](https://github.com/AliceO2Group/Control/blob/master/common/protos/events.proto)

### Bookkeeping Updates
- TBC
24 changes: 18 additions & 6 deletions src/alice/dip/AliDip2BK.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
/*************
* cil
**************/

/*
* Main Class
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip;
Expand All @@ -18,6 +22,8 @@
import java.util.Date;
import java.util.Properties;

import alice.dip.kafka.BeamModeEventsKafkaProducer;

public class AliDip2BK implements Runnable {
public static String Version = "2.1.2 22-Jul-2025";
public static String DNSnode = "dipnsdev.cern.ch";
Expand Down Expand Up @@ -52,6 +58,7 @@ public class AliDip2BK implements Runnable {
BookkeepingClient bookkeepingClient;
StartOfRunKafkaConsumer kcs;
EndOfRunKafkaConsumer kce;
BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;

public AliDip2BK() {
startDate = (new Date()).getTime();
Expand Down Expand Up @@ -83,6 +90,9 @@ public AliDip2BK() {

kce = new EndOfRunKafkaConsumer(dipMessagesProcessor);

beamModeEventsKafkaProducer = new BeamModeEventsKafkaProducer(AliDip2BK.bootstrapServers);
dipMessagesProcessor.setEventsProducer(beamModeEventsKafkaProducer);

shutdownProc();

Thread t = new Thread(this);
Expand Down Expand Up @@ -145,6 +155,8 @@ public void run() {
}
dipMessagesProcessor.saveState();
writeStat("AliDip2BK.stat", true);
beamModeEventsKafkaProducer.close();
log(4, "AliDip2BK", "Beam Mode Events Kafka Producer closed");
}
});
}
Expand Down
89 changes: 45 additions & 44 deletions src/alice/dip/DipMessagesProcessor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
/*************
* cil
**************/
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip;

Expand All @@ -19,6 +28,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import alice.dip.kafka.BeamModeEventsKafkaProducer;
import cern.dip.BadParameter;
import cern.dip.DipData;
import cern.dip.TypeMismatch;
Expand All @@ -45,19 +55,28 @@ public class DipMessagesProcessor implements Runnable {
private BlockingQueue<MessageItem> outputQueue = new ArrayBlockingQueue<>(100);

private final LuminosityManager luminosityManager;
private volatile BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;

public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManager luminosityManager) {

this.bookkeepingClient = bookkeepingClient;
this.luminosityManager = luminosityManager;

this.beamModeEventsKafkaProducer = null;
Thread t = new Thread(this);
t.start();

currentAlice = new AliceInfoObj();
loadState();
}

/**
* Setter of events producer
* @param beamModeEventsKafkaProducer - instance of BeamModeEventsKafkaProducer to be used to send events
*/
public void setEventsProducer(BeamModeEventsKafkaProducer beamModeEventsKafkaProducer) {
this.beamModeEventsKafkaProducer = beamModeEventsKafkaProducer;
}

/*
* This method is used for receiving DipData messages from the Dip Client
*/
Expand Down Expand Up @@ -299,24 +318,22 @@ private void handleSafeBeamMessage(DipData dipData) throws BadParameter, TypeMis
if (currentFill == null) return;

String bm = currentFill.getBeamMode();

if (bm.contentEquals("STABLE BEAMS")) {
AliDip2BK.log(
0,
"ProcData.newSafeBeams",
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
);

if (!isBeam1 || !isBeam2) {
AliDip2BK.log(
1,
"ProcData.newSafeBeams",
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
);
if ((bm.contentEquals("STABLE BEAMS") && (!isBeam1 || !isBeam2))) {
currentFill.setBeamMode(time, "LOST BEAMS");
if (this.beamModeEventsKafkaProducer != null) {
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
}
AliDip2BK.log(5, "ProcData.newSafeBeams", " CHANGE BEAM MODE TO LOST BEAMS !!! ");
}

return;
}

if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
} else if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
currentFill.setBeamMode(time, "STABLE BEAMS");
if (this.beamModeEventsKafkaProducer != null) {
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
}
AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS ");
}
}
Expand Down Expand Up @@ -571,33 +588,17 @@ public void newFillNo(long date, String strFno, String par1, String par2, String
public void newBeamMode(long date, String BeamMode) {

if (currentFill != null) {
AliDip2BK.log(
2,
"ProcData.newBeamMode",
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
);
currentFill.setBeamMode(date, BeamMode);
bookkeepingClient.updateLhcFill(currentFill);
saveState();

int mc = -1;
for (int i = 0; i < AliDip2BK.endFillCases.length; i++) {
if (AliDip2BK.endFillCases[i].equalsIgnoreCase(BeamMode)) mc = i;
}
if (mc < 0) {

AliDip2BK.log(
2,
"ProcData.newBeamMode",
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
);
bookkeepingClient.updateLhcFill(currentFill);
saveState();
} else {
currentFill.endedTime = date;
bookkeepingClient.updateLhcFill(currentFill);
if (AliDip2BK.KEEP_FILLS_HISTORY_DIRECTORY != null) {
writeFillHistFile(currentFill);
}
AliDip2BK.log(
3,
"ProcData.newBeamMode",
"CLOSE Fill_NO=" + currentFill.fillNo + " Based on new beam mode=" + BeamMode
);
currentFill = null;
if (this.beamModeEventsKafkaProducer != null) {
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, date);
}
} else {
AliDip2BK.log(4, "ProcData.newBeamMode", " ERROR new beam mode=" + BeamMode + " NO FILL NO for it");
Expand Down
40 changes: 40 additions & 0 deletions src/alice/dip/adapters/BeamModeProtoAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip.adapters;

import alice.dip.enums.BeamModeEnum;

/**
* Adapter class to convert between string representations of beam modes and the BeamModeEnum.
*/
public class BeamModeProtoAdapter {

/**
* Returns the enum constant matching the given string, or UNKNOWN if not found.
* Accepts both space and underscore separated names, case-insensitive.
* @param beamMode The beam mode string to convert.
* @return The corresponding BeamModeEnum constant, or UNKNOWN if not recognized.
*/
public static BeamModeEnum fromStringToEnum(String beamMode) {
if (beamMode == null || beamMode.trim().isEmpty()) {
return BeamModeEnum.UNKNOWN;
}
for (BeamModeEnum value : BeamModeEnum.values()) {
if (value.label.equalsIgnoreCase(beamMode)) {
return value;
}
}
return BeamModeEnum.UNKNOWN;
}
}
49 changes: 49 additions & 0 deletions src/alice/dip/enums/BeamModeEnum.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip.enums;

/**
* Java enum matching the BeamMode values from DIP service and common.proto
* @enum BeamModeEnum
*/
public enum BeamModeEnum {
UNKNOWN("UNKNOWN"),
SETUP("SETUP"),
ABORT("ABORT"),
INJECTION_PROBE_BEAM("INJECTION PROBE BEAM"),
INJECTION_SETUP_BEAM("INJECTION SETUP BEAM"),
INJECTION_PHYSICS_BEAM("INJECTION PHYSICS BEAM"),
PREPARE_RAMP("PREPARE RAMP"),
RAMP("RAMP"),
FLAT_TOP("FLAT TOP"),
SQUEEZE("SQUEEZE"),
ADJUST("ADJUST"),
STABLE_BEAMS("STABLE BEAMS"),
LOST_BEAMS("LOST BEAMS"),
UNSTABLE_BEAMS("UNSTABLE BEAMS"),
BEAM_DUMP_WARNING("BEAM DUMP WARNING"),
BEAM_DUMP("BEAM DUMP"),
RAMP_DOWN("RAMP DOWN"),
CYCLING("CYCLING"),
RECOVERY("RECOVERY"),
INJECT_AND_DUMP("INJECT AND DUMP"),
CIRCULATE_AND_DUMP("CIRCULATE AND DUMP"),
NO_BEAM("NO BEAM");

public final String label;

private BeamModeEnum(String label) {
this.label = label;
}
}
70 changes: 70 additions & 0 deletions src/alice/dip/kafka/BeamModeEventsKafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip.kafka;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

import alice.dip.adapters.BeamModeProtoAdapter;
import alice.dip.AliDip2BK;
import alice.dip.enums.BeamModeEnum;
import alice.dip.kafka.dto.Common;
import alice.dip.kafka.dto.Events;
import alice.dip.LhcInfoObj;

/**
* Kafka producer for LHC Beam Mode events, serialized using Protocol Buffers.
*/
public class BeamModeEventsKafkaProducer extends KafkaProducerInterface<Integer, byte[]> {
public static final String KAFKA_PRODUCER_TOPIC_DIP = "dip.lhc.beam_mode";

/**
* Constructor to create a BeamModeEventsKafkaProducer
* @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port
*/
public BeamModeEventsKafkaProducer(String bootstrapServers) {
super(bootstrapServers, KAFKA_PRODUCER_TOPIC_DIP, new IntegerSerializer(), new ByteArraySerializer());
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Initialized producer for topic: " + KAFKA_PRODUCER_TOPIC_DIP);
}

/**
* Given a fill number for partitioning, a LhcInfoObj containing fill information,
* and a timestamp, creates and sends a proto serialized Beam Mode Event to the Kafka topic.
* @param fillNumber - fill number to be used for partition to ensure ordering
* @param fill - LhcInfoObj containing fill information
* @param timestamp - event timestamp at which the beam mode change event was received from DIP
*/
public void sendEvent(Integer fillNumber, LhcInfoObj fill, long timestamp) {
String beamModeStr = fill.getBeamMode();
BeamModeEnum beamMode = BeamModeProtoAdapter.fromStringToEnum(beamModeStr);

Common.BeamInfo beamInfo = Common.BeamInfo.newBuilder()
.setStableBeamsStart(fill.getStableBeamStart())
.setStableBeamsEnd(fill.getStableBeamStop())
.setFillNumber(fill.fillNo)
.setFillingSchemeName(fill.LHCFillingSchemeName)
.setBeamMode(Common.BeamMode.valueOf(beamMode.label))
.setBeamType(fill.beamType)
.build();

Events.Ev_BeamModeEvent event = Events.Ev_BeamModeEvent.newBuilder()
.setTimestamp(timestamp)
.setBeamInfo(beamInfo)
.build();
byte[] value = event.toByteArray();

send(fillNumber, value);
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Sent Beam Mode event for fill " + fill.fillNo + " with mode " + fill.getBeamMode() + " at timestamp " + timestamp);
}
}
Loading