Skip to content

Commit d36a5bc

Browse files
committed
[broker-15] implement message expiration and re-trying for QoS 1+ messages
1 parent 8116d5c commit d36a5bc

13 files changed

+268
-80
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.ss.mqtt.broker.config;
22

3-
import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler;
3+
import com.ss.mqtt.broker.handler.client.DefaultMqttClientReleaseHandler;
44
import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
55
import com.ss.mqtt.broker.handler.packet.in.*;
66
import com.ss.mqtt.broker.handler.publish.in.PublishInHandler;
@@ -94,14 +94,16 @@ private interface ChannelFactory extends
9494
@NotNull ClientIdRegistry clientIdRegistry,
9595
@NotNull SubscriptionService subscriptionService,
9696
@NotNull PublishingService publishingService,
97-
@NotNull MqttSessionService mqttSessionService
97+
@NotNull MqttSessionService mqttSessionService,
98+
@NotNull PublishRetryService publishRetryService
9899
) {
99100

100101
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
101102
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
102103
clientIdRegistry,
103104
authenticationService,
104-
mqttSessionService
105+
mqttSessionService,
106+
publishRetryService
105107
);
106108
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
107109
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
@@ -113,11 +115,12 @@ private interface ChannelFactory extends
113115
}
114116

115117
@Bean
116-
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler(
118+
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
117119
@NotNull ClientIdRegistry clientIdRegistry,
118-
@NotNull MqttSessionService mqttSessionService
120+
@NotNull MqttSessionService mqttSessionService,
121+
@NotNull PublishRetryService publishRetryService
119122
) {
120-
return new DeviceMqttClientReleaseHandler(clientIdRegistry, mqttSessionService);
123+
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService);
121124
}
122125

123126
@Bean
@@ -139,6 +142,14 @@ private interface ChannelFactory extends
139142
);
140143
}
141144

145+
@Bean
146+
@NotNull PublishRetryService publishRetryService() {
147+
return new DefaultPublishRetryService(
148+
env.getProperty("publish.pending.check.interval", int.class, 60 * 1000),
149+
env.getProperty("publish.retry.interval", int.class, 60 * 1000)
150+
);
151+
}
152+
142153
@Bean
143154
@NotNull InetSocketAddress deviceNetworkAddress(
144155
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
55
import com.ss.mqtt.broker.service.ClientIdRegistry;
66
import com.ss.mqtt.broker.service.MqttSessionService;
7+
import com.ss.mqtt.broker.service.PublishRetryService;
78
import com.ss.rlib.common.util.StringUtils;
89
import lombok.RequiredArgsConstructor;
910
import lombok.extern.log4j.Log4j2;
@@ -17,6 +18,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
1718

1819
private final @NotNull ClientIdRegistry clientIdRegistry;
1920
private final @NotNull MqttSessionService sessionService;
21+
private final @NotNull PublishRetryService publishRetryService;
2022

2123
@Override
2224
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -25,6 +27,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
2527
}
2628

2729
protected @NotNull Mono<?> releaseImpl(@NotNull T client) {
30+
publishRetryService.unregister(client);
2831

2932
var clientId = client.getClientId();
3033
client.setClientId(StringUtils.EMPTY);
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.ss.mqtt.broker.handler.client;
2+
3+
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
4+
import com.ss.mqtt.broker.service.ClientIdRegistry;
5+
import com.ss.mqtt.broker.service.MqttSessionService;
6+
import com.ss.mqtt.broker.service.PublishRetryService;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
10+
11+
public DefaultMqttClientReleaseHandler(
12+
@NotNull ClientIdRegistry clientIdRegistry,
13+
@NotNull MqttSessionService sessionService,
14+
@NotNull PublishRetryService publishRetryService
15+
) {
16+
super(clientIdRegistry, sessionService, publishRetryService);
17+
}
18+
}

src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.ss.mqtt.broker.service.AuthenticationService;
1414
import com.ss.mqtt.broker.service.ClientIdRegistry;
1515
import com.ss.mqtt.broker.service.MqttSessionService;
16+
import com.ss.mqtt.broker.service.PublishRetryService;
1617
import com.ss.rlib.common.util.StringUtils;
1718
import lombok.RequiredArgsConstructor;
1819
import org.jetbrains.annotations.NotNull;
@@ -21,9 +22,10 @@
2122
@RequiredArgsConstructor
2223
public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, ConnectInPacket> {
2324

24-
private final ClientIdRegistry clientIdRegistry;
25-
private final AuthenticationService authenticationService;
26-
private final MqttSessionService mqttSessionService;
25+
private final @NotNull ClientIdRegistry clientIdRegistry;
26+
private final @NotNull AuthenticationService authenticationService;
27+
private final @NotNull MqttSessionService mqttSessionService;
28+
private final @NotNull PublishRetryService publishRetryService;
2729

2830
@Override
2931
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -135,6 +137,8 @@ private Mono<Boolean> onConnected(
135137
packet.getReceiveMax()
136138
));
137139

140+
publishRetryService.register(client);
141+
138142
return Mono.just(Boolean.TRUE);
139143
}
140144

src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscrib
1919
MqttPropertyConstants.PACKET_ID_FOR_QOS_0,
2020
QoS.AT_MOST_ONCE_DELIVERY,
2121
packet.isRetained(),
22-
packet.isDuplicate(),
22+
false,
2323
packet.getTopicName(),
2424
MqttPropertyConstants.TOPIC_ALIAS_NOT_SET,
2525
packet.getPayload(),
Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,70 @@
11
package com.ss.mqtt.broker.handler.publish.out;
22

