Skip to content

Commit bc4b9bb

Browse files
committed
Merge branch 'develop' into feature-broker-11
# Conflicts: # src/main/java/com/ss/mqtt/broker/network/packet/in/handler/ConnectInPacketHandler.java
2 parents 8264b2c + 3d8679c commit bc4b9bb

26 files changed

+301
-131
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,36 @@ private interface ChannelFactory extends
172172
int.class,
173173
MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT
174174
),
175+
env.getProperty(
176+
"mqtt.connection.min.keep.alive",
177+
int.class,
178+
MqttPropertyConstants.SERVER_KEEP_ALIVE_DEFAULT
179+
),
180+
env.getProperty(
181+
"mqtt.connection.receive.maximum",
182+
int.class,
183+
MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT
184+
),
185+
env.getProperty(
186+
"mqtt.connection.topic.alias.maximum",
187+
int.class,
188+
MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DISABLED
189+
),
190+
env.getProperty(
191+
"mqtt.connection.default.session.expiration.time",
192+
long.class,
193+
MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT
194+
),
195+
env.getProperty(
196+
"mqtt.connection.keep.alive.enabled",
197+
boolean.class,
198+
MqttPropertyConstants.KEEP_ALIVE_ENABLED_DEFAULT
199+
),
200+
env.getProperty(
201+
"mqtt.connection.sessions.enabled",
202+
boolean.class,
203+
MqttPropertyConstants.SESSIONS_ENABLED_DEFAULT
204+
),
175205
env.getProperty(
176206
"mqtt.connection.retain.available",
177207
boolean.class,
@@ -185,7 +215,7 @@ private interface ChannelFactory extends
185215
env.getProperty(
186216
"mqtt.connection.subscription.id.available",
187217
boolean.class,
188-
MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE
218+
MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT
189219
),
190220
env.getProperty(
191221
"mqtt.connection.shared.subscription.available",

src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@ public class MqttConnectionConfig {
1212
private final @NotNull QoS maxQos;
1313

1414
private final int maximumPacketSize;
15+
private final int minKeepAliveTime;
16+
private final int receiveMaximum;
17+
private final int topicAliasMaximum;
1518

19+
private final long defaultSessionExpiryInterval;
20+
21+
private final boolean keepAliveEnabled;
22+
private final boolean sessionsEnabled;
1623
private final boolean retainAvailable;
1724
private final boolean wildcardSubscriptionAvailable;
1825
private final boolean subscriptionIdAvailable;

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,32 @@ public interface MqttPropertyConstants {
66

77
int MAXIMUM_PROTOCOL_PACKET_SIZE = 256 * 1024 * 1024;
88

9-
long SESSION_EXPIRY_INTERVAL_DEFAULT = 0;
9+
long SESSION_EXPIRY_INTERVAL_DISABLED = 0;
10+
long SESSION_EXPIRY_INTERVAL_DEFAULT = 120;
1011
long SESSION_EXPIRY_INTERVAL_MIN = 0;
1112
long SESSION_EXPIRY_INTERVAL_INFINITY = 0xFFFFFFFFL;
1213
long SESSION_EXPIRY_INTERVAL_UNDEFINED = -1;
1314

14-
int RECEIVE_MAXIMUM_DEFAULT = 0xFFFF;
15+
int RECEIVE_MAXIMUM_UNDEFINED = -1;
1516
int RECEIVE_MAXIMUM_MIN = 1;
16-
int RECEIVE_MAXIMUM_MAX = RECEIVE_MAXIMUM_DEFAULT;
17+
int RECEIVE_MAXIMUM_DEFAULT = 10;
18+
int RECEIVE_MAXIMUM_MAX = 0xFFFF;
1719

18-
int MAXIMUM_PACKET_SIZE_DEFAULT = MAXIMUM_PROTOCOL_PACKET_SIZE;
20+
int MAXIMUM_PACKET_SIZE_UNDEFINED = -1;
21+
int MAXIMUM_PACKET_SIZE_DEFAULT = 1024;
1922
int MAXIMUM_PACKET_SIZE_MIN = 1;
2023
int MAXIMUM_PACKET_SIZE_MAX = MAXIMUM_PROTOCOL_PACKET_SIZE;
2124

2225
boolean PAYLOAD_FORMAT_INDICATOR_DEFAULT = false;
2326

2427
long MESSAGE_EXPIRY_INTERVAL_DEFAULT = 0;
2528

26-
int TOPIC_ALIAS_MAXIMUM_DEFAULT = 0;
29+
int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1;
30+
int TOPIC_ALIAS_MAXIMUM_DISABLED = 0;
2731

2832
int SERVER_KEEP_ALIVE_UNDEFINED = -1;
33+
int SERVER_KEEP_ALIVE_DISABLED = 0;
34+
int SERVER_KEEP_ALIVE_DEFAULT = 0;
2935
int SERVER_KEEP_ALIVE_MIN = 0;
3036
int SERVER_KEEP_ALIVE_MAX = 0xFFFF;
3137

@@ -35,8 +41,10 @@ public interface MqttPropertyConstants {
3541

3642
int SUBSCRIPTION_ID_NOT_DEFINED = 0;
3743

38-
boolean RETAIN_AVAILABLE_DEFAULT = true;
39-
boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = true;
40-
boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = true;
41-
boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE = true;
44+
boolean SESSIONS_ENABLED_DEFAULT = false;
45+
boolean KEEP_ALIVE_ENABLED_DEFAULT = false;
46+
boolean RETAIN_AVAILABLE_DEFAULT = false;
47+
boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
48+
boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
49+
boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = false;
4250
}

src/main/java/com/ss/mqtt/broker/model/SubscribeTopicFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
public class SubscribeTopicFilter {
99

1010
/**
11-
* The subscriber's topic filter.
11+
* The subscriber's topic name.
1212
*/
13-
private final String topicFilter;
13+
private final String topicName;
1414

1515
/**
1616
* Maximum QoS field. This gives the maximum QoS level at which the Server

src/main/java/com/ss/mqtt/broker/network/client/UnsafeMqttClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ void configure(
1818
int receiveMax,
1919
int maximumPacketSize,
2020
int topicAliasMaximum,
21-
int keepAlive
21+
int keepAlive,
22+
boolean requestResponseInformation,
23+
boolean requestProblemInformation
2224
);
2325

2426
void setClientId(@NotNull String clientId);

src/main/java/com/ss/mqtt/broker/network/client/impl/AbstractMqttClient.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,26 @@ public abstract class AbstractMqttClient implements UnsafeMqttClient {
3535
private volatile @Setter @NotNull String clientId;
3636
private volatile @Setter @Getter @Nullable MqttSession session;
3737

38-
private volatile long sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
39-
private volatile int receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
40-
private volatile int maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
41-
private volatile int topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
42-
private volatile int keepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_MAX;
38+
private volatile long sessionExpiryInterval;
39+
private volatile int receiveMax;
40+
private volatile int maximumPacketSize;
41+
private volatile int topicAliasMaximum;
42+
private volatile int keepAlive;
43+
44+
private volatile boolean requestResponseInformation = false;
45+
private volatile boolean requestProblemInformation = false;
4346

4447
public AbstractMqttClient(@NotNull MqttConnection connection, @NotNull MqttClientReleaseHandler releaseHandler) {
4548
this.connection = connection;
4649
this.releaseHandler = releaseHandler;
4750
this.released = new AtomicBoolean(false);
4851
this.clientId = StringUtils.EMPTY;
52+
var config = connection.getConfig();
53+
this.sessionExpiryInterval = config.getDefaultSessionExpiryInterval();
54+
this.receiveMax = config.getReceiveMaximum();
55+
this.maximumPacketSize = config.getMaximumPacketSize();
56+
this.topicAliasMaximum = config.getTopicAliasMaximum();
57+
this.keepAlive = config.getMinKeepAliveTime();
4958
}
5059

5160
@Override
@@ -67,13 +76,17 @@ public void configure(
6776
int receiveMax,
6877
int maximumPacketSize,
6978
int topicAliasMaximum,
70-
int keepAlive
79+
int keepAlive,
80+
boolean requestResponseInformation,
81+
boolean requestProblemInformation
7182
) {
7283
this.sessionExpiryInterval = sessionExpiryInterval;
7384
this.receiveMax = receiveMax;
7485
this.maximumPacketSize = maximumPacketSize;
7586
this.topicAliasMaximum = topicAliasMaximum;
7687
this.keepAlive = keepAlive;
88+
this.requestProblemInformation = requestProblemInformation;
89+
this.requestResponseInformation = requestResponseInformation;
7790
}
7891

7992
@Override

src/main/java/com/ss/mqtt/broker/network/packet/factory/Mqtt311PacketOutFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
1616
@NotNull String requestedClientId,
1717
long requestedSessionExpiryInterval,
1818
int requestedKeepAlive,
19+
int requestedReceiveMax,
1920
@NotNull String reason,
2021
@NotNull String serverReference,
2122
@NotNull String responseInformation,

src/main/java/com/ss/mqtt/broker/network/packet/factory/Mqtt5PacketOutFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
1616
@NotNull String requestedClientId,
1717
long requestedSessionExpiryInterval,
1818
int requestedKeepAlive,
19+
int requestedReceiveMax,
1920
@NotNull String reason,
2021
@NotNull String serverReference,
2122
@NotNull String responseInformation,
@@ -30,6 +31,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
3031
requestedClientId,
3132
requestedSessionExpiryInterval,
3233
requestedKeepAlive,
34+
requestedReceiveMax,
3335
reason,
3436
serverReference,
3537
responseInformation,

src/main/java/com/ss/mqtt/broker/network/packet/factory/MqttPacketOutFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public abstract class MqttPacketOutFactory {
1717
@NotNull String requestedClientId,
1818
long requestedSessionExpiryInterval,
1919
int requestedKeepAlive,
20+
int requestedReceiveMax,
2021
@NotNull String reason,
2122
@NotNull String serverReference,
2223
@NotNull String responseInformation,
@@ -31,7 +32,8 @@ public abstract class MqttPacketOutFactory {
3132
boolean sessionPresent,
3233
@NotNull String requestedClientId,
3334
long requestedSessionExpiryInterval,
34-
int requestedKeepAlive
35+
int requestedKeepAlive,
36+
int requestedReceiveMax
3537
) {
3638
return newConnectAck(
3739
client,
@@ -40,6 +42,7 @@ public abstract class MqttPacketOutFactory {
4042
requestedClientId,
4143
requestedSessionExpiryInterval,
4244
requestedKeepAlive,
45+
requestedReceiveMax,
4346
StringUtils.EMPTY,
4447
StringUtils.EMPTY,
4548
StringUtils.EMPTY,
@@ -60,6 +63,7 @@ public abstract class MqttPacketOutFactory {
6063
StringUtils.EMPTY,
6164
client.getSessionExpiryInterval(),
6265
client.getKeepAlive(),
66+
client.getReceiveMax(),
6367
StringUtils.EMPTY,
6468
StringUtils.EMPTY,
6569
StringUtils.EMPTY,

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectAckInPacket.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,21 +269,21 @@ public ConnectAckInPacket(byte info) {
269269
this.userProperties = Array.empty();
270270
this.reasonCode = ConnectAckReasonCode.SUCCESS;
271271
this.maximumQos = QoS.EXACTLY_ONCE_DELIVERY;
272-
this.sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
273-
this.receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
274272
this.retainAvailable = MqttPropertyConstants.RETAIN_AVAILABLE_DEFAULT;
275273
this.assignedClientId = StringUtils.EMPTY;
276-
this.topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
277274
this.reason = StringUtils.EMPTY;
278275
this.sharedSubscriptionAvailable = MqttPropertyConstants.SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT;
279276
this.wildcardSubscriptionAvailable = MqttPropertyConstants.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT;
280-
this.subscriptionIdAvailable = MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
281-
this.serverKeepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_UNDEFINED;
277+
this.subscriptionIdAvailable = MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT;
282278
this.responseInformation = StringUtils.EMPTY;
283279
this.serverReference = StringUtils.EMPTY;
284280
this.authenticationMethod = StringUtils.EMPTY;
285281
this.authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY;
286-
this.maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
282+
this.serverKeepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_UNDEFINED;
283+
this.maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_UNDEFINED;
284+
this.sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_UNDEFINED;
285+
this.topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_UNDEFINED;
286+
this.receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_UNDEFINED;
287287
}
288288

289289
@Override

0 commit comments

Comments
 (0)