Skip to content

Commit 0ffaf7b

Browse files
committed
[broker-15] working on test coverage
1 parent d36a5bc commit 0ffaf7b

22 files changed

+191
-44
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.ss.mqtt.broker.config;
22

33
import com.ss.mqtt.broker.model.QoS;
4+
import lombok.Data;
45
import lombok.Getter;
6+
import lombok.NoArgsConstructor;
57
import lombok.RequiredArgsConstructor;
68
import org.jetbrains.annotations.NotNull;
79

@@ -24,4 +26,7 @@ public class MqttConnectionConfig {
2426
private final boolean wildcardSubscriptionAvailable;
2527
private final boolean subscriptionIdAvailable;
2628
private final boolean sharedSubscriptionAvailable;
29+
30+
31+
@NoArgsConstructor @Data class MyClass { final String field = "";}
2732
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ public class PublishAckInPacketHandler extends AbstractPacketHandler<UnsafeMqttC
1010

1111
@Override
1212
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishAckInPacket packet) {
13-
client.getSession().unregisterPendingPacket(client, packet);
13+
client.getSession().updatePendingPacket(client, packet);
1414
}
1515
}

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ public interface MqttPropertyConstants {
2525

2626
boolean PAYLOAD_FORMAT_INDICATOR_DEFAULT = false;
2727

28-
long MESSAGE_EXPIRY_INTERVAL_DEFAULT = 0;
2928
long MESSAGE_EXPIRY_INTERVAL_UNDEFINED = -1;
29+
long MESSAGE_EXPIRY_INTERVAL_INFINITY = 0;
3030

3131
int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1;
3232
int TOPIC_ALIAS_MAXIMUM_DISABLED = 0;

src/main/java/com/ss/mqtt/broker/model/MqttSession.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ interface PendingPacketHandler {
3636
void removeExpiredPackets();
3737
void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInterval);
3838

39+
boolean hasPendingPackets();
40+
3941
void registerPendingPublish(@NotNull PublishInPacket publish, @NotNull PendingPacketHandler handler, int packetId);
40-
void unregisterPendingPacket(@NotNull MqttClient client, @NotNull HasPacketId response);
42+
void updatePendingPacket(@NotNull MqttClient client, @NotNull HasPacketId response);
4143
}

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public void registerPendingPublish(
7474
pendingPublishes.runInWriteLock(pendingPublish, Array::add);
7575
}
7676

77+
@Override
78+
public boolean hasPendingPackets() {
79+
return !pendingPublishes.isEmpty();
80+
}
81+
7782
@Override
7883
public void removeExpiredPackets() {
7984

@@ -93,13 +98,15 @@ public void removeExpiredPackets() {
9398
var publish = pendingPublish.publish;
9499
var messageExpiryInterval = publish.getMessageExpiryInterval();
95100

96-
if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED) {
101+
if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ||
102+
messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) {
97103
continue;
98104
}
99105

100106
var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000);
101107

