Skip to content

Commit 497b232

Browse files
committed
[broker-11] implement in-memory mqtt session service
1 parent bdfe7dc commit 497b232

File tree

10 files changed

+205
-95
lines changed

10 files changed

+205
-95
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
import com.ss.mqtt.broker.network.packet.in.handler.*;
1010
import com.ss.mqtt.broker.service.*;
1111
import com.ss.mqtt.broker.service.impl.*;
12-
import com.ss.rlib.network.*;
12+
import com.ss.rlib.network.BufferAllocator;
13+
import com.ss.rlib.network.Network;
14+
import com.ss.rlib.network.NetworkFactory;
15+
import com.ss.rlib.network.ServerNetworkConfig;
1316
import com.ss.rlib.network.impl.DefaultBufferAllocator;
1417
import com.ss.rlib.network.server.ServerNetwork;
1518
import lombok.RequiredArgsConstructor;
@@ -44,11 +47,6 @@ private interface ChannelFactory extends
4447
return new DefaultBufferAllocator(networkConfig);
4548
}
4649

47-
@Bean
48-
@NotNull ClientService clientService() {
49-
return new DefaultClientService();
50-
}
51-
5250
@Bean
5351
@NotNull ClientIdRegistry clientIdRegistry() {
5452
return new InMemoryClientIdRegistry(
@@ -85,11 +83,16 @@ private interface ChannelFactory extends
8583
@NotNull AuthenticationService authenticationService,
8684
@NotNull ClientIdRegistry clientIdRegistry,
8785
@NotNull SubscriptionService subscriptionService,
88-
@NotNull PublishingService publishingService
86+
@NotNull PublishingService publishingService,
87+
@NotNull MqttSessionService mqttSessionService
8988
) {
9089

9190
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
92-
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(clientIdRegistry, authenticationService);
91+
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
92+
clientIdRegistry,
93+
authenticationService,
94+
mqttSessionService
95+
);
9396
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
9497
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
9598
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);

