Skip to content

Commit 84f8146

Browse files
committed
[broker-8] update working with connect properties
1 parent 5347f06 commit 84f8146

17 files changed

+198
-72
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,36 @@ private interface ChannelFactory extends
138138
int.class,
139139
MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT
140140
),
141+
env.getProperty(
142+
"mqtt.connection.min.keep.alive",
143+
int.class,
144+
MqttPropertyConstants.SERVER_KEEP_ALIVE_DEFAULT
145+
),
146+
env.getProperty(
147+
"mqtt.connection.receive.maximum",
148+
int.class,
149+
MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT
150+
),
151+
env.getProperty(
152+
"mqtt.connection.topic.alias.maximum",
153+
int.class,
154+
MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DISABLED
155+
),
156+
env.getProperty(
157+
"mqtt.connection.default.session.expiration.time",
158+
long.class,
159+
MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT
160+
),
161+
env.getProperty(
162+
"mqtt.connection.keep.alive.enabled",
163+
boolean.class,
164+
MqttPropertyConstants.KEEP_ALIVE_ENABLED_DEFAULT
165+
),
166+
env.getProperty(
167+
"mqtt.connection.sessions.enabled",
168+
boolean.class,
169+
MqttPropertyConstants.SESSIONS_ENABLED_DEFAULT
170+
),
141171
env.getProperty(
142172
"mqtt.connection.retain.available",
143173
boolean.class,
@@ -151,7 +181,7 @@ private interface ChannelFactory extends
151181
env.getProperty(
152182
"mqtt.connection.subscription.id.available",
153183
boolean.class,
154-
MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE
184+
MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT
155185
),
156186
env.getProperty(
157187
"mqtt.connection.shared.subscription.available",

src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@ public class MqttConnectionConfig {
1212
private final @NotNull QoS maxQos;
1313

1414
private final int maximumPacketSize;
15+
private final int minKeepAliveTime;
16+
private final int receiveMaximum;
17+
private final int topicAliasMaximum;
1518

19+
private final long defaultSessionExpiryInterval;
20+
21+
private final boolean keepAliveEnabled;
22+
private final boolean sessionsEnabled;
1623
private final boolean retainAvailable;
1724
private final boolean wildcardSubscriptionAvailable;
1825
private final boolean subscriptionIdAvailable;

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,32 @@ public interface MqttPropertyConstants {
66

77
int MAXIMUM_PROTOCOL_PACKET_SIZE = 256 * 1024 * 1024;
88

9-
long SESSION_EXPIRY_INTERVAL_DEFAULT = 0;
9+
long SESSION_EXPIRY_INTERVAL_DISABLED = 0;
10+
long SESSION_EXPIRY_INTERVAL_DEFAULT = 120;
1011
long SESSION_EXPIRY_INTERVAL_MIN = 0;
1112
long SESSION_EXPIRY_INTERVAL_INFINITY = 0xFFFFFFFFL;
1213
long SESSION_EXPIRY_INTERVAL_UNDEFINED = -1;
1314

14-
int RECEIVE_MAXIMUM_DEFAULT = 0xFFFF;
15+
int RECEIVE_MAXIMUM_UNDEFINED = -1;
1516
int RECEIVE_MAXIMUM_MIN = 1;
16-
int RECEIVE_MAXIMUM_MAX = RECEIVE_MAXIMUM_DEFAULT;
17+
int RECEIVE_MAXIMUM_DEFAULT = 10;
18+
int RECEIVE_MAXIMUM_MAX = 0xFFFF;
1719

18-
int MAXIMUM_PACKET_SIZE_DEFAULT = MAXIMUM_PROTOCOL_PACKET_SIZE;
20+
int MAXIMUM_PACKET_SIZE_UNDEFINED = -1;
21+
int MAXIMUM_PACKET_SIZE_DEFAULT = 1024;
1922
int MAXIMUM_PACKET_SIZE_MIN = 1;
2023
int MAXIMUM_PACKET_SIZE_MAX = MAXIMUM_PROTOCOL_PACKET_SIZE;
2124

2225
boolean PAYLOAD_FORMAT_INDICATOR_DEFAULT = false;
2326

2427
long MESSAGE_EXPIRY_INTERVAL_DEFAULT = 0;
2528

26-
int TOPIC_ALIAS_MAXIMUM_DEFAULT = 0;
29+
int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1;
30+
int TOPIC_ALIAS_MAXIMUM_DISABLED = 0;
2731

2832
int SERVER_KEEP_ALIVE_UNDEFINED = -1;
33+
int SERVER_KEEP_ALIVE_DISABLED = 0;
34+
int SERVER_KEEP_ALIVE_DEFAULT = 0;
2935
int SERVER_KEEP_ALIVE_MIN = 0;
3036
int SERVER_KEEP_ALIVE_MAX = 0xFFFF;
3137

@@ -35,8 +41,10 @@ public interface MqttPropertyConstants {
3541

3642
int SUBSCRIPTION_ID_NOT_DEFINED = 0;
3743

38-
boolean RETAIN_AVAILABLE_DEFAULT = true;
39-
boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = true;
40-
boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = true;
41-
boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE = true;
44+
boolean SESSIONS_ENABLED_DEFAULT = false;
45+
boolean KEEP_ALIVE_ENABLED_DEFAULT = false;
46+
boolean RETAIN_AVAILABLE_DEFAULT = false;
47+
boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
48+
boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
49+
boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = false;
4250
}

src/main/java/com/ss/mqtt/broker/network/client/UnsafeMqttClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ void configure(
1616
int receiveMax,
1717
int maximumPacketSize,
1818
int topicAliasMaximum,
19-
int keepAlive
19+
int keepAlive,
20+
boolean requestResponseInformation,
21+
boolean requestProblemInformation
2022
);
2123

2224
void setClientId(@NotNull String clientId);

src/main/java/com/ss/mqtt/broker/network/client/impl/AbstractMqttClient.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,24 @@ public abstract class AbstractMqttClient implements UnsafeMqttClient {
2424

2525
private volatile @Setter @NotNull String clientId;
2626

27-
private volatile long sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
28-
private volatile int receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
29-
private volatile int maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
30-
private volatile int topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
31-
private volatile int keepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_MAX;
27+
private volatile long sessionExpiryInterval;
28+
private volatile int receiveMax;
29+
private volatile int maximumPacketSize;
30+
private volatile int topicAliasMaximum;
31+
private volatile int keepAlive;
32+
33+
private volatile boolean requestResponseInformation = false;
34+
private volatile boolean requestProblemInformation = false;
3235

3336
public AbstractMqttClient(@NotNull MqttConnection connection) {
3437
this.connection = connection;
3538
this.clientId = StringUtils.EMPTY;
39+
var config = connection.getConfig();
40+
this.sessionExpiryInterval = config.getDefaultSessionExpiryInterval();
41+
this.receiveMax = config.getReceiveMaximum();
42+
this.maximumPacketSize = config.getMaximumPacketSize();
43+
this.topicAliasMaximum = config.getTopicAliasMaximum();
44+
this.keepAlive = config.getMinKeepAliveTime();
3645
}
3746

3847
@Override
@@ -54,13 +63,17 @@ public void configure(
5463
int receiveMax,
5564
int maximumPacketSize,
5665
int topicAliasMaximum,
57-
int keepAlive
66+
int keepAlive,
67+
boolean requestResponseInformation,
68+
boolean requestProblemInformation
5869
) {
5970
this.sessionExpiryInterval = sessionExpiryInterval;
6071
this.receiveMax = receiveMax;
6172
this.maximumPacketSize = maximumPacketSize;
6273
this.topicAliasMaximum = topicAliasMaximum;
6374
this.keepAlive = keepAlive;
75+
this.requestProblemInformation = requestProblemInformation;
76+
this.requestResponseInformation = requestResponseInformation;
6477
}
6578

6679
@Override

src/main/java/com/ss/mqtt/broker/network/packet/factory/Mqtt311PacketOutFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
1616
@NotNull String requestedClientId,
1717
long requestedSessionExpiryInterval,
1818
int requestedKeepAlive,
19+
int requestedReceiveMax,
1920
@NotNull String reason,
2021
@NotNull String serverReference,
2122
@NotNull String responseInformation,

src/main/java/com/ss/mqtt/broker/network/packet/factory/Mqtt5PacketOutFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
1616
@NotNull String requestedClientId,
1717
long requestedSessionExpiryInterval,
1818
int requestedKeepAlive,
19+
int requestedReceiveMax,
1920
@NotNull String reason,
2021
@NotNull String serverReference,
2122
@NotNull String responseInformation,
@@ -30,6 +31,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
3031
requestedClientId,
3132
requestedSessionExpiryInterval,
3233
requestedKeepAlive,
34+
requestedReceiveMax,
3335
reason,
3436
serverReference,
3537
responseInformation,

src/main/java/com/ss/mqtt/broker/network/packet/factory/MqttPacketOutFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public abstract class MqttPacketOutFactory {
1717
@NotNull String requestedClientId,
1818
long requestedSessionExpiryInterval,
1919
int requestedKeepAlive,
20+
int requestedReceiveMax,
2021
@NotNull String reason,
2122
@NotNull String serverReference,
2223
@NotNull String responseInformation,
@@ -31,7 +32,8 @@ public abstract class MqttPacketOutFactory {
3132
boolean sessionPresent,
3233
@NotNull String requestedClientId,
3334
long requestedSessionExpiryInterval,
34-
int requestedKeepAlive
35+
int requestedKeepAlive,
36+
int requestedReceiveMax
3537
) {
3638
return newConnectAck(
3739
client,
@@ -40,6 +42,7 @@ public abstract class MqttPacketOutFactory {
4042
requestedClientId,
4143
requestedSessionExpiryInterval,
4244
requestedKeepAlive,
45+
requestedReceiveMax,
4346
StringUtils.EMPTY,
4447
StringUtils.EMPTY,
4548
StringUtils.EMPTY,
@@ -60,6 +63,7 @@ public abstract class MqttPacketOutFactory {
6063
StringUtils.EMPTY,
6164
client.getSessionExpiryInterval(),
6265
client.getKeepAlive(),
66+
client.getReceiveMax(),
6367
StringUtils.EMPTY,
6468
StringUtils.EMPTY,
6569
StringUtils.EMPTY,

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectAckInPacket.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,21 +269,21 @@ public ConnectAckInPacket(byte info) {
269269
this.userProperties = Array.empty();
270270
this.reasonCode = ConnectAckReasonCode.SUCCESS;
271271
this.maximumQos = QoS.EXACTLY_ONCE_DELIVERY;
272-
this.sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
273-
this.receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
274272
this.retainAvailable = MqttPropertyConstants.RETAIN_AVAILABLE_DEFAULT;
275273
this.assignedClientId = StringUtils.EMPTY;
276-
this.topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
277274
this.reason = StringUtils.EMPTY;
278275
this.sharedSubscriptionAvailable = MqttPropertyConstants.SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT;
279276
this.wildcardSubscriptionAvailable = MqttPropertyConstants.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT;
280-
this.subscriptionIdAvailable = MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
281-
this.serverKeepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_UNDEFINED;
277+
this.subscriptionIdAvailable = MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT;
282278
this.responseInformation = StringUtils.EMPTY;
283279
this.serverReference = StringUtils.EMPTY;
284280
this.authenticationMethod = StringUtils.EMPTY;
285281
this.authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY;
286-
this.maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
282+
this.serverKeepAlive = MqttPropertyConstants.SERVER_KEEP_ALIVE_UNDEFINED;
283+
this.maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_UNDEFINED;
284+
this.sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_UNDEFINED;
285+
this.topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_UNDEFINED;
286+
this.receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_UNDEFINED;
287287
}
288288

289289
@Override

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,10 @@ public ConnectInPacket(byte info) {
227227
this.authenticationMethod = StringUtils.EMPTY;
228228
this.willPayload = ArrayUtils.EMPTY_BYTE_ARRAY;
229229
this.authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY;
230-
this.sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT;
231-
this.receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT;
232-
this.maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT;
233-
this.topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT;
230+
this.sessionExpiryInterval = MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_UNDEFINED;
231+
this.receiveMax = MqttPropertyConstants.RECEIVE_MAXIMUM_UNDEFINED;
232+
this.maximumPacketSize = MqttPropertyConstants.MAXIMUM_PACKET_SIZE_UNDEFINED;
233+
this.topicAliasMaximum = MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_UNDEFINED;
234234
this.requestResponseInformation = false;
235235
this.requestProblemInformation = false;
236236
}
@@ -277,7 +277,7 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B
277277
hasUserName = NumberUtils.isSetBit(flags, 7);
278278
hasPassword = NumberUtils.isSetBit(flags, 6);
279279
willFlag = NumberUtils.isSetBit(flags, 2);
280-
keepAlive = readShort(buffer);
280+
keepAlive = readUnsignedShort(buffer);
281281
}
282282

283283
@Override

0 commit comments

Comments
 (0)