33
import com.ss.mqtt.broker.model.MqttPropertyConstants;
4+
import com.ss.mqtt.broker.model.MqttSession;
45
import com.ss.mqtt.broker.model.QoS;
56
import com.ss.mqtt.broker.model.Subscriber;
7+
import com.ss.mqtt.broker.network.client.MqttClient;
8+
import com.ss.mqtt.broker.network.packet.HasPacketId;
69
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
710
import lombok.RequiredArgsConstructor;
811
import org.jetbrains.annotations.NotNull;
912

1013
@RequiredArgsConstructor
11-
public class Qos1PublishOutHandler extends AbstractPublishOutHandler {
14+
public class Qos1PublishOutHandler extends AbstractPublishOutHandler implements MqttSession.PendingPacketHandler {
1215

1316
@Override
1417
public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) {
1518

1619
var client = subscriber.getMqttClient();
1720
var session = client.getSession();
18-
var packetOutFactory = client.getPacketOutFactory();
1921

20-
var publish = packetOutFactory.newPublish(
22+
// it means this client was already closed
23+
if (session == null) {
24+
return;
25+
}
26+
27+
var packetId = session.nextPacketId();
28+
session.registerPendingPublish(packet, this, packetId);
29+
30+
var packetOutFactory = client.getPacketOutFactory();
31+
client.send(packetOutFactory.newPublish(
2132
client,
22-
session.nextPacketId(),
33+
packetId,
2334
QoS.AT_LEAST_ONCE_DELIVERY,
2435
packet.isRetained(),
25-
packet.isDuplicate(),
36+
false,
2637
packet.getTopicName(),
2738
MqttPropertyConstants.TOPIC_ALIAS_NOT_SET,
2839
packet.getPayload(),
2940
packet.isPayloadFormatIndicator(),
3041
packet.getResponseTopic(),
3142
packet.getCorrelationData(),
3243
packet.getUserProperties()
33-
);
44+
));
45+
}
3446

35-
session.registerPendingPublish(publish);
47+
@Override
48+
public boolean handleResponse(@NotNull MqttClient client, @NotNull HasPacketId response) {
49+
return true;
50+
}
3651

37-
client.send(publish);
52+
@Override
53+
public void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId) {
54+
var packetOutFactory = client.getPacketOutFactory();
55+
client.send(packetOutFactory.newPublish(
56+
client,
57+
packetId,
58+
QoS.AT_LEAST_ONCE_DELIVERY,
59+
packet.isRetained(),
60+
true,
61+
packet.getTopicName(),
62+
MqttPropertyConstants.TOPIC_ALIAS_NOT_SET,
63+
packet.getPayload(),
64+
packet.isPayloadFormatIndicator(),
65+
packet.getResponseTopic(),
66+
packet.getCorrelationData(),
67+
packet.getUserProperties()
68+
));
3869
}
3970
}

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public interface MqttPropertyConstants {
2626
boolean PAYLOAD_FORMAT_INDICATOR_DEFAULT = false;
2727

2828
long MESSAGE_EXPIRY_INTERVAL_DEFAULT = 0;
29+
long MESSAGE_EXPIRY_INTERVAL_UNDEFINED = -1;
2930

3031
int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1;
3132
int TOPIC_ALIAS_MAXIMUM_DISABLED = 0;

src/main/java/com/ss/mqtt/broker/model/MqttSession.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
import com.ss.mqtt.broker.network.client.MqttClient;
44
import com.ss.mqtt.broker.network.packet.HasPacketId;
5-
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
6-
import com.ss.mqtt.broker.network.packet.out.PublishOutPacket;
5+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
76
import org.jetbrains.annotations.NotNull;
87

98
public interface MqttSession {
@@ -15,14 +14,14 @@ interface UnsafeMqttSession extends MqttSession {
1514
void clear();
1615
}
1716

18-
interface PendingCallback<T extends HasPacketId> {
19-
20-
@NotNull PendingCallback<?> EMPTY = (client, feedback) -> true;
17+
interface PendingPacketHandler {
2118

2219
/**
23-
* @return true of pending packet can be removed.
20+
* @return true if pending packet can be removed.
2421
*/
25-
boolean handle(@NotNull MqttClient client, @NotNull T feedback);
22+
boolean handleResponse(@NotNull MqttClient client, @NotNull HasPacketId response);
23+
24+
void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId);
2625
}
2726

2827
@NotNull String getClientId();
@@ -34,15 +33,9 @@ interface PendingCallback<T extends HasPacketId> {
3433
*/
3534
long getExpirationTime();
3635

37-
void registerPendingPublish(@NotNull PublishOutPacket publish);
38-
39-
<T extends MqttReadablePacket & HasPacketId> void registerPendingPublish(
40-
@NotNull PublishOutPacket publish,
41-
@NotNull MqttSession.PendingCallback<T> callback
42-
);
36+
void removeExpiredPackets();
37+
void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInterval);
4338

44-
<T extends MqttReadablePacket & HasPacketId> void unregisterPendingPacket(
45-
@NotNull MqttClient client,
46-
@NotNull T feedback
47-
);
39+
void registerPendingPublish(@NotNull PublishInPacket publish, @NotNull PendingPacketHandler handler, int packetId);
40+
void unregisterPendingPacket(@NotNull MqttClient client, @NotNull HasPacketId response);
4841
}

0 commit comments

Comments
 (0)