Skip to content

Commit cc220b1

Browse files
authored
Update subscribe/unsubscribe related messages (#60)
1 parent dd3f8c8 commit cc220b1

File tree

63 files changed

+589
-517
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+589
-517
lines changed

application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import javasabr.mqtt.model.reason.code.ConnectAckReasonCode
66
import javasabr.mqtt.model.reason.code.PublishCompletedReasonCode
77
import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode
88
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode
9-
import javasabr.mqtt.model.subscribtion.Subscription
9+
import javasabr.mqtt.model.subscription.Subscription
1010
import javasabr.mqtt.model.topic.TopicFilter
1111
import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage
1212
import javasabr.mqtt.network.message.in.PublishMqttInMessage

model/src/main/java/javasabr/mqtt/model/MqttMessageProperty.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package javasabr.mqtt.model;
22

3-
import java.util.stream.Stream;
43
import javasabr.mqtt.model.data.type.MqttDataType;
54
import javasabr.rlib.common.util.ClassUtils;
5+
import javasabr.rlib.common.util.NumberedEnum;
6+
import javasabr.rlib.common.util.NumberedEnumMap;
67
import javasabr.rlib.common.util.ObjectUtils;
78
import lombok.AccessLevel;
89
import lombok.Getter;
910
import lombok.experimental.Accessors;
1011
import lombok.experimental.FieldDefaults;
1112
import org.jspecify.annotations.Nullable;
1213

13-
@Accessors(fluent = true, chain = false)
14+
@Accessors
1415
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
15-
public enum MqttMessageProperty {
16+
public enum MqttMessageProperty implements NumberedEnum<MqttMessageProperty> {
1617
PAYLOAD_FORMAT_INDICATOR(0x01, MqttDataType.BYTE),
1718
MESSAGE_EXPIRY_INTERVAL(0x02, MqttDataType.INTEGER),
1819
CONTENT_TYPE(0x03, MqttDataType.UTF_8_STRING),
@@ -41,38 +42,17 @@ public enum MqttMessageProperty {
4142
SUBSCRIPTION_IDENTIFIER_AVAILABLE(0x29, MqttDataType.BYTE),
4243
SHARED_SUBSCRIPTION_AVAILABLE(0x2A, MqttDataType.BYTE);
4344

44-
private static final MqttMessageProperty[] PROPERTIES;
45-
46-
static {
47-
48-
int maxId = Stream
49-
.of(values())
50-
.mapToInt(MqttMessageProperty::id)
51-
.max()
52-
.orElse(0);
53-
54-
var result = new MqttMessageProperty[maxId + 1];
55-
56-
Stream
57-
.of(values())
58-
.forEach(prop -> result[prop.id] = prop);
59-
60-
PROPERTIES = result;
61-
}
45+
private static final NumberedEnumMap<MqttMessageProperty> NUMBERED_MAP =
46+
new NumberedEnumMap<>(MqttMessageProperty.class);
6247

6348
public static MqttMessageProperty byId(int id) {
64-
if (id < 0 || id >= PROPERTIES.length) {
65-
throw new IllegalArgumentException("Unknown property with id: " + id);
66-
} else {
67-
return PROPERTIES[id];
68-
}
49+
return NUMBERED_MAP.require(id);
6950
}
7051

7152
@Getter
7253
byte id;
7354
@Getter
7455
MqttDataType dataType;
75-
7656
@Nullable
7757
Object defaultValue;
7858

@@ -86,6 +66,11 @@ public static MqttMessageProperty byId(int id) {
8666
this.defaultValue = defaultValue;
8767
}
8868

69+
@Override
70+
public int number() {
71+
return id;
72+
}
73+
8974
public <T> T defaultValue() {
9075
return ClassUtils.unsafeNNCast(ObjectUtils.notNull(defaultValue));
9176
}

model/src/main/java/javasabr/mqtt/model/MqttProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public interface MqttProperties {
4747
int TOPIC_ALIAS_INVALID = 0;
4848

4949
int SUBSCRIPTION_ID_IS_NOT_SET = 0;
50+
int SUBSCRIPTION_ID_MIN = 1;
51+
int SUBSCRIPTION_ID_MAX = 268_435_455;
52+
5053
int MESSAGE_ID_IS_NOT_SET = 0;
5154

5255
boolean SESSIONS_ENABLED_DEFAULT = true;
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package javasabr.mqtt.model;
22

33
public interface MqttProtocolErrors {
4-
String NO_ANY_TOPIC_FILTER = "Not provided any information about TopicFilters";
4+
String NO_ANY_TOPIC_FILTERS = "Not provided any information about 'Topic Filters'";
55
String NO_ANY_TOPIC_NANE = "Not provided any information about TopicName";
66
//String INVALID_TOPIC_ALIAS = "Provided invalid TopicAlias";
77
String INVALID_PAYLOAD_FORMAT = "Provided invalid PayloadFormat";
88
String INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid MessageExpiryInterval";
99
String INVALID_RESPONSE_TOPIC_NAME = "Provided invalid ResponseTopicName";
10-
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Unsupported 'QoS' or 'RetainHandling'";
10+
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Provided unsupported 'QoS' or 'RetainHandling'";
1111
String MISSED_REQUIRED_MESSAGE_ID = "'Packet Identifier' must be presented'";
1212
String NOT_EXPECTED_MESSAGE_ID = "'Packet Identifier' must be zero'";
13-
String PROTOCOL_LEVEL_UNSUPPORTED_NO_LOCAL_OPTION = "'NoLocal' option is not available on this protocol level";
14-
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_AS_PUBLISH_OPTION = "'RetainAsPublished' option is not available on this protocol level";
15-
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_HANDLING_OPTION = "'RetainHandling' option is not available on this protocol level";
13+
String INVALID_SUBSCRIPTION_ID = "Provided invalid 'Subscription Identifier'";
14+
String PROTOCOL_LEVEL_UNSUPPORTED_NO_LOCAL_OPTION = "'No Local' option is not available on this protocol level";
15+
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_AS_PUBLISH_OPTION = "'Retain As Published' option is not available on this protocol level";
16+
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_HANDLING_OPTION = "'Retain Handling' option is not available on this protocol level";
1617
}

model/src/main/java/javasabr/mqtt/model/message/MqttMessageType.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package javasabr.mqtt.model.message;
22

3-
import lombok.AccessLevel;
3+
import javasabr.rlib.common.util.NumberedEnum;
4+
import javasabr.rlib.common.util.NumberedEnumMap;
45
import lombok.CustomLog;
56
import lombok.Getter;
67
import lombok.RequiredArgsConstructor;
78
import lombok.experimental.Accessors;
8-
import lombok.experimental.FieldDefaults;
99

1010
@Getter
1111
@CustomLog
12-
@Accessors(fluent = true)
12+
@Accessors
1313
@RequiredArgsConstructor
14-
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
15-
public enum MqttMessageType {
14+
public enum MqttMessageType implements NumberedEnum<MqttMessageType> {
1615
RESERVED(0),
1716
/**
1817
* After a Network Connection is established by a Client to a Server, the first Packet sent from the Client to the
@@ -105,16 +104,17 @@ public enum MqttMessageType {
105104
*/
106105
INVALID(16);
107106

108-
private static final MqttMessageType[] VALUES = values();
107+
private static final NumberedEnumMap<MqttMessageType> NUMBERED_MAP =
108+
new NumberedEnumMap<>(MqttMessageType.class);
109109

110-
public static MqttMessageType fromByte(byte messageType) {
111-
if (messageType < 0 || messageType > AUTHENTICATION.typeIndex()) {
112-
log.warning(messageType, "Invalid message type:[%s]"::formatted);
113-
return INVALID;
114-
} else {
115-
return VALUES[messageType];
116-
}
110+
public static MqttMessageType fromByte(int messageType) {
111+
return NUMBERED_MAP.resolve(messageType, MqttMessageType.INVALID);
117112
}
118113

119-
int typeIndex;
114+
private final int typeIndex;
115+
116+
@Override
117+
public int number() {
118+
return typeIndex;
119+
}
120120
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@NullMarked
2+
package javasabr.mqtt.model.message;
3+
4+
import org.jspecify.annotations.NullMarked;

model/src/main/java/javasabr/mqtt/model/reason/code/SubscribeAckReasonCode.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,14 @@
22

33
import javasabr.rlib.common.util.NumberedEnum;
44
import javasabr.rlib.common.util.NumberedEnumMap;
5-
import lombok.AccessLevel;
65
import lombok.Getter;
76
import lombok.RequiredArgsConstructor;
87
import lombok.experimental.Accessors;
9-
import lombok.experimental.FieldDefaults;
108

119
@Getter
10+
@Accessors
1211
@RequiredArgsConstructor
13-
@Accessors(fluent = true, chain = false)
14-
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
15-
public enum SubscribeAckReasonCode implements NumberedEnum<SubscribeAckReasonCode> {
12+
public enum SubscribeAckReasonCode implements NumberedEnum<SubscribeAckReasonCode>, ReasonCode {
1613
/**
1714
* The subscription is accepted and the maximum QoS sent will be QoS 0. This might be a lower QoS than was requested.
1815
*/
@@ -73,7 +70,7 @@ public static SubscribeAckReasonCode ofCode(int code) {
7370
return NUMBERED_MAP.require(code);
7471
}
7572

76-
int code;
73+
private final int code;
7774

7875
@Override
7976
public int number() {
Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,58 @@
11
package javasabr.mqtt.model.reason.code;
22

3-
import java.util.stream.Stream;
4-
import javasabr.rlib.common.util.ObjectUtils;
3+
import javasabr.rlib.common.util.NumberedEnum;
4+
import javasabr.rlib.common.util.NumberedEnumMap;
55
import lombok.Getter;
66
import lombok.RequiredArgsConstructor;
7+
import lombok.experimental.Accessors;
78

9+
@Getter
10+
@Accessors
811
@RequiredArgsConstructor
9-
public enum UnsubscribeAckReasonCode {
12+
public enum UnsubscribeAckReasonCode implements NumberedEnum<UnsubscribeAckReasonCode>, ReasonCode {
1013
/**
1114
* The subscription is deleted.
1215
*/
13-
SUCCESS((byte) 0x00),
16+
SUCCESS(0x00),
1417
/**
1518
* The subscription is accepted and the maximum QoS sent will be QoS 1. This might be a lower QoS than was requested.
1619
*/
17-
NO_SUBSCRIPTION_EXISTED((byte) 0x11),
20+
NO_SUBSCRIPTION_EXISTED(0x11),
1821

1922
// ERRORS
20-
2123
/**
2224
* The unsubscribe could not be completed and the Server either does not wish to reveal the reason or none of the
2325
* other Reason Codes apply.
2426
*/
25-
UNSPECIFIED_ERROR((byte) 0x80),
27+
UNSPECIFIED_ERROR(0x80),
2628
/**
2729
* The UNSUBSCRIBE is valid but the Server does not accept it.
2830
*/
29-
IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83),
31+
IMPLEMENTATION_SPECIFIC_ERROR(0x83),
3032
/**
3133
* The Client is not authorized to unsubscribe.
3234
*/
33-
NOT_AUTHORIZED((byte) 0x87),
35+
NOT_AUTHORIZED(0x87),
3436
/**
3537
* The Topic Filter is correctly formed but is not allowed for this Client.
3638
*/
37-
TOPIC_FILTER_INVALID((byte) 0x8F),
39+
TOPIC_FILTER_INVALID(0x8F),
3840
/**
3941
* The specified Packet Identifier is already in use.
4042
*/
41-
PACKET_IDENTIFIER_IN_USE((byte) 0x91);
42-
43-
private static final UnsubscribeAckReasonCode[] VALUES;
44-
45-
static {
46-
47-
var maxId = Stream
48-
.of(values())
49-
.mapToInt(UnsubscribeAckReasonCode::value)
50-
.map(value -> Byte.toUnsignedInt((byte) value))
51-
.max()
52-
.orElse(0);
43+
PACKET_IDENTIFIER_IN_USE(0x91);
5344

54-
var values = new UnsubscribeAckReasonCode[maxId + 1];
45+
private static final NumberedEnumMap<UnsubscribeAckReasonCode> NUMBERED_MAP =
46+
new NumberedEnumMap<>(UnsubscribeAckReasonCode.class);
5547

56-
for (var value : values()) {
57-
values[Byte.toUnsignedInt(value.value)] = value;
58-
}
59-
60-
VALUES = values;
48+
public static UnsubscribeAckReasonCode ofCode(int code) {
49+
return NUMBERED_MAP.require(code);
6150
}
6251

63-
public static UnsubscribeAckReasonCode of(int index) {
64-
return ObjectUtils.notNull(
65-
VALUES[index],
66-
index,
67-
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
68-
}
52+
private final int code;
6953

70-
@Getter
71-
private final byte value;
54+
@Override
55+
public int number() {
56+
return code;
57+
}
7258
}

model/src/main/java/javasabr/mqtt/model/session/ActiveSubscriptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package javasabr.mqtt.model.session;
22

3-
import javasabr.mqtt.model.subscribtion.Subscription;
3+
import javasabr.mqtt.model.subscription.Subscription;
44
import javasabr.mqtt.model.topic.TopicFilter;
55
import javasabr.rlib.collections.array.Array;
66

model/src/main/java/javasabr/mqtt/model/subscriber/SharedSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.util.Collection;
44
import java.util.concurrent.atomic.AtomicInteger;
5-
import javasabr.mqtt.model.subscribtion.SubscriptionOwner;
5+
import javasabr.mqtt.model.subscription.SubscriptionOwner;
66
import javasabr.mqtt.model.topic.SharedTopicFilter;
77
import javasabr.rlib.collections.array.Array;
88
import javasabr.rlib.collections.array.ArrayFactory;

0 commit comments

Comments
 (0)