Skip to content

Commit 3a595c0

Browse files
committed
[broker-14] add subscriber cleaning/restoring after disconnect/reconnect
1 parent dfe926c commit 3a595c0

24 files changed

+425
-336
lines changed

build.gradle

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.6.0"
35+
rlibVersion = "9.8.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'
@@ -86,6 +86,19 @@ allprojects {
8686
tasks.withType(Test) {
8787
maxParallelForks = 2
8888
forkEvery = 100
89+
jvmArgs += "--enable-preview"
90+
}
91+
92+
tasks.withType(JavaCompile) {
93+
options.compilerArgs += "--enable-preview"
94+
}
95+
96+
tasks.withType(GroovyCompile) {
97+
options.forkOptions.jvmArgs << "--enable-preview"
98+
}
99+
100+
tasks.withType(JavaCompile) {
101+
options.compilerArgs.add("--enable-preview")
89102
}
90103

91104
processResources {

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler;
1414
import com.ss.mqtt.broker.model.MqttPropertyConstants;
1515
import com.ss.mqtt.broker.model.QoS;
16+
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
1617
import com.ss.mqtt.broker.network.MqttConnection;
1718
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
1819
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
@@ -68,6 +69,11 @@ private interface ChannelFactory extends
6869
);
6970
}
7071

72+
@Bean
73+
@NotNull TopicSubscribers topicSubscribers() {
74+
return new TopicSubscribers();
75+
}
76+
7177
@Bean
7278
@NotNull MqttSessionService mqttSessionService() {
7379
return new InMemoryMqttSessionService(
@@ -95,15 +101,17 @@ private interface ChannelFactory extends
95101
@NotNull SubscriptionService subscriptionService,
96102
@NotNull PublishingService publishingService,
97103
@NotNull MqttSessionService mqttSessionService,
98-
@NotNull PublishRetryService publishRetryService
104+
@NotNull PublishRetryService publishRetryService,
105+
@NotNull TopicSubscribers topicSubscribers
99106
) {
100107

101108
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
102109
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
103110
clientIdRegistry,
104111
authenticationService,
105112
mqttSessionService,
106-
publishRetryService
113+
publishRetryService,
114+
topicSubscribers
107115
);
108116
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
109117
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
@@ -118,9 +126,15 @@ private interface ChannelFactory extends
118126
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
119127
@NotNull ClientIdRegistry clientIdRegistry,
120128
@NotNull MqttSessionService mqttSessionService,
121-
@NotNull PublishRetryService publishRetryService
129+
@NotNull PublishRetryService publishRetryService,
130+
@NotNull TopicSubscribers topicSubscribers
122131
) {
123-
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService);
132+
return new DefaultMqttClientReleaseHandler(
133+
clientIdRegistry,
134+
mqttSessionService,
135+
publishRetryService,
136+
topicSubscribers
137+
);
124138
}
125139

126140
@Bean
@@ -165,8 +179,8 @@ private interface ChannelFactory extends
165179
}
166180

