Skip to content

Commit 9f86ec7

Browse files
committed
[broker-25] start working on re-implement re-try publish strategy
1 parent c368552 commit 9f86ec7

File tree

15 files changed

+202
-210
lines changed

15 files changed

+202
-210
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,14 @@ private interface ChannelFactory extends
9494
@NotNull ClientIdRegistry clientIdRegistry,
9595
@NotNull SubscriptionService subscriptionService,
9696
@NotNull PublishingService publishingService,
97-
@NotNull MqttSessionService mqttSessionService,
98-
@NotNull PublishRetryService publishRetryService
97+
@NotNull MqttSessionService mqttSessionService
9998
) {
10099

101100
var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
102101
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
103102
clientIdRegistry,
104103
authenticationService,
105-
mqttSessionService,
106-
publishRetryService
104+
mqttSessionService
107105
);
108106
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
109107
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
@@ -120,10 +118,9 @@ private interface ChannelFactory extends
120118
@Bean
121119
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
122120
@NotNull ClientIdRegistry clientIdRegistry,
123-
@NotNull MqttSessionService mqttSessionService,
124-
@NotNull PublishRetryService publishRetryService
121+
@NotNull MqttSessionService mqttSessionService
125122
) {
126-
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService);
123+
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService);
127124
}
128125

129126
@Bean
@@ -145,14 +142,6 @@ private interface ChannelFactory extends
145142
);
146143
}
147144

148-
@Bean
149-
@NotNull PublishRetryService publishRetryService() {
150-
return new DefaultPublishRetryService(
151-
env.getProperty("publish.pending.check.interval", int.class, 60 * 1000),
152-
env.getProperty("publish.retry.interval", int.class, 60 * 1000)
153-
);
154-
}
155-
156145
@Bean
157146
@NotNull InetSocketAddress deviceNetworkAddress(
158147
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
55
import com.ss.mqtt.broker.service.ClientIdRegistry;
66
import com.ss.mqtt.broker.service.MqttSessionService;
7-
import com.ss.mqtt.broker.service.PublishRetryService;
87
import com.ss.rlib.common.util.StringUtils;
98
import lombok.RequiredArgsConstructor;
109
import lombok.extern.log4j.Log4j2;
@@ -18,7 +17,6 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
1817

1918
private final @NotNull ClientIdRegistry clientIdRegistry;
2019
private final @NotNull MqttSessionService sessionService;
21-
private final @NotNull PublishRetryService publishRetryService;
2220

2321
@Override
2422
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
@@ -27,7 +25,6 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
2725
}
2826

