11package de .fhg .ipa .null70 .simple_kafka_mqtt_connector ;
22
3- import org .apache .commons .configuration .CompositeConfiguration ;
4- import org .apache .commons .configuration .ConfigurationException ;
5- import org .apache .commons .configuration .PropertiesConfiguration ;
6- import org .apache .commons .configuration .SystemConfiguration ;
73import org .apache .kafka .clients .producer .KafkaProducer ;
84import org .apache .kafka .clients .producer .ProducerConfig ;
95import org .apache .kafka .clients .producer .ProducerRecord ;
@@ -23,52 +19,27 @@ public class SimpleKafkaMQTTConnector {
2319 private static HashMap <String , ArrayList <String >> mqttKafkaTopicMap = new HashMap ();
2420
2521
26- public void run () {
27- CompositeConfiguration config = new CompositeConfiguration ();
28- config .addConfiguration (new SystemConfiguration ());
29-
30- config .addConfiguration (new PropertiesConfiguration ());
31- try {
32- config .addConfiguration (new PropertiesConfiguration ("application.properties" ));
33- } catch (ConfigurationException e ) {
34- e .printStackTrace ();
35- }
36-
37- // Properties configuration
38- String kafkaHost = config .getString ("kafka.host" );
39- String kafkaPort = config .getString ("kafka.port" );
40- String kafkaClientId = config .getString ("kafka.client.id" );
41-
42- String mqttHost = config .getString ("mqtt.host" );
43- String mqttPort = config .getString ("mqtt.port" );
44- String mqttClientId = config .getString ("mqtt.client.id" );
45- Integer mqttQos = Integer .parseInt (config .getString ("mqtt.qos" ).trim ());
46-
47- String topicMapping = config .getString ("topic.mapping" );
48-
49- logger .info ("-------- APPLICATION PROPERTIES --------" );
50- logger .info ("kafkaHost = " + kafkaHost );
51- logger .info ("kafkaPort = " + kafkaPort );
52- logger .info ("mqttHost = " + mqttHost );
53- logger .info ("mqttPort = " + mqttPort );
54- logger .info ("topicMapping = " + topicMapping );
55- logger .info ("----------------------------------------" );
56- logger .info ("" );
57-
22+ public void run (String kafkaHost , String kafkaPort , String kafkaClientId , String mqttHost , String mqttPort , String mqttClientId , Integer mqttQos , String topicMapping ) {
5823 // Initialize topic routing map
5924 initTopicsRoutingMap (topicMapping );
6025
26+ // Init and start kafka producer
27+ KafkaProducer <Integer , String > kafkaProducer = initKafkaProducer (kafkaHost , kafkaPort , kafkaClientId );
28+
29+ // Setup and start the mqtt client
30+ initMqttClient (mqttHost , mqttPort , mqttClientId , mqttQos , kafkaProducer );
31+ }
32+
33+ private KafkaProducer <Integer , String > initKafkaProducer (String kafkaHost , String kafkaPort , String kafkaClientId ) {
6134 logger .trace ("Creating Kafka Producer..." );
6235 Properties props = new Properties ();
6336 props .put (ProducerConfig .CLIENT_ID_CONFIG , kafkaClientId );
6437 props .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , kafkaHost + ":" + kafkaPort );
6538 props .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , IntegerSerializer .class .getName ());
6639 props .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
6740 KafkaProducer <Integer , String > kafkaProducer = new KafkaProducer <>(props );
68- logger .trace ("Start sending messages..." );
69-
70- // Setup and start the mqtt client
71- initMqttClient (mqttHost , mqttPort , mqttClientId , mqttQos , kafkaProducer );
41+ logger .trace ("Kafka producer ready to produce..." );
42+ return kafkaProducer ;
7243 }
7344
7445 private void initMqttClient (String mqttHost , String mqttPort , String mqttClientId , Integer mqttQos , KafkaProducer <Integer , String > kafkaProducer ) {
0 commit comments