1414import org .apache .logging .log4j .Logger ;
1515import org .eclipse .paho .client .mqttv3 .*;
1616
17- import java .util .Arrays ;
18- import java .util .HashMap ;
19- import java .util .Map ;
20- import java .util .Properties ;
17+ import java .util .*;
2118
2219public class SimpleKafkaMQTTConnector {
2320 private static final Logger logger = LogManager .getLogger (SimpleKafkaMQTTConnector .class );
2421
25- // Key = mqtt-topic input , Value = kafka-topic for output
26- private static HashMap <String , String > mqttKafkaTopicMap = new HashMap < String , String > ();
22+ // Key = mqtt-topic input , Value = kafka-topics for output
23+ private static HashMap <String , ArrayList < String >> mqttKafkaTopicMap = new HashMap ();
2724
2825
2926 public void run () {
@@ -58,7 +55,7 @@ public void run() {
5855 logger .info ("----------------------------------------" );
5956 logger .info ("" );
6057
61- // read config file into the system
58+ // Initialize topic routing map
6259 initTopicsRoutingMap (topicMapping );
6360
6461 logger .trace ("Creating Kafka Producer..." );
@@ -70,7 +67,7 @@ public void run() {
7067 KafkaProducer <Integer , String > kafkaProducer = new KafkaProducer <>(props );
7168 logger .trace ("Start sending messages..." );
7269
73- // Setup and start Mqtt Client
70+ // Setup and start the mqtt client
7471 initMqttClient (mqttHost , mqttPort , mqttClientId , mqttQos , kafkaProducer );
7572 }
7673
@@ -88,7 +85,7 @@ private void initMqttClient(String mqttHost, String mqttPort, String mqttClientI
8885 }
8986
9087 MqttConnectOptions options = new MqttConnectOptions ();
91- //Setzen einer Persistent Session
88+ // use a persistent session..
9289 options .setCleanSession (false );
9390
9491 options .setWill (
@@ -116,27 +113,27 @@ private void initMqttClient(String mqttHost, String mqttPort, String mqttClientI
116113
117114
118115 client .setCallback (new MqttCallback () {
119- int i = 0 ;
120116
121117 @ Override
122118 public void connectionLost (Throwable throwable ) { }
123119
124120 @ Override
125- public void messageArrived (String topic , MqttMessage mqttMessage ) throws Exception {
126- // Checks through which mqtt-topic this message was sent and sends it to the pre-configured corresponding kafka topic ..
121+ public void messageArrived (String mqttTopic , MqttMessage mqttMessage ) throws Exception {
122+ // Checks through which mqtt-topic this message was sent and sends it to the pre-configured corresponding kafka topics ..
127123
128124 String message = new String (mqttMessage .getPayload ());
129- logger .info (topic + " - " + message );
125+ logger .info (mqttTopic + " - " + message );
130126
131127 try {
132- if (!mqttKafkaTopicMap .get (topic ).isEmpty ()) {
133-
134- kafkaProducer .send (new ProducerRecord <>(mqttKafkaTopicMap .get (topic ), i ++, message ));
128+ if (mqttKafkaTopicMap .containsKey (mqttTopic )) {
129+ mqttKafkaTopicMap .get (mqttTopic ).forEach (kafkaTopic -> {
130+ kafkaProducer .send (new ProducerRecord <>(kafkaTopic , message ));
131+ });
135132 logger .trace ("send Message to kafka - " + message );
136133 }
137134 } catch (KafkaException e ) {
138135 logger .error ("Exception occurred – Check log for more details.\n " + e .getMessage ());
139- logger .warn ("There seems to be issue with the kafka connection. Currently no messages are forwarded to the kafka cluster!!!!" );
136+ logger .warn ("There seems to be an issue with the kafka connection. Currently no messages are forwarded to the kafka cluster!!!!" );
140137// System.exit(-1);
141138 }
142139
@@ -151,8 +148,13 @@ public static void initTopicsRoutingMap(String topicMappingString){
151148 logger .info ("Setting up topic mapping (MQTT >>> Kafka) ..." );
152149 Arrays .asList (topicMappingString .split (";" )).forEach (s -> {
153150 String [] pair = s .split (">>>" );
154- mqttKafkaTopicMap .put ( pair [0 ], pair [1 ]);
155- logger .info (pair [0 ] + " >>> " + pair [1 ]);
151+ String mqttTopic = pair [0 ];
152+ String kafkaTopic = pair [1 ];
153+ if ( !mqttKafkaTopicMap .containsKey (mqttTopic ) ){
154+ mqttKafkaTopicMap .put (mqttTopic , new ArrayList <String >());
155+ }
156+ mqttKafkaTopicMap .get (mqttTopic ).add (kafkaTopic );
157+ logger .info (mqttTopic + " >>> " + kafkaTopic );
156158 }
157159 );
158160 }
0 commit comments