Skip to content

Commit 8116d5c

Browse files
committed
[broker-15] base implementation of publishing with QoS 1
1 parent c15acb3 commit 8116d5c

File tree

11 files changed

+107
-73
lines changed

11 files changed

+107
-73
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ allprojects {
3232

3333
ext {
3434
annotationVersion = "17.0.0"
35-
rlibVersion = "9.5.0"
35+
rlibVersion = "9.6.0"
3636
lombokVersion = '1.18.4'
3737
springbootVersion = '2.2.0.RELEASE'
3838
springVersion = '5.1.6.RELEASE'

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,6 @@ private interface ChannelFactory extends
8888
);
8989
}
9090

91-
@Bean
92-
@NotNull PacketIdGenerator packetIdGenerator() {
93-
return new DefaultPacketIdGenerator();
94-
}
95-
9691
@Bean
9792
PacketInHandler @NotNull [] devicePacketHandlers(
9893
@NotNull AuthenticationService authenticationService,
@@ -164,10 +159,10 @@ private interface ChannelFactory extends
164159
}
165160

166161
@Bean
167-
@NotNull PublishOutHandler[] publishOutHandlers(@NotNull PacketIdGenerator packetIdGenerator) {
162+
@NotNull PublishOutHandler[] publishOutHandlers() {
168163
return new PublishOutHandler[] {
169164
new Qos0PublishOutHandler(),
170-
new Qos1PublishOutHandler(packetIdGenerator),
165+
new Qos1PublishOutHandler(),
171166
new Qos2PublishOutHandler(),
172167
};
173168
}

src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,22 @@
44
import com.ss.mqtt.broker.model.QoS;
55
import com.ss.mqtt.broker.model.Subscriber;
66
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
7-
import com.ss.mqtt.broker.service.PacketIdGenerator;
87
import lombok.RequiredArgsConstructor;
98
import org.jetbrains.annotations.NotNull;
109

1110
@RequiredArgsConstructor
1211
public class Qos1PublishOutHandler extends AbstractPublishOutHandler {
1312

14-
private final @NotNull PacketIdGenerator packetIdGenerator;
15-
1613
@Override
1714
public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) {
1815

1916
var client = subscriber.getMqttClient();
17+
var session = client.getSession();
2018
var packetOutFactory = client.getPacketOutFactory();
19+
2120
var publish = packetOutFactory.newPublish(
2221
client,
23-
packetIdGenerator.nextPacketId(),
22+
session.nextPacketId(),
2423
QoS.AT_LEAST_ONCE_DELIVERY,
2524
packet.isRetained(),
2625
packet.isDuplicate(),
@@ -33,7 +32,6 @@ public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscrib
3332
packet.getUserProperties()
3433
);
3534

36-
var session = client.getSession();
3735
session.registerPendingPublish(publish);
3836

3937
client.send(publish);

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ public interface MqttPropertyConstants {
55
QoS MAXIMUM_QOS_DEFAULT = QoS.EXACTLY_ONCE_DELIVERY;
66

77
int MAXIMUM_PROTOCOL_PACKET_SIZE = 256 * 1024 * 1024;
8+
int MAXIMUM_PACKET_ID = 0xFFFF;
89

910
long SESSION_EXPIRY_INTERVAL_DISABLED = 0;
1011
long SESSION_EXPIRY_INTERVAL_DEFAULT = 120;

src/main/java/com/ss/mqtt/broker/model/MqttSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ interface PendingCallback<T extends HasPacketId> {
2727

2828
@NotNull String getClientId();
2929

30+
int nextPacketId();
31+
3032
/**
3133
* @return the expiration time in ms or -1 if it should not be expired now.
3234
*/

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.ss.mqtt.broker.model.impl;
22

3+
import com.ss.mqtt.broker.model.MqttPropertyConstants;
34
import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession;
45
import com.ss.mqtt.broker.network.client.MqttClient;
56
import com.ss.mqtt.broker.network.packet.HasPacketId;
@@ -16,6 +17,7 @@
1617
import org.jetbrains.annotations.NotNull;
1718

1819
import java.util.Collection;
20+
import java.util.concurrent.atomic.AtomicInteger;
1921

2022
@Log4j2
2123
@ToString(of = "clientId")
@@ -30,12 +32,27 @@ private static class PendingPublish<T extends HasPacketId> {
3032

3133
private final @NotNull String clientId;
3234
private final @NotNull ConcurrentArray<PendingPublish<?>> pendingPublishes;
35+
private final @NotNull AtomicInteger packetIdGenerator;
3336

3437
private volatile @Getter @Setter long expirationTime = -1;
3538

3639
public DefaultMqttSession(@NotNull String clientId) {
3740
this.clientId = clientId;
3841
this.pendingPublishes = ConcurrentArray.ofType(PendingPublish.class);
42+
this.packetIdGenerator = new AtomicInteger(0);
43+
}
44+
45+
@Override
46+
public int nextPacketId() {
47+
48+
var nextId = packetIdGenerator.incrementAndGet();
49+
50+
if (nextId >= MqttPropertyConstants.MAXIMUM_PACKET_ID) {
51+
packetIdGenerator.compareAndSet(nextId, 0);
52+
return nextPacketId();
53+
}
54+
55+
return nextId;
3956
}
4057

4158
@Override
@@ -54,13 +71,10 @@ public <T extends MqttReadablePacket & HasPacketId> void registerPendingPublish(
5471
@NotNull PublishOutPacket publish,
5572
@NotNull PendingCallback<T> callback
5673
) {
57-
// FIXME add new method to array
58-
var stamp = pendingPublishes.writeLock();
59-
try {
60-
pendingPublishes.add(new PendingPublish<>(publish, callback, System.currentTimeMillis()));
61-
} finally {
62-
pendingPublishes.writeUnlock(stamp);
63-
}
74+
pendingPublishes.runInWriteLock(
75+
new PendingPublish<>(publish, callback, System.currentTimeMillis()),
76+
Array::add
77+
);
6478
}
6579

6680
@Override
@@ -70,11 +84,10 @@ public <T extends MqttReadablePacket & HasPacketId> void unregisterPendingPacket
7084
) {
7185

7286
var packetId = feedback.getPacketId();
73-
74-
// FIXME add new method to array
75-
var pendingPublish = pendingPublishes.findAnyInReadLock(
87+
var pendingPublish = pendingPublishes.findAnyConvertedToIntInReadLock(
7688
packetId,
77-
(id, pending) -> id == pending.publish.getPacketId()
89+
pending -> pending.publish.getPacketId(),
90+
(id, targetId) -> id == targetId
7891
);
7992

8093
if (pendingPublish == null) {

src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,16 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B
6868
packetId = readUnsignedShort(buffer);
6969

7070
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901123
71-
if (connection.isSupported(MqttVersion.MQTT_5)) {
71+
if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) {
7272
reasonCode = PublishAckReasonCode.of(readUnsignedByte(buffer));
7373
}
7474
}
7575

76+
@Override
77+
protected boolean isPropertiesSupported(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) {
78+
return connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining();
79+
}
80+
7681
@Override
7782
protected @NotNull Set<PacketProperty> getAvailableProperties() {
7883
return AVAILABLE_PROPERTIES;

src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.service.impl;
22

33
import com.ss.mqtt.broker.model.MqttSession;
4+
import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession;
45
import com.ss.mqtt.broker.model.impl.DefaultMqttSession;
56
import com.ss.mqtt.broker.service.MqttSessionService;
67
import com.ss.rlib.common.concurrent.util.ThreadUtils;
@@ -18,7 +19,7 @@
1819
@Log4j2
1920
public class InMemoryMqttSessionService implements MqttSessionService, Closeable {
2021

21-
private final @NotNull ConcurrentObjectDictionary<String, MqttSession> storedSession;
22+
private final @NotNull ConcurrentObjectDictionary<String, UnsafeMqttSession> storedSession;
2223
private final @NotNull Thread cleanThread;
2324

2425
private final int cleanInterval;
@@ -65,10 +66,10 @@ public InMemoryMqttSessionService(int cleanInterval) {
6566
@Override
6667
public @NotNull Mono<Boolean> store(@NotNull String clientId, @NotNull MqttSession session, long expiryInterval) {
6768

68-
var unsafe = (MqttSession.UnsafeMqttSession) session;
69+
var unsafe = (UnsafeMqttSession) session;
6970
unsafe.setExpirationTime(System.currentTimeMillis() + (expiryInterval * 1000));
7071

71-
storedSession.runInWriteLock(clientId, session, ObjectDictionary::put);
72+
storedSession.runInWriteLock(clientId, unsafe, ObjectDictionary::put);
7273

7374
log.debug("Stored session for client {}", clientId);
7475

@@ -77,8 +78,8 @@ public InMemoryMqttSessionService(int cleanInterval) {
7778

7879
private void cleanup() {
7980

80-
var toCheck = ArrayFactory.newArray(MqttSession.class);
81-
var toRemove = ArrayFactory.newArray(MqttSession.class);
81+
var toCheck = ArrayFactory.newArray(UnsafeMqttSession.class);
82+
var toRemove = ArrayFactory.newArray(UnsafeMqttSession.class);
8283

8384
while (!closed) {
8485

@@ -110,13 +111,18 @@ private void cleanup() {
110111
// if we already have new session under the same client id
111112
if (removed != null && removed != session) {
112113
dictionary.put(session.getClientId(), removed);
114+
} else if (removed != null) {
115+
removed.clear();
113116
}
114117
}
115118
});
116119
}
117120
}
118121

119-
private boolean findToRemove(@NotNull Array<MqttSession> toCheck, @NotNull Array<MqttSession> toRemove) {
122+
private boolean findToRemove(
123+
@NotNull Array<UnsafeMqttSession> toCheck,
124+
@NotNull Array<UnsafeMqttSession> toRemove
125+
) {
120126

121127
var currentTime = System.currentTimeMillis();
122128

0 commit comments

Comments
 (0)