Skip to content

Commit 7e0c501

Browse files
committed
Fixing Serialization issues
1 parent 34fac5d commit 7e0c501

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package dev.luismachadoreis.flighttracker.server.ping.infrastructure.config;
2+
3+
import org.apache.kafka.common.serialization.StringDeserializer;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
import org.springframework.beans.factory.annotation.Qualifier;
7+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
8+
import org.springframework.kafka.core.ConsumerFactory;
9+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
10+
11+
import java.util.Map;
12+
13+
@Configuration
14+
public class PingEventConsumerConfig {
15+
16+
private final Map<String, Object> properties;
17+
18+
public PingEventConsumerConfig(@Qualifier("kafkaListenerContainerFactoryProperties") Map<String, Object> properties) {
19+
this.properties = properties;
20+
}
21+
22+
@Bean(name = "pingEventConsumerFactory")
23+
public ConsumerFactory<String, String> consumerFactory() {
24+
return new DefaultKafkaConsumerFactory<>(
25+
properties,
26+
new StringDeserializer(),
27+
new StringDeserializer()
28+
);
29+
}
30+
31+
@Bean(name = "pingEventKafkaListenerContainerFactory")
32+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
33+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
34+
new ConcurrentKafkaListenerContainerFactory<>();
35+
factory.setConsumerFactory(consumerFactory());
36+
return factory;
37+
}
38+
}

src/main/java/dev/luismachadoreis/flighttracker/server/ping/infrastructure/pubsub/PingEventConsumer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@ public class PingEventConsumer {
2323
/**
2424
* Consumes ping created events from Kafka and forwards them to WebSocket clients.
2525
*
26-
* @param message the ping created event
26+
* @param message the ping created event as JSON string
2727
*/
28-
@KafkaListener(topics = "${spring.kafka.topic.ping-created}", groupId = "${spring.kafka.consumer.group-id}")
28+
@KafkaListener(
29+
topics = "${spring.kafka.topic.ping-created}",
30+
groupId = "${spring.kafka.consumer.group-id}",
31+
containerFactory = "pingEventKafkaListenerContainerFactory"
32+
)
2933
public void consumePingCreated(String message) {
3034
log.debug("Received ping created event from Kafka: {}", message);
3135
if (StringUtils.hasText(message)) {
@@ -34,5 +38,4 @@ public void consumePingCreated(String message) {
3438
log.debug("Skipping empty or null message");
3539
}
3640
}
37-
3841
}

0 commit comments

Comments
 (0)