src/main/java/com/ss/mqtt/broker/model/MqttSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
public interface MqttSession {
66

7+
interface UnsafeMqttSession extends MqttSession {
8+
9+
void setExpirationTime(long expirationTime);
10+
}
11+
712
@NotNull String getClientId();
813

914
/**
Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
package com.ss.mqtt.broker.model.impl;
22

3-
import com.ss.mqtt.broker.model.MqttSession;
3+
import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession;
4+
import lombok.Getter;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.Setter;
7+
import org.jetbrains.annotations.NotNull;
48

5-
public class DefaultMqttSession implements MqttSession {}
9+
@RequiredArgsConstructor
10+
public class DefaultMqttSession implements UnsafeMqttSession {
11+
12+
private final @NotNull String clientId;
13+
14+
private volatile @Getter @Setter long expirationTime = -1;
15+
16+
@Override
17+
public @NotNull String getClientId() {
18+
return clientId;
19+
}
20+
}

src/main/java/com/ss/mqtt/broker/network/MqttConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ protected void doClose() {
104104
var session = getSession();
105105

106106
if (session != null) {
107-
sessionService.store(getClient().getClientId(), session)
107+
sessionService.store(client.getClientId(), session)
108108
.subscribe(result -> setSession(null));
109109
}
110110

src/main/java/com/ss/mqtt/broker/network/client/UnsafeMqttClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.network.client;
22

33
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
4+
import com.ss.mqtt.broker.model.MqttSession;
45
import com.ss.mqtt.broker.network.MqttConnection;
56
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
67
import org.jetbrains.annotations.NotNull;
@@ -21,5 +22,7 @@ void configure(
2122

2223
void setClientId(@NotNull String clientId);
2324

25+
void setSession(@NotNull MqttSession session);
26+
2427
void reject(@NotNull ConnectAckReasonCode reasonCode);
2528
}

src/main/java/com/ss/mqtt/broker/network/client/impl/AbstractMqttClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.ss.mqtt.broker.config.MqttConnectionConfig;
44
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
55
import com.ss.mqtt.broker.model.MqttPropertyConstants;
6+
import com.ss.mqtt.broker.model.MqttSession;
67
import com.ss.mqtt.broker.network.MqttConnection;
78
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
89
import com.ss.mqtt.broker.network.packet.factory.MqttPacketOutFactory;
@@ -14,6 +15,7 @@
1415
import lombok.Setter;
1516
import lombok.extern.log4j.Log4j2;
1617
import org.jetbrains.annotations.NotNull;
18+
import org.jetbrains.annotations.Nullable;
1719

1820
@Getter
1921
@Log4j2
@@ -23,6 +25,7 @@ public abstract class AbstractMqttClient implements UnsafeMqttClient {
2325
protected final @NotNull MqttConnection connection;
2426

2527
private volatile @Setter @NotNull String clientId;
28+
private volatile @Setter @Getter @Nullable MqttSession session;
2629

2730
private volatile long sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
2831
private volatile int receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;

src/main/java/com/ss/mqtt/broker/network/packet/in/handler/ConnectInPacketHandler.java

Lines changed: 43 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package com.ss.mqtt.broker.network.packet.in.handler;
22

3-
import static reactor.core.publisher.Mono.fromCallable;
4-
import static reactor.core.publisher.Mono.fromRunnable;
3+
import static com.ss.mqtt.broker.model.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD;
4+
import static com.ss.mqtt.broker.model.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID;
5+
import static com.ss.mqtt.broker.util.ReactorUtils.ifTrue;
56
import com.ss.mqtt.broker.exception.ConnectionRejectException;
67
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
78
import com.ss.mqtt.broker.model.ConnectAckReasonCode;
9+
import com.ss.mqtt.broker.model.MqttSession;
810
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
911
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
1012
import com.ss.mqtt.broker.service.AuthenticationService;
1113
import com.ss.mqtt.broker.service.ClientIdRegistry;
14+
import com.ss.mqtt.broker.service.MqttSessionService;
1215
import com.ss.rlib.common.util.StringUtils;
1316
import lombok.RequiredArgsConstructor;
1417
import org.jetbrains.annotations.NotNull;
@@ -19,6 +22,7 @@ public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie
1922

2023
private final ClientIdRegistry clientIdRegistry;
2124
private final AuthenticationService authenticationService;
25+
private final MqttSessionService mqttSessionService;
2226

2327
@Override
2428
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -31,61 +35,60 @@ protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPa
3135
}
3236

3337
authenticationService.auth(packet.getUsername(), packet.getPassword())
34-
.filter(Boolean::booleanValue)
35-
.flatMap(authenticated -> registerClient(client, packet))
36-
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD)))
38+
.flatMap(ifTrue(client, packet, this::registerClient, BAD_USER_NAME_OR_PASSWORD, client::reject))
39+
.flatMap(ifTrue(client, packet, this::restoreSession, CLIENT_IDENTIFIER_NOT_VALID, client::reject))
3740
.subscribe();
3841
}
3942

4043
private @NotNull Mono<Boolean> registerClient(
4144
@NotNull UnsafeMqttClient client,
4245
@NotNull ConnectInPacket packet
4346
) {
47+
4448
var requestedClientId = packet.getClientId();
45-
if (StringUtils.isEmpty(requestedClientId)) {
46-
return processWithoutClientId(client, packet, requestedClientId);
49+
50+
if (StringUtils.isNotEmpty(requestedClientId)) {
51+
return clientIdRegistry.register(requestedClientId)
52+
.map(ifTrue(requestedClientId, client::setClientId));
4753
} else {
48-
return processWithClientId(client, packet, requestedClientId);
54+
return clientIdRegistry.generate()
55+
.flatMap(newClientId -> clientIdRegistry.register(newClientId)
56+
.map(ifTrue(newClientId, client::setClientId)));
4957
}
5058
}
5159

52-
private @NotNull Mono<Boolean> processWithClientId(
60+
private @NotNull Mono<Boolean> restoreSession(
5361
@NotNull UnsafeMqttClient client,
54-
@NotNull ConnectInPacket packet,
55-
@NotNull String requestedClientId
62+
@NotNull ConnectInPacket packet
5663
) {
57-
return clientIdRegistry.register(requestedClientId)
58-
.filter(Boolean::booleanValue)
59-
.flatMap(registered -> fromCallable(() -> {
60-
client.setClientId(requestedClientId);
61-
onConnected(client, packet, requestedClientId);
62-
return true;
63-
}))
64-
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID)));
65-
}
6664

67-
private @NotNull Mono<Boolean> processWithoutClientId(
68-
@NotNull UnsafeMqttClient client,
69-
@NotNull ConnectInPacket packet,
70-
@NotNull String requestedClientId
71-
) {
72-
return clientIdRegistry.generate()
73-
.doOnNext(client::setClientId)
74-
.flatMap(clientIdRegistry::register)
75-
.filter(Boolean::booleanValue)
76-
.flatMap(registered -> fromCallable(() -> {
77-
onConnected(client, packet, requestedClientId);
78-
return true;
79-
}))
80-
.switchIfEmpty(fromRunnable(() -> client.reject(ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID)));
65+
if (packet.isCleanStart()) {
66+
return mqttSessionService.create(client.getClientId())
67+
.flatMap(session -> onConnected(client, packet, session, false));
68+
} else {
69+
return mqttSessionService.restore(client.getClientId())
70+
.flatMap(session -> onConnected(client, packet, session, true))
71+
.switchIfEmpty(Mono.defer(() -> mqttSessionService.create(client.getClientId())
72+
.flatMap(session -> onConnected(client, packet, session,false))));
73+
}
8174
}
8275

83-
private void onConnected(
76+
private Mono<Boolean> onConnected(
8477
@NotNull UnsafeMqttClient client,
8578
@NotNull ConnectInPacket packet,
86-
@NotNull String requestedClientId
79+
@NotNull MqttSession session,
80+
boolean sessionRestored
8781
) {
8882

83+
var connection = client.getConnection();
84+
85+
// if it was closed in parallel
86+
if (connection.isClosed()) {
87+
// store the session again
88+
return mqttSessionService.store(client.getClientId(), session);
89+
}
90+
91+
client.setSession(session);
8992
client.configure(
9093
packet.getSessionExpiryInterval(),
9194
packet.getReceiveMax(),
@@ -97,13 +100,14 @@ private void onConnected(
97100
client.send(client.getPacketOutFactory().newConnectAck(
98101
client,
99102
ConnectAckReasonCode.SUCCESS,
100-
false,
101-
requestedClientId,
103+
sessionRestored,
104+
packet.getClientId(),
102105
packet.getSessionExpiryInterval(),
103106
packet.getKeepAlive()
104107
));
105-
}
106108

109+
return Mono.just(Boolean.TRUE);
110+
}
107111

108112
private boolean checkPacketException(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
109113

src/main/java/com/ss/mqtt/broker/service/MqttSessionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66

77
public interface MqttSessionService {
88

9-
@NotNull Mono<MqttSession> getOrCreate(@NotNull String clientId);
9+
@NotNull Mono<MqttSession> restore(@NotNull String clientId);
1010

11-
@NotNull Mono<MqttSession> createNew(@NotNull String clientId);
11+
@NotNull Mono<MqttSession> create(@NotNull String clientId);
1212

1313
@NotNull Mono<Boolean> store(@NotNull String clientId, @NotNull MqttSession session);
1414
}

0 commit comments

Comments
 (0)