Skip to content

Commit 75e206e

Browse files
committed
[broker-11] finish fixing issues with mqtt sessions
1 parent bc4b9bb commit 75e206e

File tree

10 files changed

+56
-17
lines changed

10 files changed

+56
-17
lines changed

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public interface MqttPropertyConstants {
4141

4242
int SUBSCRIPTION_ID_NOT_DEFINED = 0;
4343

44-
boolean SESSIONS_ENABLED_DEFAULT = false;
44+
boolean SESSIONS_ENABLED_DEFAULT = true;
4545
boolean KEEP_ALIVE_ENABLED_DEFAULT = false;
4646
boolean RETAIN_AVAILABLE_DEFAULT = false;
4747
boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = false;

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import lombok.Getter;
55
import lombok.RequiredArgsConstructor;
66
import lombok.Setter;
7+
import lombok.ToString;
78
import org.jetbrains.annotations.NotNull;
89

10+
@ToString
911
@RequiredArgsConstructor
1012
public class DefaultMqttSession implements UnsafeMqttSession {
1113

src/main/java/com/ss/mqtt/broker/network/client/impl/AbstractMqttClientReleaseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
3838

3939
Mono<?> asyncActions = null;
4040

41-
if (session != null) {
42-
asyncActions = sessionService.store(clientId, session);
41+
if (session != null && client.getConnectionConfig().isSessionsEnabled()) {
42+
asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval());
4343
client.setSession(null);
4444
}
4545

src/main/java/com/ss/mqtt/broker/network/packet/in/DisconnectInPacket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B
9393
}
9494
}
9595

