Skip to content

Commit bfaa33f

Browse files
committed
[broker-11] finish implement mqtt session service, add correct disconnect flow
1 parent 497b232 commit bfaa33f

File tree

15 files changed

+241
-55
lines changed

15 files changed

+241
-55
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import com.ss.mqtt.broker.model.MqttPropertyConstants;
44
import com.ss.mqtt.broker.model.QoS;
55
import com.ss.mqtt.broker.network.MqttConnection;
6+
import com.ss.mqtt.broker.network.client.MqttClientReleaseHandler;
67
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
78
import com.ss.mqtt.broker.network.client.impl.DeviceMqttClient;
9+
import com.ss.mqtt.broker.network.client.impl.DeviceMqttClientReleaseHandler;
810
import com.ss.mqtt.broker.network.packet.PacketType;
911
import com.ss.mqtt.broker.network.packet.in.handler.*;
1012
import com.ss.mqtt.broker.service.*;
@@ -96,25 +98,34 @@ private interface ChannelFactory extends
9698
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
9799
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
98100
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
101+
handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler();
99102

100103
return handlers;
101104
}
102105

106+
@Bean
107+
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler(
108+
@NotNull ClientIdRegistry clientIdRegistry,
109+
@NotNull MqttSessionService mqttSessionService
110+
) {
111+
return new DeviceMqttClientReleaseHandler(clientIdRegistry, mqttSessionService);
112+
}
113+
103114
@Bean
104115
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork(
105116
@NotNull ServerNetworkConfig networkConfig,
106117
@NotNull BufferAllocator bufferAllocator,
107118
@NotNull MqttConnectionConfig deviceConnectionConfig,
108119
PacketInHandler @NotNull [] devicePacketHandlers,
109-
@NotNull MqttSessionService mqttSessionService
120+
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler
110121
) {
111122
return NetworkFactory.newServerNetwork(
112123
networkConfig,
113124
deviceConnectionFactory(
114125
bufferAllocator,
115126
deviceConnectionConfig,
116127
devicePacketHandlers,
117-
mqttSessionService
128+
deviceMqttClientReleaseHandler
118129
)
119130
);
120131
}
@@ -188,7 +199,7 @@ private interface ChannelFactory extends
188199
@NotNull BufferAllocator bufferAllocator,
189200
@NotNull MqttConnectionConfig connectionConfig,
190201
PacketInHandler @NotNull [] packetHandlers,
191-
@NotNull MqttSessionService mqttSessionService
202+
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler
192203
) {
193204
return (network, channel) -> new MqttConnection(
194205
network,
@@ -197,8 +208,7 @@ private interface ChannelFactory extends
197208
100,
198209
packetHandlers,
199210
connectionConfig,
200-
DeviceMqttClient::new,
201-
mqttSessionService
211+
mqttConnection -> new DeviceMqttClient(mqttConnection, deviceMqttClientReleaseHandler)
202212
);
203213
}
204214
}

src/main/java/com/ss/mqtt/broker/network/MqttConnection.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@
33
import com.ss.mqtt.broker.config.MqttConnectionConfig;
44
import com.ss.mqtt.broker.model.MqttSession;
55
import com.ss.mqtt.broker.model.MqttVersion;
6-
import com.ss.mqtt.broker.network.client.MqttClient;
76
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
87
import com.ss.mqtt.broker.network.packet.MqttPacketReader;
98
import com.ss.mqtt.broker.network.packet.MqttPacketWriter;
109
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
1110
import com.ss.mqtt.broker.network.packet.in.handler.PacketInHandler;
1211
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
13-
import com.ss.mqtt.broker.service.MqttSessionService;
1412
import com.ss.rlib.network.BufferAllocator;
1513
import com.ss.rlib.network.Connection;
1614
import com.ss.rlib.network.Network;
@@ -36,11 +34,10 @@ public class MqttConnection extends AbstractConnection<MqttReadablePacket, MqttW
3634

3735
@Getter(AccessLevel.PROTECTED)
3836
private final @NotNull PacketWriter packetWriter;
39-
private final @NotNull MqttSessionService sessionService;
4037

4138
private final @Getter PacketInHandler @NotNull [] packetHandlers;
4239

43-
private final @Getter @NotNull MqttClient client;
40+
private final @Getter @NotNull UnsafeMqttClient client;
4441
private final @Getter @NotNull MqttConnectionConfig config;
4542