102108
if (expiredTime < currentTime) {
109+
log.debug("Remove pending publish {} by expiration reason", publish);
103110
publishes.fastRemove(i);
104111
i--;
105112
length--;
@@ -113,19 +120,26 @@ public void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInter
113120
var currentTime = System.currentTimeMillis();
114121
var stamp = pendingPublishes.readLock();
115122
try {
123+
116124
for (var pendingPublish : pendingPublishes) {
117-
if (currentTime - pendingPublish.lastAttemptTime > retryInterval) {
118-
pendingPublish.lastAttemptTime = currentTime;
119-
pendingPublish.handler.retryAsync(client, pendingPublish.publish, pendingPublish.packetId);
125+
126+
if (currentTime - pendingPublish.lastAttemptTime <= retryInterval) {
127+
continue;
120128
}
129+
130+
log.debug("Re-try to send publish {}", pendingPublish.publish);
131+
132+
pendingPublish.lastAttemptTime = currentTime;
133+
pendingPublish.handler.retryAsync(client, pendingPublish.publish, pendingPublish.packetId);
121134
}
135+
122136
} finally {
123137
pendingPublishes.readUnlock(stamp);
124138
}
125139
}
126140

127141
@Override
128-
public void unregisterPendingPacket(
142+
public void updatePendingPacket(
129143
@NotNull MqttClient client,
130144
@NotNull HasPacketId response
131145
) {

src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public class PublishInPacket extends MqttReadablePacket {
265265

266266
private @NotNull byte[] correlationData;
267267

268-
private long messageExpiryInterval = MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT;
268+
private long messageExpiryInterval = MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED;
269269
private int topicAlias = MqttPropertyConstants.TOPIC_ALIAS_DEFAULT;
270270
private boolean payloadFormatIndicator = MqttPropertyConstants.PAYLOAD_FORMAT_INDICATOR_DEFAULT;
271271

@@ -315,8 +315,8 @@ protected void applyProperty(@NotNull PacketProperty property, long value) {
315315
topicAlias = NumberUtils.validate(
316316
(int) value,
317317
MqttPropertyConstants.TOPIC_ALIAS_MIN,
318-
MqttPropertyConstants.TOPIC_ALIAS_MAX)
319-
;
318+
MqttPropertyConstants.TOPIC_ALIAS_MAX
319+
);
320320
break;
321321
case MESSAGE_EXPIRY_INTERVAL:
322322
messageExpiryInterval = value;

src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ protected void writeProperties(@NotNull ByteBuffer buffer) {
176176
writeProperty(buffer,
177177
PacketProperty.MESSAGE_EXPIRY_INTERVAL,
178178
0,
179-
MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT
179+
MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED
180180
);
181181
writeProperty(buffer, PacketProperty.TOPIC_ALIAS, topicAlias, MqttPropertyConstants.TOPIC_ALIAS_DEFAULT);
182182
writeNotEmptyProperty(buffer, PacketProperty.RESPONSE_TOPIC, responseTopic);

src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,6 @@ public interface PublishRetryService {
88
void register(@NotNull MqttClient client);
99

1010
void unregister(@NotNull MqttClient client);
11+
12+
boolean exist(@NotNull String clientId);
1113
}

src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
import com.ss.rlib.common.util.array.Array;
77
import com.ss.rlib.common.util.array.ArrayFactory;
88
import com.ss.rlib.common.util.array.ConcurrentArray;
9+
import lombok.extern.log4j.Log4j2;
910
import org.jetbrains.annotations.NotNull;
1011

1112
import java.io.Closeable;
1213

14+
@Log4j2
1315
public class DefaultPublishRetryService implements PublishRetryService, Closeable {
1416

1517
private final @NotNull ConcurrentArray<MqttClient> registeredClients;
@@ -39,6 +41,11 @@ public void unregister(@NotNull MqttClient client) {
3941
registeredClients.runInWriteLock(client, Array::fastRemove);
4042
}
4143

44+
@Override
45+
public boolean exist(@NotNull String clientId) {
46+
return registeredClients.anyMatchInReadLock(clientId, (id, client) -> id.equals(client.getClientId()));
47+
}
48+
4249
private void checkPendingPackets() {
4350

4451
var toCheck = ArrayFactory.newArray(MqttClient.class);
@@ -53,6 +60,8 @@ private void checkPendingPackets() {
5360

5461
registeredClients.runInReadLock(toCheck, Array::copyTo);
5562

63+
log.debug("Check pending packets in {} client(s)", toCheck.size());
64+
5665
for (var client : toCheck) {
5766

5867
var session = client.getSession();
@@ -68,7 +77,7 @@ private void checkPendingPackets() {
6877
}
6978

7079
if (toUnregister.isEmpty()) {
71-
return;
80+
continue;
7281
}
7382

7483
// unregister closed clients
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.ss.mqtt.broker.test
2+
3+
import spock.lang.Specification
4+
5+
class UnitSpecification extends Specification {
6+
}

0 commit comments

Comments
 (0)