96+
@Override
97+
protected boolean isPropertiesSupported(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) {
98+
return connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining();
99+
}
100+
96101
@Override
97102
protected @NotNull Set<PacketProperty> getAvailableProperties() {
98103
return AVAILABLE_PROPERTIES;

src/main/java/com/ss/mqtt/broker/network/packet/in/handler/ConnectInPacketHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,14 @@ private Mono<Boolean> onConnected(
8282
) {
8383

8484
var connection = client.getConnection();
85+
var config = connection.getConfig();
8586

8687
// if it was closed in parallel
87-
if (connection.isClosed()) {
88+
if (connection.isClosed() && config.isSessionsEnabled()) {
8889
// store the session again
89-
return mqttSessionService.store(client.getClientId(), session);
90+
return mqttSessionService.store(client.getClientId(), session, config.getDefaultSessionExpiryInterval());
9091
}
9192

92-
var config = connection.getConfig();
93-
9493
// select result keep alive time
9594
var minimalKeepAliveTime = Math.max(config.getMinKeepAliveTime(), packet.getKeepAlive());
9695
var keepAlive = config.isKeepAliveEnabled() ? minimalKeepAliveTime : SERVER_KEEP_ALIVE_DISABLED;
@@ -115,6 +114,7 @@ private Mono<Boolean> onConnected(
115114
var topicAliasMaximum = packet.getTopicAliasMaximum() == TOPIC_ALIAS_MAXIMUM_UNDEFINED ?
116115
TOPIC_ALIAS_MAXIMUM_DISABLED : Math.min(packet.getTopicAliasMaximum(), config.getTopicAliasMaximum());
117116

117+
client.setSession(session);
118118
client.configure(
119119
sessionExpiryInterval,
120120
receiveMax,

src/main/java/com/ss/mqtt/broker/service/MqttSessionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ public interface MqttSessionService {
1010

1111
@NotNull Mono<MqttSession> create(@NotNull String clientId);
1212

13-
@NotNull Mono<Boolean> store(@NotNull String clientId, @NotNull MqttSession session);
13+
@NotNull Mono<Boolean> store(@NotNull String clientId, @NotNull MqttSession session, long expiryInterval);
1414
}

src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
import com.ss.rlib.common.util.dictionary.ConcurrentObjectDictionary;
1010
import com.ss.rlib.common.util.dictionary.DictionaryFactory;
1111
import com.ss.rlib.common.util.dictionary.ObjectDictionary;
12+
import lombok.extern.log4j.Log4j2;
1213
import org.jetbrains.annotations.NotNull;
1314
import reactor.core.publisher.Mono;
1415

1516
import java.io.Closeable;
1617

18+
@Log4j2
1719
public class InMemoryMqttSessionService implements MqttSessionService, Closeable {
1820

1921
private final @NotNull ConcurrentObjectDictionary<String, MqttSession> storedSession;
@@ -34,18 +36,42 @@ public InMemoryMqttSessionService(int cleanInterval) {
3436

3537
@Override
3638
public @NotNull Mono<MqttSession> restore(@NotNull String clientId) {
39+
3740
var session = storedSession.getInWriteLock(clientId, ObjectDictionary::remove);
41+
42+
if (session != null) {
43+
log.debug("Restored session for client {}", clientId);
44+
} else {
45+
log.debug("No stored session for client {}", clientId);
46+
}
47+
3848
return Mono.justOrEmpty(session);
3949
}
4050

4151
@Override
4252
public @NotNull Mono<MqttSession> create(@NotNull String clientId) {
53+
54+
var session = storedSession.getInWriteLock(clientId, ObjectDictionary::remove);
55+
56+
if (session != null) {
57+
log.debug("Removed old session for client {}", clientId);
58+
}
59+
60+
log.debug("Created new session for client {}", clientId);
61+
4362
return Mono.just(new DefaultMqttSession(clientId));
4463
}
4564

4665
@Override
47-
public @NotNull Mono<Boolean> store(@NotNull String clientId, @NotNull MqttSession session) {
66+
public @NotNull Mono<Boolean> store(@NotNull String clientId, @NotNull MqttSession session, long expiryInterval) {
67+
68+
var unsafe = (MqttSession.UnsafeMqttSession) session;
69+
unsafe.setExpirationTime(System.currentTimeMillis() + (expiryInterval * 1000));
70+
4871
storedSession.runInWriteLock(clientId, session, ObjectDictionary::put);
72+
73+
log.debug("Stored session for client {}", clientId);
74+
4975
return Mono.just(Boolean.TRUE);
5076
}
5177

@@ -79,6 +105,8 @@ private void cleanup() {
79105

80106
var removed = dictionary.remove(session.getClientId());
81107

108+
log.debug("Removed expired session for client {}", session.getClientId());
109+
82110
// if we already have new session under the same client id
83111
if (removed != null && removed != session) {
84112
dictionary.put(session.getClientId(), removed);

src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class ConnectionTest extends MqttBrokerTest {
2121
then:
2222
result.reasonCode == Mqtt5ConnAckReasonCode.SUCCESS
2323
result.sessionExpiryInterval.present
24-
result.sessionExpiryInterval.getAsLong() == MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DISABLED
24+
result.sessionExpiryInterval.getAsLong() == MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT
2525
result.serverKeepAlive.present
2626
result.serverKeepAlive.getAsInt() == MqttPropertyConstants.SERVER_KEEP_ALIVE_DISABLED
2727
!result.serverReference.present
@@ -40,7 +40,7 @@ class ConnectionTest extends MqttBrokerTest {
4040
then:
4141
result.reasonCode == Mqtt5ConnAckReasonCode.SUCCESS
4242
result.sessionExpiryInterval.present
43-
result.sessionExpiryInterval.getAsLong() == MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DISABLED
43+
result.sessionExpiryInterval.getAsLong() == MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT
4444
result.serverKeepAlive.present
4545
result.serverKeepAlive.getAsInt() == MqttPropertyConstants.SERVER_KEEP_ALIVE_DISABLED
4646
!result.serverReference.present

src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class ClientIdRegistryTest extends MqttBrokerTest {
117117
!clientIdRegistry.register(clientId).block()
118118
when:
119119
client.disconnect().join()
120-
Thread.sleep(50)
120+
Thread.sleep(100)
121121
then:
122122
clientIdRegistry.register(clientId).block()
123123
clientIdRegistry.unregister(clientId).block()

src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.test.integration.service
22

33
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode
4+
import com.ss.mqtt.broker.config.MqttConnectionConfig
45
import com.ss.mqtt.broker.service.ClientIdRegistry
56
import com.ss.mqtt.broker.service.MqttSessionService
67
import com.ss.mqtt.broker.test.integration.MqttBrokerTest
@@ -14,6 +15,9 @@ class MqttSessionServiceTest extends MqttBrokerTest {
1415
@Autowired
1516
MqttSessionService mqttSessionService
1617

18+
@Autowired
19+
MqttConnectionConfig connectionConfig
20+
1721
def "subscriber should create and re-use mqtt session"() {
1822
given:
1923
def clientId = clientIdRegistry.generate().block()
@@ -27,21 +31,21 @@ class MqttSessionServiceTest extends MqttBrokerTest {
2731
mqttSessionService.restore(clientId).block() == null
2832
when:
2933
client.disconnect().join()
30-
Thread.sleep(50)
34+
Thread.sleep(100)
3135
def restored = mqttSessionService.restore(clientId).block()
32-
mqttSessionService.store(clientId, restored).block()
3336
then:
3437
restored != null
3538
when:
39+
mqttSessionService.store(clientId, restored, connectionConfig.getDefaultSessionExpiryInterval()).block()
3640
client.connect().join()
3741
shouldNoSession = mqttSessionService.restore(clientId).block()
3842
then:
3943
shouldNoSession == null
4044
when:
4145
client.disconnect().join()
42-
Thread.sleep(50)
43-
def restored2 = mqttSessionService.restore(clientId).block()
46+
Thread.sleep(100)
47+
restored = mqttSessionService.restore(clientId).block()
4448
then:
45-
restored == restored2
49+
restored != null
4650
}
4751
}

0 commit comments

Comments
 (0)