Skip to content

Commit 814900a

Browse files
committed
ByPassingKafka to Update Pings when deployed into a single node
1 parent 7e0c501 commit 814900a

File tree

4 files changed

+142
-2
lines changed

4 files changed

+142
-2
lines changed

docs/adrs/adr-001-websocket-scalability.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,33 @@ The `flight-tracker-event-server` currently sends flight updates directly to cli
66
## Decision
77
Refactor the application so that the `PingEventPublisher` publishes flight events to a Kafka topic. A new dedicated component will consume these events and manage active WebSocket sessions for message delivery.
88

9+
Additionally, we will implement feature flags to control the WebSocket delivery mechanism, allowing the system to run in different deployment configurations:
10+
- **Monolithic Mode**: All components run in the same service with in-memory WebSocket sessions
11+
- **Decoupled Mode**: WebSocket delivery runs as a separate component consuming from Kafka
12+
913
This decision enables separation of responsibilities and prepares the system for a future migration to a STOMP-based architecture (e.g., RabbitMQ) if scale demands increase.
1014

1115
## Justification
1216
- **Time-to-market**: quick delivery without frontend changes
1317
- **Decoupling**: separates event generation from delivery logic
1418
- **Reuses existing infrastructure**: Kafka is already in place
1519
- **Low impact**: avoids protocol changes or client modifications for now
20+
- **Deployment Flexibility**: allows gradual transition between deployment models
21+
- **Reduced Complexity**: initial implementation can stay within the same service
1622

1723
## Alternatives Considered
1824
- **STOMP Broker with RabbitMQ**: powerful but requires client refactor and more setup
1925
- **Redis Streams/PubSub**: simple, fast, but with delivery and clustering limitations
2026
- **Optimizing the current implementation**: fast but not future-proof
27+
- **Immediate Service Split**: would require more upfront work and coordination
2128

2229
## Consequences
2330
- The system becomes modular and more scalable
2431
- Kafka enables better backpressure and failover handling
2532
- Prepares for a transition to STOMP when scale justifies it
2633
- A new WebSocket delivery component must be monitored closely
34+
- Feature flags add complexity but provide deployment flexibility
35+
- Allows for gradual migration of WebSocket handling to a separate service
2736

2837
## Links
2938
- [Technical Analysis – WebSocket Scalability](../analysis/technical-analysis-websocket-flight-tracker.md)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package dev.luismachadoreis.flighttracker.server.ping.infrastructure.pubsub;
2+
3+
import org.springframework.stereotype.Component;
4+
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
5+
import org.springframework.util.StringUtils;
6+
import org.springframework.transaction.event.TransactionalEventListener;
7+
import org.springframework.transaction.event.TransactionPhase;
8+
9+
import dev.luismachadoreis.flighttracker.server.ping.infrastructure.websocket.MapUpdatesHandler;
10+
import dev.luismachadoreis.flighttracker.server.ping.application.dto.PingMapper;
11+
import dev.luismachadoreis.flighttracker.server.ping.application.dto.PingDTO;
12+
import dev.luismachadoreis.flighttracker.server.common.utils.JsonUtils;
13+
import dev.luismachadoreis.flighttracker.server.ping.domain.PingCreatedEvent;
14+
15+
import lombok.AllArgsConstructor;
16+
import lombok.extern.slf4j.Slf4j;
17+
18+
19+
@Slf4j
20+
@Component
21+
@AllArgsConstructor
22+
@ConditionalOnExpression("${app.ping.publisher.enabled:false} == false and ${app.ping.websocket.enabled:false} == true")
23+
public class PingEventPublisherByPassingKafka {
24+
25+
private final MapUpdatesHandler mapUpdatesHandler;
26+
private final PingMapper pingMapper;
27+
private final JsonUtils jsonUtils;
28+
29+
/**
30+
* Handles PingCreated events.
31+
* @param event the PingCreated event
32+
*/
33+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
34+
public void handlePingCreated(PingCreatedEvent event) {
35+
PingDTO pingDTO = pingMapper.toDTO(event.ping());
36+
String message = jsonUtils.toJson(pingDTO);
37+
mapUpdatesHandler.sendMessage(message);
38+
}
39+
40+
}

src/main/resources/application.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,7 @@ app:
101101
subscriber:
102102
enabled: true
103103
publisher:
104-
enabled: true
104+
enabled: false
105105
websocket:
106106
enabled: true
107107