167181
@Bean
168-
@NotNull SubscriptionService subscriptionService() {
169-
return new SimpleSubscriptionService();
182+
@NotNull SubscriptionService subscriptionService(@NotNull TopicSubscribers topicSubscribers) {
183+
return new SimpleSubscriptionService(topicSubscribers);
170184
}
171185

172186
@Bean

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package com.ss.mqtt.broker.handler.client;
22

3-
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
3+
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
44
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
5+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
56
import com.ss.mqtt.broker.service.ClientIdRegistry;
67
import com.ss.mqtt.broker.service.MqttSessionService;
78
import com.ss.mqtt.broker.service.PublishRetryService;
@@ -19,6 +20,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
1920
private final @NotNull ClientIdRegistry clientIdRegistry;
2021
private final @NotNull MqttSessionService sessionService;
2122
private final @NotNull PublishRetryService publishRetryService;
23+
private final @NotNull TopicSubscribers topicSubscribers;
2224

2325
@Override
2426
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -41,9 +43,12 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
4143

4244
Mono<?> asyncActions = null;
4345

44-
if (session != null && client.getConnectionConfig().isSessionsEnabled()) {
45-
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
46-
client.setSession(null);
46+
if (session != null) {
47+
topicSubscribers.cleanSubscribers(client, session.getTopicFilters());
48+
if (client.getConnectionConfig().isSessionsEnabled()) {
49+
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
50+
client.setSession(null);
51+
}
4752
}
4853

4954
if (asyncActions != null) {

src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.ss.mqtt.broker.handler.client;
22

3+
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
34
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
45
import com.ss.mqtt.broker.service.ClientIdRegistry;
56
import com.ss.mqtt.broker.service.MqttSessionService;
@@ -11,8 +12,9 @@ public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHa
1112
public DefaultMqttClientReleaseHandler(
1213
@NotNull ClientIdRegistry clientIdRegistry,
1314
@NotNull MqttSessionService sessionService,
14-
@NotNull PublishRetryService publishRetryService
15+
@NotNull PublishRetryService publishRetryService,
16+
@NotNull TopicSubscribers topicSubscribers
1517
) {
16-
super(clientIdRegistry, sessionService, publishRetryService);
18+
super(clientIdRegistry, sessionService, publishRetryService, topicSubscribers);
1719
}
1820
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@
88
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
99
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1010
import com.ss.mqtt.broker.model.MqttSession;
11+
import com.ss.mqtt.broker.model.topic.TopicSubscribers;
1112
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1213
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
13-
import com.ss.mqtt.broker.service.AuthenticationService;
14-
import com.ss.mqtt.broker.service.ClientIdRegistry;
15-
import com.ss.mqtt.broker.service.MqttSessionService;
16-
import com.ss.mqtt.broker.service.PublishRetryService;
14+
import com.ss.mqtt.broker.service.*;
1715
import com.ss.rlib.common.util.StringUtils;
1816
import lombok.RequiredArgsConstructor;
1917
import org.jetbrains.annotations.NotNull;
@@ -26,6 +24,7 @@ public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie
2624
private final @NotNull AuthenticationService authenticationService;
2725
private final @NotNull MqttSessionService mqttSessionService;
2826
private final @NotNull PublishRetryService publishRetryService;
27+
private final @NotNull TopicSubscribers topicSubscribers;
2928

3029
@Override
3130
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -139,6 +138,8 @@ private Mono<Boolean> onConnected(
139138

140139
publishRetryService.register(client);
141140

141+
topicSubscribers.restoreSubscribers(client, session.getTopicFilters());
142+
142143
return Mono.just(Boolean.TRUE);
143144
}
144145

src/main/java/com/ss/mqtt/broker/handler/publish/in/AbstractPublishInHandler.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.QoS;
6+
import com.ss.mqtt.broker.model.Subscriber;
7+
import com.ss.mqtt.broker.network.client.MqttClient;
8+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
59
import com.ss.mqtt.broker.service.SubscriptionService;
610
import lombok.RequiredArgsConstructor;
711
import org.jetbrains.annotations.NotNull;
@@ -12,7 +16,31 @@ abstract class AbstractPublishInHandler implements PublishInHandler {
1216
protected final @NotNull SubscriptionService subscriptionService;
1317
protected final @NotNull PublishOutHandler[] publishOutHandlers;
1418

15-
protected @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
19+
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20+
var result = subscriptionService.forEachTopicSubscriber(
21+
packet.getTopicName(),
22+
packet,
23+
this::publish
24+
);
25+
handleImpl(client, packet, result);
26+
}
27+
28+
private boolean publish(
29+
@NotNull Subscriber subscriber,
30+
@NotNull PublishInPacket packet
31+
) {
32+
return publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
33+
}
34+
35+
private @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
1636
return publishOutHandlers[qos.ordinal()];
1737
}
38+
39+
protected void handleImpl(
40+
@NotNull MqttClient client,
41+
@NotNull PublishInPacket packet,
42+
@NotNull ActionResult result
43+
) {
44+
// nothing to do
45+
}
1846
}
Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4-
import com.ss.mqtt.broker.network.client.MqttClient;
5-
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
64
import com.ss.mqtt.broker.service.SubscriptionService;
75
import org.jetbrains.annotations.NotNull;
86

@@ -14,14 +12,4 @@ public Qos0PublishInHandler(
1412
) {
1513
super(subscriptionService, publishOutHandlers);
1614
}
17-
18-
@Override
19-
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20-
21-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
22-
23-
for (var subscriber : subscribers) {
24-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
25-
}
26-
}
2715
}
Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
45
import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode;
56
import com.ss.mqtt.broker.network.client.MqttClient;
67
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
@@ -17,23 +18,21 @@ public Qos1PublishInHandler(
1718
}
1819

1920
@Override
20-
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
21-
22-
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
23-
24-
for (var subscriber : subscribers) {
25-
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
26-
}
27-
28-
var reasonCode = subscribers.isEmpty() ?
29-
PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS : PublishAckReasonCode.SUCCESS;
30-
21+
protected void handleImpl(
22+
@NotNull MqttClient client,
23+
@NotNull PublishInPacket packet,
24+
@NotNull ActionResult result
25+
) {
26+
PublishAckReasonCode reasonCode = switch (result) {
27+
case EMPTY -> PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
28+
case SUCCESS -> PublishAckReasonCode.SUCCESS;
29+
default -> PublishAckReasonCode.UNSPECIFIED_ERROR;
30+
};
3131
var ackPacket = client.getPacketOutFactory().newPublishAck(
3232
client,
3333
packet.getPacketId(),
3434
reasonCode
3535
);
36-
3736
client.send(ackPacket);
3837
}
3938
}

src/main/java/com/ss/mqtt/broker/handler/publish/out/PublishOutHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@
99
*/
1010
public interface PublishOutHandler {
1111

12-
void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber);
12+
boolean handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber);
1313
}

src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
public class Qos0PublishOutHandler extends AbstractPublishOutHandler {
1010

1111
@Override
12-
public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) {
12+
public boolean handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) {
1313

1414
var client = subscriber.getMqttClient();
1515
var packetOutFactory = client.getPacketOutFactory();
@@ -20,13 +20,14 @@ public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscrib
2020
QoS.AT_MOST_ONCE_DELIVERY,
2121
packet.isRetained(),
2222
false,
23-
packet.getTopicName(),
23+
packet.getTopicName().toString(),
2424
MqttPropertyConstants.TOPIC_ALIAS_NOT_SET,
2525
packet.getPayload(),
2626
packet.isPayloadFormatIndicator(),
2727
packet.getResponseTopic(),
2828
packet.getCorrelationData(),
2929
packet.getUserProperties()
3030
));
31+
return true;
3132
}
3233
}

0 commit comments

Comments
 (0)