Skip to content

Commit dd3f8c8

Browse files
authored
Update publish related messages (#57)
* update publish related messages
1 parent 9099215 commit dd3f8c8

32 files changed

+471
-236
lines changed
Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,37 @@
11
package javasabr.mqtt.model.reason.code;
22

3-
import java.util.stream.Stream;
4-
import javasabr.rlib.common.util.ObjectUtils;
3+
import javasabr.rlib.common.util.NumberedEnum;
4+
import javasabr.rlib.common.util.NumberedEnumMap;
55
import lombok.Getter;
66
import lombok.RequiredArgsConstructor;
7+
import lombok.experimental.Accessors;
78

9+
@Getter
10+
@Accessors
811
@RequiredArgsConstructor
9-
public enum PublishReleaseReasonCode {
12+
public enum PublishReleaseReasonCode implements NumberedEnum<PublishReleaseReasonCode>, ReasonCode {
1013

1114
/**
1215
* Message released.
1316
*/
14-
SUCCESS((byte) 0x00),
17+
SUCCESS(0x00),
1518
/**
1619
* The Packet Identifier is not known. This is not an error during recovery, but at other times indicates a mismatch
1720
* between the Session State on the Client and Server.
1821
*/
19-
PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92);
22+
PACKET_IDENTIFIER_NOT_FOUND(0x92);
2023

21-
private static final PublishReleaseReasonCode[] VALUES;
24+
private static final NumberedEnumMap<PublishReleaseReasonCode> NUMBERED_MAP =
25+
new NumberedEnumMap<>(PublishReleaseReasonCode.class);
2226

23-
static {
24-
25-
var maxId = Stream
26-
.of(values())
27-
.mapToInt(PublishReleaseReasonCode::value)
28-
.map(value -> Byte.toUnsignedInt((byte) value))
29-
.max()
30-
.orElse(0);
31-
32-
var values = new PublishReleaseReasonCode[maxId + 1];
33-
34-
for (var value : values()) {
35-
values[Byte.toUnsignedInt(value.value)] = value;
36-
}
37-
38-
VALUES = values;
27+
public static PublishReleaseReasonCode ofCode(int code) {
28+
return NUMBERED_MAP.require(code);
3929
}
4030

41-
public static PublishReleaseReasonCode of(int index) {
42-
return ObjectUtils.notNull(
43-
VALUES[index],
44-
index,
45-
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
46-
}
31+
private final int code;
4732

48-
@Getter
49-
private final byte value;
33+
@Override
34+
public int number() {
35+
return code;
36+
}
5037
}

network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ protected Set<MqttMessageProperty> availableProperties() {
9191
protected void applyProperty(MqttMessageProperty property, byte[] value) {
9292
switch (property) {
9393
case AUTHENTICATION_DATA -> authenticationData = value;
94-
default -> unexpectedProperty(property);
94+
default -> unsupportedProperty(property);
9595
}
9696
}
9797

@@ -100,7 +100,7 @@ protected void applyProperty(MqttMessageProperty property, String value) {
100100
switch (property) {
101101
case REASON_STRING -> reason = value;
102102
case AUTHENTICATION_METHOD -> authenticationMethod = value;
103-
default -> unexpectedProperty(property);
103+
default -> unsupportedProperty(property);
104104
}
105105
}
106106
}

network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ protected Set<MqttMessageProperty> availableProperties() {
313313
protected void applyProperty(MqttMessageProperty property, byte[] value) {
314314
switch (property) {
315315
case AUTHENTICATION_DATA -> authenticationData = value;
316-
default -> unexpectedProperty(property);
316+
default -> unsupportedProperty(property);
317317
}
318318
}
319319

@@ -325,7 +325,7 @@ protected void applyProperty(MqttMessageProperty property, String value) {
325325
case RESPONSE_INFORMATION -> responseInformation = value;
326326
case AUTHENTICATION_METHOD -> authenticationMethod = value;
327327
case SERVER_REFERENCE -> serverReference = value;
328-
default -> unexpectedProperty(property);
328+
default -> unsupportedProperty(property);
329329
}
330330
}
331331

@@ -357,7 +357,7 @@ protected void applyProperty(MqttMessageProperty property, long value) {
357357
(int) value,
358358
MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN,
359359
MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX);
360-
default -> unexpectedProperty(property);
360+
default -> unsupportedProperty(property);
361361
}
362362
}
363363
}

network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,15 +351,15 @@ protected Set<MqttMessageProperty> availableProperties() {
351351
protected void applyProperty(MqttMessageProperty property, byte[] value) {
352352
switch (property) {
353353
case AUTHENTICATION_DATA -> authenticationData = value;
354-
default -> unexpectedProperty(property);
354+
default -> unsupportedProperty(property);
355355
}
356356
}
357357

358358
@Override
359359
protected void applyProperty(MqttMessageProperty property, String value) {
360360
switch (property) {
361361
case AUTHENTICATION_METHOD -> authenticationMethod = value;
362-
default -> unexpectedProperty(property);
362+
default -> unsupportedProperty(property);
363363
}
364364
}
365365

@@ -384,7 +384,7 @@ protected void applyProperty(MqttMessageProperty property, long value) {
384384
(int) value,
385385
MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN,
386386
MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX);
387-
default -> unexpectedProperty(property);
387+
default -> unsupportedProperty(property);
388388
}
389389
}
390390
}