4643
private volatile @Getter @Setter @NotNull MqttVersion mqttVersion;
@@ -53,13 +50,11 @@ public MqttConnection(
5350
int maxPacketsByRead,
5451
PacketInHandler @NotNull [] packetHandlers,
5552
@NotNull MqttConnectionConfig config,
56-
@NotNull Function<MqttConnection, UnsafeMqttClient> clientFactory,
57-
@NotNull MqttSessionService mqttSessionService
53+
@NotNull Function<MqttConnection, UnsafeMqttClient> clientFactory
5854
) {
5955
super(network, channel, NetworkCryptor.NULL, bufferAllocator, maxPacketsByRead);
6056
this.packetHandlers = packetHandlers;
6157
this.config = config;
62-
this.sessionService = mqttSessionService;
6358
this.mqttVersion = MqttVersion.MQTT_3_1_1;
6459
this.packetReader = createPacketReader();
6560
this.packetWriter = createPacketWriter();
@@ -100,14 +95,7 @@ public boolean isSupported(@NotNull MqttVersion mqttVersion) {
10095

10196
@Override
10297
protected void doClose() {
103-
104-
var session = getSession();
105-
106-
if (session != null) {
107-
sessionService.store(client.getClientId(), session)
108-
.subscribe(result -> setSession(null));
109-
}
110-
98+
client.release().subscribe();
11199
super.doClose();
112100
}
113101
}

src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package com.ss.mqtt.broker.network.client;
22

33
import com.ss.mqtt.broker.config.MqttConnectionConfig;
4+
import com.ss.mqtt.broker.model.MqttSession;
45
import com.ss.mqtt.broker.network.packet.factory.MqttPacketOutFactory;
56
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
67
import org.jetbrains.annotations.NotNull;
8+
import reactor.core.publisher.Mono;
79

810
public interface MqttClient {
911

1012
@NotNull MqttPacketOutFactory getPacketOutFactory();
1113
@NotNull MqttConnectionConfig getConnectionConfig();
14+
1215
@NotNull String getClientId();
16+
@NotNull MqttSession getSession();
1317

1418
int getKeepAlive();
1519
int getMaximumPacketSize();
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.ss.mqtt.broker.network.client;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
import reactor.core.publisher.Mono;
5+
6+
public interface MqttClientReleaseHandler {
7+
8+
@NotNull Mono<?> release(@NotNull UnsafeMqttClient client);
9+
}

src/main/java/com/ss/mqtt/broker/network/client/UnsafeMqttClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.ss.mqtt.broker.network.MqttConnection;
66
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
77
import org.jetbrains.annotations.NotNull;
8+
import reactor.core.publisher.Mono;
89

910
public interface UnsafeMqttClient extends MqttClient {
1011

@@ -25,4 +26,6 @@ void configure(
2526
void setSession(@NotNull MqttSession session);
2627

2728
void reject(@NotNull ConnectAckReasonCode reasonCode);
29+
30+
@NotNull Mono<?> release();
2831
}

src/main/java/com/ss/mqtt/broker/network/client/impl/AbstractMqttClient.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.ss.mqtt.broker.model.MqttPropertyConstants;
66
import com.ss.mqtt.broker.model.MqttSession;
77
import com.ss.mqtt.broker.network.MqttConnection;
8+
import com.ss.mqtt.broker.network.client.MqttClientReleaseHandler;
89
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
910
import com.ss.mqtt.broker.network.packet.factory.MqttPacketOutFactory;
1011
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
@@ -13,16 +14,23 @@
1314
import lombok.EqualsAndHashCode;
1415
import lombok.Getter;
1516
import lombok.Setter;
17+
import lombok.ToString;
1618
import lombok.extern.log4j.Log4j2;
1719
import org.jetbrains.annotations.NotNull;
1820
import org.jetbrains.annotations.Nullable;
21+
import reactor.core.publisher.Mono;
22+
23+
import java.util.concurrent.atomic.AtomicBoolean;
1924

2025
@Getter
2126
@Log4j2
27+
@ToString(of = "clientId")
2228
@EqualsAndHashCode(of = "clientId")
2329
public abstract class AbstractMqttClient implements UnsafeMqttClient {
2430

2531
protected final @NotNull MqttConnection connection;
32+
protected final MqttClientReleaseHandler releaseHandler;
33+
protected final AtomicBoolean released;
2634

2735
private volatile @Setter @NotNull String clientId;
2836
private volatile @Setter @Getter @Nullable MqttSession session;
@@ -33,8 +41,10 @@ public abstract class AbstractMqttClient implements UnsafeMqttClient {
3341
private volatile int topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
3442
private volatile int keepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_MAX;
3543

36-
public AbstractMqttClient(@NotNull MqttConnection connection) {
44+
public AbstractMqttClient(@NotNull MqttConnection connection, @NotNull MqttClientReleaseHandler releaseHandler) {
3745
this.connection = connection;
46+
this.releaseHandler = releaseHandler;
47+
this.released = new AtomicBoolean(false);
3848
this.clientId = StringUtils.EMPTY;
3949
}
4050

@@ -86,4 +96,13 @@ public void reject(@NotNull ConnectAckReasonCode reasonCode) {
8696
public @NotNull MqttConnectionConfig getConnectionConfig() {
8797
return connection.getConfig();
8898
}
99+
100+
@Override
101+
public @NotNull Mono<?> release() {
102+
if (released.compareAndSet(false, true)) {
103+
return releaseHandler.release(this);
104+
} else {
105+
return Mono.empty();
106+
}
107+
}
89108
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.ss.mqtt.broker.network.client.impl;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClientReleaseHandler;
4+
import com.ss.mqtt.broker.network.client.UnsafeMqttClient;
5+
import com.ss.mqtt.broker.service.ClientIdRegistry;
6+
import com.ss.mqtt.broker.service.MqttSessionService;
7+
import com.ss.rlib.common.util.StringUtils;
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.extern.log4j.Log4j2;
10+
import org.jetbrains.annotations.NotNull;
11+
import reactor.core.publisher.Mono;
12+
13+
@Log4j2
14+
@RequiredArgsConstructor
15+
public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttClient> implements
16+
MqttClientReleaseHandler {
17+
18+
private final @NotNull ClientIdRegistry clientIdRegistry;
19+
private final @NotNull MqttSessionService sessionService;
20+
21+
@Override
22+
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
23+
//noinspection unchecked
24+
return releaseImpl((T) client);
25+
}
26+
27+
protected @NotNull Mono<?> releaseImpl(@NotNull T client) {
28+
29+
var clientId = client.getClientId();
30+
client.setClientId(StringUtils.EMPTY);
31+
32+
if (StringUtils.isEmpty(clientId)) {
33+
log.warn("This client {} is already released.", client);
34+
return Mono.empty();
35+
}
36+
37+
var session = client.getSession();
38+
39+
Mono<?> asyncActions = null;
40+
41+
if (session != null) {
42+
asyncActions = sessionService.store(clientId, session);
43+
client.setSession(null);
44+
}
45+
46+
if (asyncActions != null) {
47+
asyncActions = asyncActions.flatMap(any -> clientIdRegistry.unregister(clientId));
48+
} else {
49+
asyncActions = clientIdRegistry.unregister(clientId);
50+
}
51+
52+
if (asyncActions != null) {
53+
return asyncActions;
54+
}
55+
56+
return Mono.empty();
57+
}
58+
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package com.ss.mqtt.broker.network.client.impl;
22

33
import com.ss.mqtt.broker.network.MqttConnection;
4+
import com.ss.mqtt.broker.network.client.MqttClientReleaseHandler;
45
import org.jetbrains.annotations.NotNull;
56

67
public class DeviceMqttClient extends AbstractMqttClient {
78

8-
public DeviceMqttClient(@NotNull MqttConnection connection) {
9-
super(connection);
9+
public DeviceMqttClient(@NotNull MqttConnection connection, @NotNull MqttClientReleaseHandler releaseHandler) {
10+
super(connection, releaseHandler);
1011
}
1112
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.ss.mqtt.broker.network.client.impl;
2+
3+
import com.ss.mqtt.broker.service.ClientIdRegistry;
4+
import com.ss.mqtt.broker.service.MqttSessionService;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
public class DeviceMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
8+
9+
public DeviceMqttClientReleaseHandler(
10+
@NotNull ClientIdRegistry clientIdRegistry,
11+
@NotNull MqttSessionService sessionService
12+
) {
13+
super(clientIdRegistry, sessionService);
14+
}
15+
}

src/main/java/com/ss/mqtt/broker/network/packet/in/DisconnectInPacket.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.ss.mqtt.broker.network.packet.PacketType;
99
import com.ss.rlib.common.util.StringUtils;
1010
import lombok.Getter;
11+
import lombok.ToString;
1112
import org.jetbrains.annotations.NotNull;
1213

1314
import java.nio.ByteBuffer;
@@ -18,6 +19,7 @@
1819
* Disconnect notification.
1920
*/
2021
@Getter
22+
@ToString
2123
public class DisconnectInPacket extends MqttReadablePacket {
2224

2325
public static final byte PACKET_TYPE = (byte) PacketType.DISCONNECT.ordinal();
@@ -86,7 +88,7 @@ protected void readImpl(@NotNull MqttConnection connection, @NotNull ByteBuffer
8688
protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) {
8789

8890
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901207
89-
if (connection.isSupported(MqttVersion.MQTT_5)) {
91+
if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) {
9092
reasonCode = DisconnectReasonCode.of(readUnsignedByte(buffer));
9193
}
9294
}

0 commit comments

Comments
 (0)