108-
# ... rest of the file remains unchanged ...
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package dev.luismachadoreis.flighttracker.server.ping.infrastructure.pubsub;
2+
3+
import static org.mockito.Mockito.verify;
4+
import static org.mockito.Mockito.when;
5+
6+
import java.time.Instant;
7+
import java.util.UUID;
8+
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.junit.jupiter.api.extension.ExtendWith;
12+
import org.mockito.Mock;
13+
import org.mockito.junit.jupiter.MockitoExtension;
14+
15+
import dev.luismachadoreis.flighttracker.server.ping.application.dto.PingDTO;
16+
import dev.luismachadoreis.flighttracker.server.ping.application.dto.PingMapper;
17+
import dev.luismachadoreis.flighttracker.server.ping.domain.Ping;
18+
import dev.luismachadoreis.flighttracker.server.ping.domain.PingCreatedEvent;
19+
import dev.luismachadoreis.flighttracker.server.ping.infrastructure.websocket.MapUpdatesHandler;
20+
import dev.luismachadoreis.flighttracker.server.common.utils.JsonUtils;
21+
22+
@ExtendWith(MockitoExtension.class)
23+
class PingEventPublisherByPassingKafkaTest {
24+
25+
@Mock
26+
private MapUpdatesHandler mapUpdatesHandler;
27+
28+
@Mock
29+
private PingMapper pingMapper;
30+
31+
@Mock
32+
private JsonUtils jsonUtils;
33+
34+
private PingEventPublisherByPassingKafka publisher;
35+
36+
@BeforeEach
37+
void setUp() {
38+
publisher = new PingEventPublisherByPassingKafka(mapUpdatesHandler, pingMapper, jsonUtils);
39+
}
40+
41+
@Test
42+
void handlePingCreated_ShouldSendMessageToWebSocket() {
43+
// Arrange
44+
Instant now = Instant.now();
45+
var aircraft = new Ping.Aircraft("ABC123", "FL123", "US", now, "7700", true, new Integer[]{1, 2});
46+
var vector = new Ping.Vector(500.0, 180.0, 0.0);
47+
var position = new Ping.Position(10.0, 20.0, 30000.0, 29000.0, false, 1, now);
48+
var ping = new Ping(aircraft, vector, position);
49+
var event = new PingCreatedEvent(ping, now);
50+
51+
var pingDTO = new PingDTO(
52+
ping.getId(),
53+
new PingDTO.Aircraft(
54+
aircraft.icao24(),
55+
aircraft.callsign(),
56+
aircraft.originCountry(),
57+
aircraft.lastContact(),
58+
aircraft.squawk(),
59+
aircraft.spi(),
60+
aircraft.sensors()
61+
),
62+
new PingDTO.Vector(
63+
vector.velocity(),
64+
vector.trueTrack(),
65+
vector.verticalRate()
66+
),
67+
new PingDTO.Position(
68+
position.longitude(),
69+
position.latitude(),
70+
position.geoAltitude(),
71+
position.baroAltitude(),
72+
position.onGround(),
73+
position.source(),
74+
position.time()
75+
),
76+
ping.getLastUpdate()
77+
);
78+
79+
String expectedJson = "{\"id\":\"" + ping.getId() + "\",\"aircraft\":{\"icao24\":\"ABC123\",\"callsign\":\"FL123\",\"origin_country\":\"US\",\"last_contact\":\"" + now + "\",\"squawk\":\"7700\",\"spi\":true,\"sensors\":[1,2]},\"vector\":{\"velocity\":500.0,\"true_track\":180.0,\"vertical_rate\":0.0},\"position\":{\"longitude\":10.0,\"latitude\":20.0,\"geo_altitude\":30000.0,\"baro_altitude\":29000.0,\"on_ground\":false,\"source\":1,\"time\":\"" + now + "\"},\"last_update\":\"" + now + "\"}";
80+
81+
when(pingMapper.toDTO(ping)).thenReturn(pingDTO);
82+
when(jsonUtils.toJson(pingDTO)).thenReturn(expectedJson);
83+
84+
// Act
85+
publisher.handlePingCreated(event);
86+
87+
// Assert
88+
verify(pingMapper).toDTO(ping);
89+
verify(jsonUtils).toJson(pingDTO);
90+
verify(mapUpdatesHandler).sendMessage(expectedJson);
91+
}
92+
}

0 commit comments

Comments
 (0)