network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ protected void applyProperty(MqttMessageProperty property, long value) {
117117
break;
118118
}
119119
default: {
120-
unexpectedProperty(property);
120+
unsupportedProperty(property);
121121
}
122122
}
123123
}
@@ -134,7 +134,7 @@ protected void applyProperty(MqttMessageProperty property, String value) {
134134
break;
135135
}
136136
default: {
137-
unexpectedProperty(property);
137+
unsupportedProperty(property);
138138
}
139139
}
140140
}

network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,12 @@ protected byte[] readPayload(ByteBuffer buffer) {
258258
return data;
259259
}
260260

261-
protected void unexpectedProperty(MqttMessageProperty property) {
262-
throw new MalformedProtocolMqttException("Unsupported property:[" + property + "]");
261+
protected void unsupportedProperty(MqttMessageProperty property) {
262+
throw new MalformedProtocolMqttException("Unsupported property:[%s]".formatted(property));
263+
}
264+
265+
protected void alreadyPresentedProperty(MqttMessageProperty property) {
266+
throw new MalformedProtocolMqttException("[%s] is already presented".formatted(property));
263267
}
264268

265269
@Override

network/src/main/java/javasabr/mqtt/network/message/in/PublishAckMqttInMessage.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@
1414
import lombok.Getter;
1515
import lombok.experimental.Accessors;
1616
import lombok.experimental.FieldDefaults;
17+
import org.jspecify.annotations.Nullable;
1718

1819
/**
1920
* Publish acknowledgment (QoS 1).
2021
*/
2122
@Getter
22-
@Accessors(fluent = true, chain = false)
23-
@FieldDefaults(level = AccessLevel.PRIVATE)
24-
public class PublishAckMqttInMessage extends MqttInMessage implements TrackableMessage {
23+
@Accessors
24+
@FieldDefaults(level = AccessLevel.PROTECTED)
25+
public class PublishAckMqttInMessage extends TrackableMqttInMessage implements TrackableMessage {
2526

2627
private static final int MESSAGE_TYPE = MqttMessageType.PUBLISH_ACK.ordinal();
28+
public static final byte MESSAGE_FLAGS = 0b0000_0000;
2729

2830
static {
2931
DebugUtils.registerIncludedFields("reasonCode", "messageId");
@@ -51,15 +53,13 @@ public class PublishAckMqttInMessage extends MqttInMessage implements TrackableM
5153
MqttMessageProperty.USER_PROPERTY);
5254

5355
PublishAckReasonCode reasonCode;
54-
int messageId;
55-
5656
// properties
57+
@Nullable
5758
String reason;
5859

5960
public PublishAckMqttInMessage(byte messageFlags) {
6061
super(messageFlags);
6162
this.reasonCode = PublishAckReasonCode.SUCCESS;
62-
this.reason = "";
6363
}
6464

6565
@Override
@@ -68,11 +68,13 @@ public byte messageType() {
6868
}
6969

7070
@Override
71-
protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) {
72-
73-
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718045
74-
messageId = readShortUnsigned(buffer);
71+
protected boolean validMessageFlags(byte messageFlags) {
72+
return messageFlags == MESSAGE_FLAGS;
73+
}
7574

75+
@Override
76+
protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) {
77+
super.readVariableHeader(connection, buffer);
7678
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901123
7779
if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) {
7880
reasonCode = PublishAckReasonCode.ofCode(readByteUnsigned(buffer));
@@ -92,8 +94,13 @@ protected Set<MqttMessageProperty> availableProperties() {
9294
@Override
9395
protected void applyProperty(MqttMessageProperty property, String value) {
9496
switch (property) {
95-
case REASON_STRING -> reason = value;
96-
default -> unexpectedProperty(property);
97+
case REASON_STRING -> {
98+
if (reason != null) {
99+
alreadyPresentedProperty(property);
100+
}
101+
reason = value;
102+
}
103+
default -> unsupportedProperty(property);
97104
}
98105
}
99106
}

network/src/main/java/javasabr/mqtt/network/message/in/PublishCompleteMqttInMessage.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@
1414
import lombok.Getter;
1515
import lombok.experimental.Accessors;
1616
import lombok.experimental.FieldDefaults;
17+
import org.jspecify.annotations.Nullable;
1718

1819
/**
1920
* Publish complete (QoS 2 delivery part 3).
2021
*/
2122
@Getter
22-
@Accessors(fluent = true, chain = false)
23+
@Accessors
2324
@FieldDefaults(level = AccessLevel.PRIVATE)
24-
public class PublishCompleteMqttInMessage extends MqttInMessage implements TrackableMessage {
25+
public class PublishCompleteMqttInMessage extends TrackableMqttInMessage implements TrackableMessage {
2526

2627
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH_COMPLETE.ordinal();
28+
public static final byte MESSAGE_FLAGS = 0b0000_0000;
2729

2830
static {
2931
DebugUtils.registerIncludedFields("reasonCode", "messageId");
@@ -51,29 +53,29 @@ public class PublishCompleteMqttInMessage extends MqttInMessage implements Track
5153
MqttMessageProperty.USER_PROPERTY);
5254

5355
PublishCompletedReasonCode reasonCode;
54-
int messageId;
5556

5657
// properties
58+
@Nullable
5759
String reason;
5860

5961
public PublishCompleteMqttInMessage(byte messageFlags) {
6062
super(messageFlags);
6163
this.reasonCode = PublishCompletedReasonCode.SUCCESS;
62-
this.reason = "";
6364
}
6465

6566
@Override
6667
public byte messageType() {
6768
return MESSAGE_TYPE;
6869
}
6970

71+
@Override
72+
protected boolean validMessageFlags(byte messageFlags) {
73+
return messageFlags == MESSAGE_FLAGS;
74+
}
75+
7076
@Override
7177
protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) {
7278
super.readVariableHeader(connection, buffer);
73-
74-
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718083
75-
messageId = readShortUnsigned(buffer);
76-
7779
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901154
7880
if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) {
7981
reasonCode = PublishCompletedReasonCode.ofCode(readByteUnsigned(buffer));
@@ -94,8 +96,13 @@ protected Set<MqttMessageProperty> availableProperties() {
9496
@Override
9597
protected void applyProperty(MqttMessageProperty property, String value) {
9698
switch (property) {
97-
case REASON_STRING -> reason = value;
98-
default -> unexpectedProperty(property);
99+
case REASON_STRING -> {
100+
if (reason != null) {
101+
alreadyPresentedProperty(property);
102+
}
103+
reason = value;
104+
}
105+
default -> unsupportedProperty(property);
99106
}
100107
}
101108
}

network/src/main/java/javasabr/mqtt/network/message/in/PublishMqttInMessage.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import javasabr.mqtt.model.MqttProperties;
1010
import javasabr.mqtt.model.PayloadFormat;
1111
import javasabr.mqtt.model.QoS;
12-
import javasabr.mqtt.model.exception.MalformedProtocolMqttException;
1312
import javasabr.mqtt.model.message.MqttMessageType;
1413
import javasabr.mqtt.network.MqttConnection;
1514
import javasabr.rlib.collections.array.ArrayFactory;
@@ -328,7 +327,7 @@ protected void applyProperty(MqttMessageProperty property, long value) {
328327
case PAYLOAD_FORMAT_INDICATOR -> payloadFormat = PayloadFormat.fromCode(value);
329328
case TOPIC_ALIAS -> {
330329
if (topicAlias != MqttProperties.TOPIC_ALIAS_NOT_SET) {
331-
throw new MalformedProtocolMqttException("[%s] is already presented".formatted(property));
330+
alreadyPresentedProperty(property);
332331
}
333332
topicAlias = Math.toIntExact(value);
334333
}
@@ -339,7 +338,7 @@ protected void applyProperty(MqttMessageProperty property, long value) {
339338
}
340339
subscriptionIds.add((int) value);
341340
}
342-
default -> unexpectedProperty(property);
341+
default -> unsupportedProperty(property);
343342
}
344343
}
345344

@@ -348,17 +347,17 @@ protected void applyProperty(MqttMessageProperty property, String value) {
348347
switch (property) {
349348
case RESPONSE_TOPIC -> {
350349
if (rawResponseTopicName != null) {
351-
throw new MalformedProtocolMqttException("[%s] is already presented".formatted(property));
350+
alreadyPresentedProperty(property);
352351
}
353352
rawResponseTopicName = value;
354353
}
355354
case CONTENT_TYPE -> {
356355
if (contentType != null) {
357-
throw new MalformedProtocolMqttException("[%s] is already presented".formatted(property));
356+
alreadyPresentedProperty(property);
358357
}
359358
contentType = value;
360359
}
361-
default -> unexpectedProperty(property);
360+
default -> unsupportedProperty(property);
362361
}
363362
}
364363

@@ -367,11 +366,11 @@ protected void applyProperty(MqttMessageProperty property, byte[] value) {
367366
switch (property) {
368367
case CORRELATION_DATA -> {
369368
if (correlationData != null) {
370-
throw new MalformedProtocolMqttException("[%s] is already presented".formatted(property));
369+
alreadyPresentedProperty(property);
371370
}
372371
correlationData = value;
373372
}
374-
default -> unexpectedProperty(property);
373+
default -> unsupportedProperty(property);
375374
}
376375
}
377376
}

0 commit comments

Comments
 (0)