Skip to content

Commit 9bf8458

Browse files
authored
Improve publish implementation, part 2 (#56)
Improve publish implementation, part 2 (#56)
1 parent 0bd0dab commit 9bf8458

File tree

118 files changed

+1735
-777
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+1735
-777
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,12 @@ PublishDeliveringService publishDeliveringService(
228228
@Bean
229229
MqttPublishInMessageHandler qos0MqttPublishInMessageHandler(
230230
SubscriptionService subscriptionService,
231-
PublishDeliveringService publishDeliveringService) {
232-
return new Qos0MqttPublishInMessageHandler(subscriptionService, publishDeliveringService);
231+
PublishDeliveringService publishDeliveringService,
232+
MessageOutFactoryService messageOutFactoryService) {
233+
return new Qos0MqttPublishInMessageHandler(
234+
subscriptionService,
235+
publishDeliveringService,
236+
messageOutFactoryService);
233237
}
234238

235239
@Bean

lombok.config

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
lombok.log.custom.declaration = javasabr.rlib.logger.api.Logger javasabr.rlib.logger.api.LoggerManager.getLogger(TYPE)
1+
lombok.log.custom.declaration = javasabr.rlib.logger.api.Logger javasabr.rlib.logger.api.LoggerManager.getLogger(TYPE)
2+
lombok.accessors.fluent=true
3+
lombok.accessors.chain=false

model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public interface MqttProtocolErrors {
99
String INVALID_RESPONSE_TOPIC_NAME = "Provided invalid ResponseTopicName";
1010
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Unsupported 'QoS' or 'RetainHandling'";
1111
String MISSED_REQUIRED_MESSAGE_ID = "'Packet Identifier' must be presented'";
12+
String NOT_EXPECTED_MESSAGE_ID = "'Packet Identifier' must be zero'";
1213
String PROTOCOL_LEVEL_UNSUPPORTED_NO_LOCAL_OPTION = "'NoLocal' option is not available on this protocol level";
1314
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_AS_PUBLISH_OPTION = "'RetainAsPublished' option is not available on this protocol level";
1415
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_HANDLING_OPTION = "'RetainHandling' option is not available on this protocol level";
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package javasabr.mqtt.model;
2+
3+
public interface MqttUser {}

model/src/main/java/javasabr/mqtt/model/QoS.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import lombok.experimental.FieldDefaults;
1111

1212
@Getter
13+
@Accessors
1314
@RequiredArgsConstructor
14-
@Accessors(fluent = true, chain = false)
1515
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
1616
public enum QoS implements NumberedEnum<QoS> {
1717
AT_MOST_ONCE(0, SubscribeAckReasonCode.GRANTED_QOS_0),
@@ -38,7 +38,11 @@ public QoS lower(QoS alternative) {
3838
return level > alternative.level ? alternative : this;
3939
}
4040

41-
public boolean isLower(QoS another) {
41+
public boolean isLowerThan(QoS another) {
4242
return level < another.level;
4343
}
44+
45+
public boolean isHigherThan(QoS another) {
46+
return level > another.level;
47+
}
4448
}

network/src/main/java/javasabr/mqtt/network/message/MqttMessageType.java renamed to model/src/main/java/javasabr/mqtt/model/message/MqttMessageType.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package javasabr.mqtt.network.message;
1+
package javasabr.mqtt.model.message;
22

33
import lombok.AccessLevel;
44
import lombok.CustomLog;
@@ -41,12 +41,12 @@ public enum MqttMessageType {
4141
/**
4242
* A PUBREL Packet is the response to a PUBREC Packet. It is the third packet of the QoS 2 protocol exchange.
4343
*/
44-
PUBLISH_RELEASED(6),
44+
PUBLISH_RELEASE(6),
4545
/**
4646
* The PUBCOMP packet is the response to a PUBREL packet. It is the fourth and final packet of the QoS 2 protocol
4747
* exchange.
4848
*/
49-
PUBLISH_COMPLETED(7),
49+
PUBLISH_COMPLETE(7),
5050
/**
5151
* The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions. Each Subscription
5252
* registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client in order to
@@ -98,7 +98,7 @@ public enum MqttMessageType {
9898
* such as challenge / response authentication. It is a Protocol Error for the Client or Server to send an AUTH packet
9999
* if the CONNECT packet did not contain the same Authentication Method.
100100
*/
101-
AUTHENTICATE(15),
101+
AUTHENTICATION(15),
102102

103103
/**
104104
* Not supported
@@ -108,7 +108,7 @@ public enum MqttMessageType {
108108
private static final MqttMessageType[] VALUES = values();
109109

110110
public static MqttMessageType fromByte(byte messageType) {
111-
if (messageType < 0 || messageType > AUTHENTICATE.typeIndex()) {
111+
if (messageType < 0 || messageType > AUTHENTICATION.typeIndex()) {
112112
log.warning(messageType, "Invalid message type:[%s]"::formatted);
113113
return INVALID;
114114
} else {

model/src/main/java/javasabr/mqtt/model/publishing/Publish.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package javasabr.mqtt.model.publishing;
22

33
import javasabr.mqtt.base.util.DebugUtils;
4+
import javasabr.mqtt.model.MqttProperties;
45
import javasabr.mqtt.model.PayloadFormat;
56
import javasabr.mqtt.model.QoS;
67
import javasabr.mqtt.model.TrackableMessage;
@@ -26,10 +27,34 @@ public record Publish(
2627
PayloadFormat payloadFormat,
2728
Array<StringPair> userProperties) implements TrackableMessage {
2829

30+
private static final Array<StringPair> EMPTY_USER_PROPERTIES = Array.empty(StringPair.class);
31+
2932
static {
3033
DebugUtils.registerIncludedFields("topicName", "messageId", "qos", "topicAlias", "payloadFormat");
3134
}
3235

36+
public static Publish minimal(int messageId, QoS qos, TopicName topicName, byte[] payload) {
37+
return new Publish(
38+
messageId,
39+
qos,
40+
topicName,
41+
null,
42+
payload,
43+
false,
44+
false,
45+
null,
46+
IntArray.EMPTY,
47+
null,
48+
MqttProperties.MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET,
49+
MqttProperties.TOPIC_ALIAS_NOT_SET,
50+
PayloadFormat.BINARY,
51+
EMPTY_USER_PROPERTIES);
52+
}
53+
54+
public static Publish minimal(QoS qos, TopicName topicName, byte[] payload) {
55+
return minimal(MqttProperties.MESSAGE_ID_IS_NOT_SET, qos, topicName, payload);
56+
}
57+
3358
public Publish withDuplicated() {
3459
if (duplicated) {
3560
return this;

model/src/main/java/javasabr/mqtt/model/reason/code/AuthenticateReasonCode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public enum AuthenticateReasonCode {
2727

2828
var maxId = Stream
2929
.of(values())
30-
.mapToInt(AuthenticateReasonCode::getValue)
30+
.mapToInt(AuthenticateReasonCode::value)
3131
.max()
3232
.orElse(0);
3333

model/src/main/java/javasabr/mqtt/model/reason/code/ConnectAckReasonCode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public enum ConnectAckReasonCode {
105105

106106
var maxId = Stream
107107
.of(values())
108-
.mapToInt(ConnectAckReasonCode::getMqtt5)
108+
.mapToInt(ConnectAckReasonCode::mqtt5)
109109
.map(value -> Byte.toUnsignedInt((byte) value))
110110
.max()
111111
.orElse(0);
Lines changed: 22 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,69 @@
11
package javasabr.mqtt.model.reason.code;
22

3-
import java.util.stream.Stream;
4-
import javasabr.rlib.common.util.ObjectUtils;
5-
import lombok.AccessLevel;
3+
import javasabr.rlib.common.util.NumberedEnum;
4+
import javasabr.rlib.common.util.NumberedEnumMap;
65
import lombok.Getter;
76
import lombok.RequiredArgsConstructor;
87
import lombok.experimental.Accessors;
9-
import lombok.experimental.FieldDefaults;
108

119
@Getter
10+
@Accessors
1211
@RequiredArgsConstructor
13-
@Accessors(fluent = true, chain = false)
14-
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
15-
public enum PublishAckReasonCode {
12+
public enum PublishAckReasonCode implements NumberedEnum<PublishAckReasonCode>, ReasonCode {
1613
/**
1714
* The message is accepted. Publication of the QoS 1 message proceeds.
1815
*/
19-
SUCCESS((byte) 0x00),
16+
SUCCESS(0x00),
2017
/**
2118
* The message is accepted but there are no subscribers. This is sent only by the Server. If the Server knows that
2219
* there are no matching subscribers, it MAY use this Reason Code instead of 0x00 (Success).
2320
*/
24-
NO_MATCHING_SUBSCRIBERS((byte) 0x10),
21+
NO_MATCHING_SUBSCRIBERS(0x10),
2522

2623
// ERRORS
2724

2825
/**
2926
* The receiver does not accept the publish but either does not want to reveal the reason, or it does not match one of
3027
* the other values.
3128
*/
32-
UNSPECIFIED_ERROR((byte) 0x80),
29+
UNSPECIFIED_ERROR(0x80),
3330
/**
3431
* The PUBLISH is valid but the receiver is not willing to accept it.
3532
*/
36-
IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x83),
33+
IMPLEMENTATION_SPECIFIC_ERROR(0x83),
3734
/**
3835
* The PUBLISH is not authorized.
3936
*/
40-
NOT_AUTHORIZED((byte) 0x87),
37+
NOT_AUTHORIZED(0x87),
4138
/**
4239
* The Topic Name is not malformed, but is not accepted by this Client or Server.
4340
*/
44-
TOPIC_NAME_INVALID((byte) 0x90),
41+
TOPIC_NAME_INVALID(0x90),
4542
/**
4643
* The Packet Identifier is already in use. This might indicate a mismatch in the Session State between the Client and
4744
* Server.
4845
*/
49-
PACKET_IDENTIFIER_IN_USE((byte) 0x91),
46+
PACKET_IDENTIFIER_IN_USE(0x91),
5047
/**
5148
* An implementation or administrative imposed limit has been exceeded.
5249
*/
53-
QUOTA_EXCEEDED((byte) 0x97),
50+
QUOTA_EXCEEDED(0x97),
5451
/**
5552
* The payload format does not match the specified Payload Format Indicator.
5653
*/
57-
PAYLOAD_FORMAT_INVALID((byte) 0x99);
54+
PAYLOAD_FORMAT_INVALID(0x99);
5855

59-
private static final PublishAckReasonCode[] VALUES;
56+
private static final NumberedEnumMap<PublishAckReasonCode> NUMBERED_MAP =
57+
new NumberedEnumMap<>(PublishAckReasonCode.class);
6058

61-
static {
62-
63-
int maxValue = Stream
64-
.of(values())
65-
.mapToInt(PublishAckReasonCode::value)
66-
.map(value -> Byte.toUnsignedInt((byte) value))
67-
.max()
68-
.orElse(0);
69-
70-
var values = new PublishAckReasonCode[maxValue + 1];
71-
72-
for (var value : values()) {
73-
values[Byte.toUnsignedInt(value.value)] = value;
74-
}
75-
76-
VALUES = values;
59+
public static PublishAckReasonCode ofCode(int code) {
60+
return NUMBERED_MAP.require(code);
7761
}
7862

79-
public static PublishAckReasonCode ofValue(int value) {
80-
return ObjectUtils.notNull(
81-
VALUES[value],
82-
value,
83-
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
84-
}
63+
private final int code;
8564

86-
byte value;
65+
@Override
66+
public int number() {
67+
return code;
68+
}
8769
}

0 commit comments

Comments
 (0)