Skip to content

Commit e345640

Browse files
committed
[broker-25] implement subscribe out packets, fix subscribe in packet
1 parent ca6e2a9 commit e345640

File tree

12 files changed

+265
-20
lines changed

12 files changed

+265
-20
lines changed

src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public interface MqttPropertyConstants {
4242
int TOPIC_ALIAS_MAX = 0xFFFF;
4343
int TOPIC_ALIAS_NOT_SET = 0;
4444

45-
int SUBSCRIPTION_ID_NOT_DEFINED = 0;
45+
int SUBSCRIPTION_ID_UNDEFINED = 0;
4646

4747
boolean SESSIONS_ENABLED_DEFAULT = true;
4848
boolean KEEP_ALIVE_ENABLED_DEFAULT = false;

src/main/java/com/ss/mqtt/broker/model/SubscribeRetainHandling.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ public enum SubscribeRetainHandling {
66
/**
77
* Send retained messages at the time of the subscribe.
88
*/
9-
SEND_AT_THE_TIME_OF_SUBSCRIBE,
9+
SEND,
1010
/**
1111
* Send retained messages at subscribe only if the subscription does not currently exist.
1212
*/
13-
SEND_AT_SUBSCRIBE_ONLY_IF_THE_SUBSCRIPTION_DOES_NOT_CURRENTLY_EXIST,
13+
SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST,
1414
/**
1515
* Do not send retained messages at the time of the subscribe.
1616
*/
17-
DO_NOT_SEND_AT_THE_TIME_OF_THE_SUBSCRIBE,
17+
DO_NOT_SEND,
1818
INVALID;
1919

2020
private static final SubscribeRetainHandling[] VALUES = values();
2121

2222
public static @NotNull SubscribeRetainHandling of(int level) {
23-
if (level < 0 || level > DO_NOT_SEND_AT_THE_TIME_OF_THE_SUBSCRIBE.ordinal()) {
23+
if (level < 0 || level > DO_NOT_SEND.ordinal()) {
2424
return SubscribeRetainHandling.INVALID;
2525
} else {
2626
return VALUES[level];

src/main/java/com/ss/mqtt/broker/model/SubscribeTopicFilter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package com.ss.mqtt.broker.model;
22

3+
import lombok.EqualsAndHashCode;
34
import lombok.Getter;
45
import lombok.RequiredArgsConstructor;
6+
import lombok.ToString;
7+
import org.jetbrains.annotations.NotNull;
58

69
@Getter
10+
@ToString
11+
@EqualsAndHashCode
712
@RequiredArgsConstructor
813
public class SubscribeTopicFilter {
914

@@ -37,4 +42,8 @@ public class SubscribeTopicFilter {
3742
* Retained messages sent when the subscription is established have the RETAIN flag set to 1.
3843
*/
3944
private final boolean retainAsPublished;
45+
46+
public SubscribeTopicFilter(@NotNull String topicName, @NotNull QoS qos) {
47+
this(topicName, qos, SubscribeRetainHandling.SEND, true, true);
48+
}
4049
}

src/main/java/com/ss/mqtt/broker/network/packet/in/SubscribeInPacket.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class SubscribeInPacket extends MqttReadablePacket {
4848
public SubscribeInPacket(byte info) {
4949
super(info);
5050
this.topicFilters = ArrayFactory.newArray(SubscribeTopicFilter.class);
51-
this.subscriptionId = MqttPropertyConstants.SUBSCRIPTION_ID_NOT_DEFINED;
51+
this.subscriptionId = MqttPropertyConstants.SUBSCRIPTION_ID_UNDEFINED;
5252
}
5353

5454
@Override
@@ -79,15 +79,15 @@ protected void readPayload(@NotNull MqttConnection connection, @NotNull ByteBuff
7979
var options = readUnsignedByte(buffer);
8080

8181
var qos = QoS.of(options & 0x03);
82-
var retainHandling = isMqtt5 ? SubscribeRetainHandling.of((options >> 4) & 0x03) :
83-
SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE;
82+
var retainHandling = isMqtt5 ?
83+
SubscribeRetainHandling.of((options >> 4) & 0x03) : SubscribeRetainHandling.SEND;
8484

8585
if (qos == QoS.INVALID || retainHandling == SubscribeRetainHandling.INVALID) {
8686
throw new IllegalStateException("Unsupported qos or retain handling");
8787
}
8888

8989
var noLocal = !isMqtt5 || NumberUtils.isSetBit(options, 2);
90-
var rap = isMqtt5 && NumberUtils.isSetBit(options, 3);
90+
var rap = !isMqtt5 || NumberUtils.isSetBit(options, 3);
9191

9292
topicFilters.add(new SubscribeTopicFilter(topicFilter, qos, retainHandling, noLocal, rap));
9393
}

src/main/java/com/ss/mqtt/broker/network/packet/out/Connect5OutPacket.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import com.ss.mqtt.broker.model.MqttVersion;
55
import com.ss.mqtt.broker.model.PacketProperty;
66
import com.ss.mqtt.broker.model.QoS;
7+
import com.ss.mqtt.broker.model.data.type.StringPair;
78
import com.ss.mqtt.broker.util.MqttDataUtils;
9+
import com.ss.rlib.common.util.array.Array;
810
import org.jetbrains.annotations.NotNull;
911

1012
import java.nio.ByteBuffer;
@@ -178,6 +180,7 @@ public class Connect5OutPacket extends Connect311OutPacket {
178180
);
179181

180182
// properties
183+
private final @NotNull Array<StringPair> userProperties;
181184
private final @NotNull String authenticationMethod;
182185
private final @NotNull byte[] authenticationData;
183186

@@ -198,6 +201,7 @@ public Connect5OutPacket(
198201
int keepAlive,
199202
boolean willRetain,
200203
boolean cleanStart,
204+
@NotNull Array<StringPair> userProperties,
201205
@NotNull String authenticationMethod,
202206
@NotNull byte[] authenticationData,
203207
long sessionExpiryInterval,
@@ -208,6 +212,7 @@ public Connect5OutPacket(
208212
boolean requestProblemInformation
209213
) {
210214
super(username, willTopic, clientId, password, willPayload, willQos, keepAlive, willRetain, cleanStart);
215+
this.userProperties = userProperties;
211216
this.authenticationMethod = authenticationMethod;
212217
this.authenticationData = authenticationData;
213218
this.sessionExpiryInterval = sessionExpiryInterval;
@@ -248,6 +253,7 @@ protected void appendWillProperties(@NotNull ByteBuffer buffer) {
248253
@Override
249254
protected void writeProperties(@NotNull ByteBuffer buffer) {
250255
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901046
256+
writeStringPairProperties(buffer, PacketProperty.USER_PROPERTY, userProperties);
251257
writeNotEmptyProperty(buffer, PacketProperty.AUTHENTICATION_METHOD, authenticationMethod);
252258
writeNotEmptyProperty(buffer, PacketProperty.AUTHENTICATION_DATA, authenticationData);
253259
writeProperty(
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.ss.mqtt.broker.network.packet.out;
2+
3+
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
4+
import com.ss.mqtt.broker.network.packet.PacketType;
5+
import com.ss.rlib.common.util.array.Array;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
import java.nio.ByteBuffer;
10+
11+
/**
12+
* Subscribe request.
13+
*/
14+
@RequiredArgsConstructor
15+
public class Subscribe311OutPacket extends MqttWritablePacket {
16+
17+
private static final byte PACKET_TYPE = (byte) PacketType.SUBSCRIBE.ordinal();
18+
19+
private final @NotNull Array<SubscribeTopicFilter> topicFilters;
20+
private final int packetId;
21+
22+
@Override
23+
protected byte getPacketType() {
24+
return PACKET_TYPE;
25+
}
26+
27+
@Override
28+
protected void writeVariableHeader(@NotNull ByteBuffer buffer) {
29+
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718065
30+
writeShort(buffer, packetId);
31+
}
32+
33+
@Override
34+
protected void writePayload(@NotNull ByteBuffer buffer) {
35+
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718066
36+
for (var topicFilter : topicFilters) {
37+
writeString(buffer, topicFilter.getTopicName());
38+
writeByte(buffer, buildSubscriptionOptions(topicFilter));
39+
}
40+
}
41+
42+
protected int buildSubscriptionOptions(@NotNull SubscribeTopicFilter topicFilter) {
43+
return topicFilter.getQos().ordinal();
44+
}
45+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.ss.mqtt.broker.network.packet.out;
2+
3+
import com.ss.mqtt.broker.model.MqttPropertyConstants;
4+
import com.ss.mqtt.broker.model.PacketProperty;
5+
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
6+
import com.ss.mqtt.broker.model.data.type.StringPair;
7+
import com.ss.rlib.common.util.array.Array;
8+
import org.jetbrains.annotations.NotNull;
9+
10+
import java.nio.ByteBuffer;
11+
import java.util.EnumSet;
12+
import java.util.Set;
13+
14+
/**
15+
* Subscribe request.
16+
*/
17+
public class Subscribe5OutPacket extends Subscribe311OutPacket {
18+
19+
private static final Set<PacketProperty> AVAILABLE_PROPERTIES = EnumSet.of(
20+
/*
21+
Followed by a Variable Byte Integer representing the identifier of the subscription. The Subscription
22+
Identifier can have the value of 1 to 268,435,455. It is a Protocol Error if the Subscription Identifier has a
23+
value of 0. It is a Protocol Error to include the Subscription Identifier more than once.
24+
25+
The Subscription Identifier is associated with any subscription created or modified as the result of this
26+
SUBSCRIBE packet. If there is a Subscription Identifier, it is stored with the subscription. If this property is
27+
not specified, then the absence of a Subscription Identifier is stored with the subscription.
28+
*/
29+
PacketProperty.SUBSCRIPTION_IDENTIFIER,
30+
/*
31+
The User Property is allowed to appear multiple times to represent multiple name, value pairs. The same
32+
name is allowed to appear more than once.
33+
*/
34+
PacketProperty.USER_PROPERTY
35+
);
36+
37+
// properties
38+
private final @NotNull Array<StringPair> userProperties;
39+
private final int subscriptionId;
40+
41+
public Subscribe5OutPacket(@NotNull Array<SubscribeTopicFilter> topicFilters, int packetId) {
42+
this(topicFilters, packetId, Array.empty(), MqttPropertyConstants.SUBSCRIPTION_ID_UNDEFINED);
43+
}
44+
45+
public Subscribe5OutPacket(
46+
@NotNull Array<SubscribeTopicFilter> topicFilters,
47+
int packetId,
48+
@NotNull Array<StringPair> userProperties,
49+
int subscriptionId
50+
) {
51+
super(topicFilters, packetId);
52+
this.userProperties = userProperties;
53+
this.subscriptionId = subscriptionId;
54+
}
55+
56+
protected int buildSubscriptionOptions(@NotNull SubscribeTopicFilter topicFilter) {
57+
58+
var subscriptionOptions = 0;
59+
60+
subscriptionOptions |= topicFilter.getRetainHandling().ordinal() << 4;
61+
62+
if (topicFilter.isRetainAsPublished()) {
63+
subscriptionOptions |= 0b0000_1000;
64+
}
65+
66+
if (topicFilter.isNoLocal()) {
67+
subscriptionOptions |= 0b0000_0100;
68+
}
69+
70+
subscriptionOptions |= topicFilter.getQos().ordinal();
71+
72+
return subscriptionOptions;
73+
}
74+
75+
@Override
76+
protected boolean isPropertiesSupported() {
77+
return true;
78+
}
79+
80+
@Override
81+
protected void writeProperties(@NotNull ByteBuffer buffer) {
82+
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901164
83+
writeStringPairProperties(buffer, PacketProperty.USER_PROPERTY, userProperties);
84+
writeProperty(
85+
buffer,
86+
PacketProperty.SUBSCRIPTION_IDENTIFIER,
87+
subscriptionId,
88+
MqttPropertyConstants.SUBSCRIPTION_ID_UNDEFINED
89+
);
90+
}
91+
}

src/test/groovy/com/ss/mqtt/broker/test/network/NetworkUnitSpecification.groovy

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package com.ss.mqtt.broker.test.network
33
import com.ss.mqtt.broker.config.MqttConnectionConfig
44
import com.ss.mqtt.broker.model.MqttVersion
55
import com.ss.mqtt.broker.model.QoS
6+
import com.ss.mqtt.broker.model.SubscribeRetainHandling
7+
import com.ss.mqtt.broker.model.SubscribeTopicFilter
68
import com.ss.mqtt.broker.model.data.type.StringPair
79
import com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode
810
import com.ss.mqtt.broker.model.reason.code.UnsubscribeAckReasonCode
@@ -54,7 +56,23 @@ class NetworkUnitSpecification extends UnitSpecification {
5456
public static final publishTopic = "publish/Topic"
5557
public static final responseTopic = "response/Topic"
5658
public static final topicFilter = "topic/Filter"
59+
public static final topicFilter1Obj311 = new SubscribeTopicFilter(topicFilter, QoS.AT_LEAST_ONCE_DELIVERY)
60+
public static final topicFilter1Obj5 = new SubscribeTopicFilter(
61+
topicFilter,
62+
QoS.AT_LEAST_ONCE_DELIVERY,
63+
SubscribeRetainHandling.DO_NOT_SEND,
64+
true,
65+
false,
66+
)
5767
public static final topicFilter2 = "topic/Filter2"
68+
public static final topicFilter2Obj311 = new SubscribeTopicFilter(topicFilter2, QoS.EXACTLY_ONCE_DELIVERY)
69+
public static final topicFilter2Obj5 = new SubscribeTopicFilter(
70+
topicFilter2,
71+
QoS.EXACTLY_ONCE_DELIVERY,
72+
SubscribeRetainHandling.DO_NOT_SEND,
73+
true,
74+
false,
75+
)
5876
public static final serverReference = "serverReference"
5977
public static final contentType = "application/json"
6078
public static final subscribeAckReasonCodes = ArrayFactory.asArray(
@@ -74,6 +92,8 @@ class NetworkUnitSpecification extends UnitSpecification {
7492
)
7593
public static final subscriptionIds = IntegerArray.of(subscriptionId, subscriptionId2)
7694
public static final topicFilters = Array.of(topicFilter, topicFilter2)
95+
public static final topicFiltersObj311 = Array.of(topicFilter1Obj311, topicFilter2Obj311)
96+
public static final topicFiltersObj5 = Array.of(topicFilter1Obj5, topicFilter2Obj5)
7797
public static final publishPayload = "publishPayload".getBytes(StandardCharsets.UTF_8)
7898
public static final correlationData = "correlationData".getBytes(StandardCharsets.UTF_8)
7999

src/test/groovy/com/ss/mqtt/broker/test/network/in/SubscribeInPacketTest.groovy

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ class SubscribeInPacketTest extends BaseInPacketTest {
3131
packet.topicFilters.get(0).getQos() == QoS.AT_LEAST_ONCE_DELIVERY
3232
packet.topicFilters.get(0).getTopicName() == topicFilter
3333
packet.topicFilters.get(0).isNoLocal()
34-
!packet.topicFilters.get(0).isRetainAsPublished()
35-
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
34+
packet.topicFilters.get(0).isRetainAsPublished()
35+
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND
3636
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
3737
packet.topicFilters.get(1).getTopicName() == topicFilter2
3838
packet.topicFilters.get(1).isNoLocal()
39-
!packet.topicFilters.get(1).isRetainAsPublished()
40-
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
39+
packet.topicFilters.get(1).isRetainAsPublished()
40+
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND
4141
packet.packetId == packetId
4242
packet.userProperties == Array.empty()
43-
packet.subscriptionId == MqttPropertyConstants.SUBSCRIPTION_ID_NOT_DEFINED
43+
packet.subscriptionId == MqttPropertyConstants.SUBSCRIPTION_ID_UNDEFINED
4444
}
4545

4646
def "should read packet correctly as mqtt 5.0"() {
@@ -72,12 +72,12 @@ class SubscribeInPacketTest extends BaseInPacketTest {
7272
packet.topicFilters.get(0).getTopicName() == topicFilter
7373
!packet.topicFilters.get(0).isNoLocal()
7474
packet.topicFilters.get(0).isRetainAsPublished()
75-
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
75+
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND
7676
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
7777
packet.topicFilters.get(1).getTopicName() == topicFilter2
7878
packet.topicFilters.get(1).isNoLocal()
7979
!packet.topicFilters.get(1).isRetainAsPublished()
80-
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_SUBSCRIBE_ONLY_IF_THE_SUBSCRIPTION_DOES_NOT_CURRENTLY_EXIST
80+
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST
8181
packet.packetId == packetId
8282
packet.userProperties == userProperties
8383
packet.subscriptionId == subscriptionId
@@ -101,14 +101,14 @@ class SubscribeInPacketTest extends BaseInPacketTest {
101101
packet.topicFilters.get(0).getTopicName() == topicFilter
102102
!packet.topicFilters.get(0).isNoLocal()
103103
!packet.topicFilters.get(0).isRetainAsPublished()
104-
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
104+
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND
105105
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
106106
packet.topicFilters.get(1).getTopicName() == topicFilter2
107107
!packet.topicFilters.get(1).isNoLocal()
108108
!packet.topicFilters.get(1).isRetainAsPublished()
109-
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
109+
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND
110110
packet.packetId == packetId
111111
packet.userProperties == Array.empty()
112-
packet.subscriptionId == MqttPropertyConstants.SUBSCRIPTION_ID_NOT_DEFINED
112+
packet.subscriptionId == MqttPropertyConstants.SUBSCRIPTION_ID_UNDEFINED
113113
}
114114
}

src/test/groovy/com/ss/mqtt/broker/test/network/out/Connect5OutPacketTest.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class Connect5OutPacketTest extends BaseOutPacketTest {
2323
keepAlive,
2424
willRetain,
2525
cleanStart,
26+
userProperties,
2627
authMethod,
2728
authData,
2829
sessionExpiryInterval,
@@ -48,7 +49,7 @@ class Connect5OutPacketTest extends BaseOutPacketTest {
4849
reader.clientId == clientId
4950
reader.password == userPassword
5051
reader.keepAlive == keepAlive
51-
reader.userProperties == Array.empty()
52+
reader.userProperties == userProperties
5253
reader.cleanStart == cleanStart
5354
reader.willRetain == willRetain
5455
reader.authenticationMethod == authMethod

0 commit comments

Comments
 (0)