2927
protected @NotNull Mono<?> releaseImpl(@NotNull T client) {
30-
publishRetryService.unregister(client);
3128

3229
var clientId = client.getClientId();
3330
client.setClientId(StringUtils.EMPTY);

src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@
33
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
44
import com.ss.mqtt.broker.service.ClientIdRegistry;
55
import com.ss.mqtt.broker.service.MqttSessionService;
6-
import com.ss.mqtt.broker.service.PublishRetryService;
76
import org.jetbrains.annotations.NotNull;
87

98
public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {
109

1110
public DefaultMqttClientReleaseHandler(
1211
@NotNull ClientIdRegistry clientIdRegistry,
13-
@NotNull MqttSessionService sessionService,
14-
@NotNull PublishRetryService publishRetryService
12+
@NotNull MqttSessionService sessionService
1513
) {
16-
super(clientIdRegistry, sessionService, publishRetryService);
14+
super(clientIdRegistry, sessionService);
1715
}
1816
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,31 @@
11
package com.ss.mqtt.broker.handler.packet.in;
22

3+
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
34
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD;
45
import static com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID;
5-
import static com.ss.mqtt.broker.model.MqttPropertyConstants.*;
66
import static com.ss.mqtt.broker.util.ReactorUtils.ifTrue;
77
import com.ss.mqtt.broker.exception.ConnectionRejectException;
88
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
9-
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
109
import com.ss.mqtt.broker.model.MqttSession;
10+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1111
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1212
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
1313
import com.ss.mqtt.broker.service.AuthenticationService;
1414
import com.ss.mqtt.broker.service.ClientIdRegistry;
1515
import com.ss.mqtt.broker.service.MqttSessionService;
16-
import com.ss.mqtt.broker.service.PublishRetryService;
1716
import com.ss.rlib.common.util.StringUtils;
1817
import lombok.RequiredArgsConstructor;
18+
import lombok.extern.log4j.Log4j2;
1919
import org.jetbrains.annotations.NotNull;
2020
import reactor.core.publisher.Mono;
2121

22+
@Log4j2
2223
@RequiredArgsConstructor
2324
public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, ConnectInPacket> {
2425

2526
private final @NotNull ClientIdRegistry clientIdRegistry;
2627
private final @NotNull AuthenticationService authenticationService;
2728
private final @NotNull MqttSessionService mqttSessionService;
28-
private final @NotNull PublishRetryService publishRetryService;
2929

3030
@Override
3131
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
@@ -127,19 +127,27 @@ private Mono<Boolean> onConnected(
127127
packet.isRequestProblemInformation()
128128
);
129129

130-
client.send(client.getPacketOutFactory().newConnectAck(
130+
var response = client.getPacketOutFactory().newConnectAck(
131131
client,
132132
ConnectAckReasonCode.SUCCESS,
133133
sessionRestored,
134134
packet.getClientId(),
135135
packet.getSessionExpiryInterval(),
136136
packet.getKeepAlive(),
137137
packet.getReceiveMax()
138-
));
138+
);
139+
140+
return Mono.fromFuture(client.sendWithFeedback(response)
141+
.thenApply(result -> {
139142

140-
publishRetryService.register(client);
143+
if (!result) {
144+
log.warn("Was issue with sending conn ack packet to client {}", client.getClientId());
145+
return false;
146+
}
141147

142-
return Mono.just(Boolean.TRUE);
148+
session.resendPendingPackets(client);
149+
return true;
150+
}));
143151
}
144152

145153
private boolean checkPacketException(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {

src/main/java/com/ss/mqtt/broker/handler/publish/out/PersistentPublishOutHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ protected void handleImpl(
2727
}
2828

2929
@Override
30-
public void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId) {
30+
public void resend(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId) {
3131
sendPublish(client, packet, packetId, true);
3232
}
3333
}

src/main/java/com/ss/mqtt/broker/model/MqttSession.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ interface UnsafeMqttSession extends MqttSession {
1212
void setExpirationTime(long expirationTime);
1313

1414
void clear();
15+
16+
void onPersisted();
17+
18+
void onRestored();
1519
}
1620

1721
interface PendingPacketHandler {
@@ -21,7 +25,7 @@ interface PendingPacketHandler {
2125
*/
2226
boolean handleResponse(@NotNull MqttClient client, @NotNull HasPacketId response);
2327

24-
default void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId) {}
28+
default void resend(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId) {}
2529
}
2630

2731
@NotNull String getClientId();
@@ -33,11 +37,11 @@ default void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket pac
3337
*/
3438
long getExpirationTime();
3539

36-
void removeExpiredPackets();
37-
void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInterval);
40+
void resendPendingPackets(@NotNull MqttClient client);
3841

3942
boolean hasOutPending();
4043
boolean hasInPending();
44+
4145
boolean hasInPending(int packetId);
4246
boolean hasOutPending(int packetId);
4347

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 15 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -23,41 +23,9 @@ public class DefaultMqttSession implements UnsafeMqttSession {
2323
@Getter
2424
@AllArgsConstructor
2525
private static class PendingPublish {
26-
2726
private final @NotNull PublishInPacket publish;
2827
private final @NotNull PendingPacketHandler handler;
29-
private final long registeredTime;
3028
private final int packetId;
31-
32-
private volatile long lastAttemptTime;
33-
}
34-
35-
private static void removeExpiredPackets(@NotNull Array<PendingPublish> publishes) {
36-
37-
var currentTime = System.currentTimeMillis();
38-
var array = publishes.array();
39-
40-
for (int i = 0, length = publishes.size(); i < length; i++) {
41-
42-
var pendingPublish = array[i];
43-
44-
var publish = pendingPublish.publish;
45-
var messageExpiryInterval = publish.getMessageExpiryInterval();
46-
47-
if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ||
48-
messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) {
49-
continue;
50-
}
51-
52-
var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000);
53-
54-
if (expiredTime < currentTime) {
55-
log.debug("Remove pending publish {} by expiration reason", publish);
56-
publishes.fastRemove(i);
57-
i--;
58-
length--;
59-
}
60-
}
6129
}
6230

6331
private static void registerPublish(
@@ -66,11 +34,7 @@ private static void registerPublish(
6634
int packetId,
6735
@NotNull ConcurrentArray<PendingPublish> pendingPublishes
6836
) {
69-
70-
var currentTime = System.currentTimeMillis();
71-
var pendingPublish = new PendingPublish(publish, handler, currentTime, packetId, currentTime);
72-
73-
pendingPublishes.runInWriteLock(pendingPublish, Array::add);
37+
pendingPublishes.runInWriteLock(new PendingPublish(publish, handler, packetId), Array::add);
7438
}
7539

7640
private void updatePendingPacket(
@@ -180,33 +144,11 @@ public boolean hasInPending(int packetId) {
180144
}
181145

182146
@Override
183-
public void removeExpiredPackets() {
184-
if (!pendingOutPublishes.isEmpty()) {
185-
pendingOutPublishes.runInWriteLock(DefaultMqttSession::removeExpiredPackets);
186-
}
187-
}
188-
189-
@Override
190-
public void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInterval) {
191-
var currentTime = System.currentTimeMillis();
192-
var stamp = pendingOutPublishes.readLock();
193-
try {
194-
195-
for (var pendingPublish : pendingOutPublishes) {
196-
197-
if (currentTime - pendingPublish.lastAttemptTime <= retryInterval) {
198-
continue;
199-
}
200-
201-
log.debug("Re-try to send publish {}", pendingPublish.publish);
202-
203-
pendingPublish.lastAttemptTime = currentTime;
204-
pendingPublish.handler.retryAsync(client, pendingPublish.publish, pendingPublish.packetId);
205-
}
206-
207-
} finally {
208-
pendingOutPublishes.readUnlock(stamp);
209-
}
147+
public void resendPendingPackets(@NotNull MqttClient mqttClient) {
148+
pendingOutPublishes.forEachInReadLock(mqttClient, (client, pending) -> {
149+
log.debug("Re-try to send publish {}", pending.publish);
150+
pending.handler.resend(client, pending.publish, pending.packetId);
151+
});
210152
}
211153

212154
@Override
@@ -221,6 +163,15 @@ public void updateInPendingPacket(@NotNull MqttClient client, @NotNull HasPacket
221163

222164
@Override
223165
public void clear() {
166+
pendingInPublishes.runInWriteLock(Collection::clear);
224167
pendingOutPublishes.runInWriteLock(Collection::clear);
225168
}
169+
170+
@Override
171+
public void onPersisted() {
172+
pendingInPublishes.runInWriteLock(Collection::clear);
173+
}
174+
175+
@Override
176+
public void onRestored() { }
226177
}

src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.jetbrains.annotations.Nullable;
2020
import reactor.core.publisher.Mono;
2121

22+
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
@Getter
@@ -92,6 +93,11 @@ public void send(@NotNull MqttWritablePacket packet) {
9293
connection.send(packet);
9394
}
9495

96+
@Override
97+
public @NotNull CompletableFuture<Boolean> sendWithFeedback(@NotNull MqttWritablePacket packet) {
98+
return connection.sendWithFeedback(packet);
99+
}
100+
95101
public void reject(@NotNull ConnectAckReasonCode reasonCode) {
96102
connection
97103
.sendWithFeedback(getPacketOutFactory().newConnectAck(this, reasonCode))

src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.jetbrains.annotations.Nullable;
1212
import reactor.core.publisher.Mono;
1313

14+
import java.util.concurrent.CompletableFuture;
15+
1416
public interface MqttClient {
1517

1618
interface UnsafeMqttClient extends MqttClient {
@@ -52,4 +54,5 @@ void configure(
5254
long getSessionExpiryInterval();
5355

5456
void send(@NotNull MqttWritablePacket packet);
57+
@NotNull CompletableFuture<Boolean> sendWithFeedback(@NotNull MqttWritablePacket packet);
5558
}

src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)