Skip to content

Commit 5d2b0a5

Browse files
committed
One mqtt topic can be mapped to multiple kafka topics now
1 parent 847181f commit 5d2b0a5

File tree

2 files changed

+21
-19
lines changed

2 files changed

+21
-19
lines changed

src/main/java/de/fhg/ipa/null70/simple_kafka_mqtt_connector/SimpleKafkaMQTTConnector.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,13 @@
1414
import org.apache.logging.log4j.Logger;
1515
import org.eclipse.paho.client.mqttv3.*;
1616

17-
import java.util.Arrays;
18-
import java.util.HashMap;
19-
import java.util.Map;
20-
import java.util.Properties;
17+
import java.util.*;
2118

2219
public class SimpleKafkaMQTTConnector {
2320
private static final Logger logger = LogManager.getLogger(SimpleKafkaMQTTConnector.class);
2421

25-
// Key = mqtt-topic input , Value = kafka-topic for output
26-
private static HashMap<String, String> mqttKafkaTopicMap = new HashMap<String, String>();
22+
// Key = mqtt-topic input , Value = kafka-topics for output
23+
private static HashMap<String, ArrayList<String>> mqttKafkaTopicMap = new HashMap();
2724

2825

2926
public void run() {
@@ -58,7 +55,7 @@ public void run() {
5855
logger.info("----------------------------------------");
5956
logger.info("");
6057

61-
// read config file into the system
58+
// Initialize topic routing map
6259
initTopicsRoutingMap(topicMapping);
6360

6461
logger.trace("Creating Kafka Producer...");
@@ -70,7 +67,7 @@ public void run() {
7067
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(props);
7168
logger.trace("Start sending messages...");
7269

73-
// Setup and start Mqtt Client
70+
// Setup and start the mqtt client
7471
initMqttClient(mqttHost, mqttPort, mqttClientId, mqttQos, kafkaProducer);
7572
}
7673

@@ -88,7 +85,7 @@ private void initMqttClient(String mqttHost, String mqttPort, String mqttClientI
8885
}
8986

9087
MqttConnectOptions options = new MqttConnectOptions();
91-
//Setzen einer Persistent Session
88+
// use a persistent session..
9289
options.setCleanSession(false);
9390

9491
options.setWill(
@@ -116,27 +113,27 @@ private void initMqttClient(String mqttHost, String mqttPort, String mqttClientI
116113

117114

118115
client.setCallback(new MqttCallback() {
119-
int i = 0;
120116

121117
@Override
122118
public void connectionLost(Throwable throwable) { }
123119

124120
@Override
125-
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
126-
// Checks through which mqtt-topic this message was sent and sends it to the pre-configured corresponding kafka topic..
121+
public void messageArrived(String mqttTopic, MqttMessage mqttMessage) throws Exception {
122+
// Checks through which mqtt-topic this message was sent and sends it to the pre-configured corresponding kafka topics..
127123

128124
String message = new String(mqttMessage.getPayload());
129-
logger.info(topic + " - " + message);
125+
logger.info(mqttTopic + " - " + message);
130126

131127
try {
132-
if(!mqttKafkaTopicMap.get(topic).isEmpty()) {
133-
134-
kafkaProducer.send(new ProducerRecord<>(mqttKafkaTopicMap.get(topic), i++, message));
128+
if(mqttKafkaTopicMap.containsKey(mqttTopic)) {
129+
mqttKafkaTopicMap.get(mqttTopic).forEach(kafkaTopic -> {
130+
kafkaProducer.send(new ProducerRecord<>(kafkaTopic, message));
131+
});
135132
logger.trace("send Message to kafka - " + message);
136133
}
137134
} catch (KafkaException e) {
138135
logger.error("Exception occurred – Check log for more details.\n" + e.getMessage());
139-
logger.warn("There seems to be issue with the kafka connection. Currently no messages are forwarded to the kafka cluster!!!!");
136+
logger.warn("There seems to be an issue with the kafka connection. Currently no messages are forwarded to the kafka cluster!!!!");
140137
// System.exit(-1);
141138
}
142139

@@ -151,8 +148,13 @@ public static void initTopicsRoutingMap(String topicMappingString){
151148
logger.info("Setting up topic mapping (MQTT >>> Kafka) ...");
152149
Arrays.asList(topicMappingString.split(";")).forEach(s -> {
153150
String[] pair = s.split(">>>");
154-
mqttKafkaTopicMap.put( pair[0], pair[1]);
155-
logger.info(pair[0] + " >>> " + pair[1]);
151+
String mqttTopic = pair[0];
152+
String kafkaTopic = pair[1];
153+
if( !mqttKafkaTopicMap.containsKey(mqttTopic) ){
154+
mqttKafkaTopicMap.put(mqttTopic, new ArrayList<String>());
155+
}
156+
mqttKafkaTopicMap.get(mqttTopic).add(kafkaTopic);
157+
logger.info(mqttTopic + " >>> " + kafkaTopic);
156158
}
157159
);
158160
}
Binary file not shown.

0 commit comments

Comments
 (0)