Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package javasabr.mqtt.service.message.handler.impl;

import static javasabr.mqtt.base.util.ReactorUtils.ifTrue;
import static javasabr.mqtt.model.MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED;
import static javasabr.mqtt.model.MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED;
import static javasabr.mqtt.model.MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET;
import static javasabr.mqtt.model.MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET;
import static javasabr.mqtt.model.MqttProperties.SERVER_KEEP_ALIVE_DISABLED;
import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED;
import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED;
import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET;
import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_DISABLED;
import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED;
import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET;
import static javasabr.mqtt.model.reason.code.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD;
import static javasabr.mqtt.model.reason.code.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID;

Expand Down Expand Up @@ -143,22 +143,22 @@ private void resolveClientConnectionConfig(MqttClient.UnsafeMqttClient client, C
? packet.sessionExpiryInterval()
: SESSION_EXPIRY_INTERVAL_DISABLED;

if (sessionExpiryInterval == SESSION_EXPIRY_INTERVAL_UNDEFINED) {
if (sessionExpiryInterval == SESSION_EXPIRY_INTERVAL_IS_NOT_SET) {
sessionExpiryInterval = serverConfig.defaultSessionExpiryInterval();
}

// select result receive max
int receiveMaxPublishes = packet.receiveMaxPublishes() == RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED
int receiveMaxPublishes = packet.receiveMaxPublishes() == RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET
? serverConfig.receiveMaxPublishes()
: Math.min(packet.receiveMaxPublishes(), serverConfig.receiveMaxPublishes());

// select result maximum packet size
var maximumPacketSize = packet.maxPacketSize() == MAXIMUM_MESSAGE_SIZE_UNDEFINED
var maximumPacketSize = packet.maxPacketSize() == MAXIMUM_MESSAGE_SIZE_IS_NOT_SET
? serverConfig.maxMessageSize()
: Math.min(packet.maxPacketSize(), serverConfig.maxMessageSize());

// select result topic alias maximum
var topicAliasMaxValue = packet.topicAliasMaxValue() == TOPIC_ALIAS_MAXIMUM_UNDEFINED
var topicAliasMaxValue = packet.topicAliasMaxValue() == TOPIC_ALIAS_MAXIMUM_IS_NOT_SET
? TOPIC_ALIAS_MAXIMUM_DISABLED
: Math.min(packet.topicAliasMaxValue(), serverConfig.topicAliasMaxValue());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private void handleInvalidTopicAlias(ExternalMqttClient client) {
private void handleInvalidPayloadFormat(ExternalMqttClient client) {
MqttOutMessage response = messageOutFactoryService
.resolveFactory(client)
.newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.INVALID_PAYLOAD_FORMAT);
.newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT);
client.closeWithReason(response);
}

Expand All @@ -218,7 +218,7 @@ private void handleInvalidResponseTopicName(ExternalMqttClient client) {
private void handleInvalidMessageExpiryInterval(ExternalMqttClient client) {
MqttOutMessage response = messageOutFactoryService
.resolveFactory(client)
.newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.INVALID_MESSAGE_EXPIRY_INTERVAL);
.newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL);
client.closeWithReason(response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ public MqttOutMessage newDisconnect(
@Override
public MqttOutMessage newAuthenticate(
AuthenticateReasonCode reasonCode,
String authenticateMethod,
byte[] authenticateData,
Array<StringPair> userProperties,
String reason) {
@Nullable String reason,
@Nullable String authenticateMethod,
byte @Nullable [] authenticateData,
Array<StringPair> userProperties) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,16 @@ public MqttOutMessage newDisconnect(
@Override
public MqttOutMessage newAuthenticate(
AuthenticateReasonCode reasonCode,
String authenticateMethod,
byte[] authenticateData,
Array<StringPair> userProperties,
String reason) {
return new AuthenticationMqtt5OutMessage(userProperties, reasonCode, reason, authenticateMethod, authenticateData);
@Nullable String reason,
@Nullable String authenticateMethod,
byte @Nullable [] authenticateData,
Array<StringPair> userProperties) {
return new AuthenticationMqtt5OutMessage(
reasonCode,
reason,
authenticateMethod,
authenticateData,
userProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,21 @@ public MqttOutMessage newDisconnect(

public abstract MqttOutMessage newAuthenticate(
AuthenticateReasonCode reasonCode,
String authenticateMethod,
byte[] authenticateData,
Array<StringPair> userProperties,
String reason);
@Nullable String reason,
@Nullable String authenticateMethod,
byte @Nullable [] authenticateData,
Array<StringPair> userProperties);

public MqttOutMessage newAuthenticate(
AuthenticateReasonCode reasonCode,
String authenticateMethod,
byte[] authenticateData) {
@Nullable String authenticateMethod,
byte @Nullable [] authenticateData) {
return newAuthenticate(
reasonCode,
null,
authenticateMethod,
authenticateData, EMPTY_USER_PROPERTIES,
StringUtils.EMPTY);
authenticateData,
EMPTY_USER_PROPERTIES);
}

public abstract MqttOutMessage newPingRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification {
then:
def disconnectReason = mqttClient.nextSentMessage(DisconnectMqtt5OutMessage)
disconnectReason.reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR
disconnectReason.reason() == MqttProtocolErrors.INVALID_PAYLOAD_FORMAT
disconnectReason.reason() == MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT
disconnectReason.serverReference() == null
}

Expand All @@ -244,7 +244,7 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification {
then:
def disconnectReason = mqttClient.nextSentMessage(DisconnectMqtt5OutMessage)
disconnectReason.reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR
disconnectReason.reason() == MqttProtocolErrors.INVALID_MESSAGE_EXPIRY_INTERVAL
disconnectReason.reason() == MqttProtocolErrors.PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL
disconnectReason.serverReference() == null
}

Expand Down
32 changes: 19 additions & 13 deletions model/src/main/java/javasabr/mqtt/model/MqttProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ public interface MqttProperties {
long SESSION_EXPIRY_INTERVAL_DEFAULT = 120;
long SESSION_EXPIRY_INTERVAL_MIN = 0;
long SESSION_EXPIRY_INTERVAL_INFINITY = 0xFFFFFFFFL;
long SESSION_EXPIRY_INTERVAL_UNDEFINED = -1;
long SESSION_EXPIRY_INTERVAL_IS_NOT_SET = -1;

int RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED = -1;
int RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET = 0;
int RECEIVE_MAXIMUM_PUBLISHES_MIN = 1;
int RECEIVE_MAXIMUM_PUBLISHES_DEFAULT = 10;
int RECEIVE_MAXIMUM_PUBLISHES_MAX = 0xFFFF;
int RECEIVE_MAXIMUM_PUBLISHES_DEFAULT = RECEIVE_MAXIMUM_PUBLISHES_MAX;

int MAXIMUM_MESSAGE_SIZE_UNDEFINED = -1;
int MAXIMUM_MESSAGE_SIZE_IS_NOT_SET = -1;
int MAXIMUM_MESSAGE_SIZE_DEFAULT = 3074;
int MAXIMUM_MESSAGE_SIZE_MIN = 128;
int MAXIMUM_MESSAGE_SIZE_MIN = 64;
int MAXIMUM_MESSAGE_SIZE_MAX = MAXIMUM_PROTOCOL_MESSAGE_SIZE;

int MAXIMUM_STRING_LENGTH = 1024;
Expand All @@ -31,10 +31,10 @@ public interface MqttProperties {
long MESSAGE_EXPIRY_INTERVAL_INFINITY = 0;
long MESSAGE_EXPIRY_INTERVAL_MIN = 0;

int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1;
int TOPIC_ALIAS_MAXIMUM_IS_NOT_SET = 0;
int TOPIC_ALIAS_MAXIMUM_DISABLED = 0;

int SERVER_KEEP_ALIVE_UNDEFINED = -1;
int SERVER_KEEP_ALIVE_IS_NOT_SET = -1;
int SERVER_KEEP_ALIVE_DISABLED = 0;
int SERVER_KEEP_ALIVE_DEFAULT = 0;
int SERVER_KEEP_ALIVE_MIN = 0;
Expand All @@ -52,12 +52,18 @@ public interface MqttProperties {

int MESSAGE_ID_IS_NOT_SET = 0;

boolean SESSIONS_ENABLED_DEFAULT = true;
boolean KEEP_ALIVE_ENABLED_DEFAULT = false;
boolean RETAIN_AVAILABLE_DEFAULT = false;
boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
boolean RETAIN_AVAILABLE_DEFAULT = true;
int RETAIN_AVAILABLE_IS_NOT_SET = -1;

boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = true;
int WILDCARD_SUBSCRIPTION_AVAILABLE_IS_NOT_SET = -1;

boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = true;
int SUBSCRIPTION_IDENTIFIER_AVAILABLE_IS_NOT_SET = -1;

int PACKET_ID_FOR_QOS_0 = 0;
boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = true;
int SHARED_SUBSCRIPTION_AVAILABLE_IS_NOT_SET = -1;

boolean SESSIONS_ENABLED_DEFAULT = true;
boolean KEEP_ALIVE_ENABLED_DEFAULT = false;
}
16 changes: 14 additions & 2 deletions model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,20 @@ public interface MqttProtocolErrors {
String NO_ANY_TOPIC_FILTERS = "Not provided any information about 'Topic Filters'";
String NO_ANY_TOPIC_NANE = "Not provided any information about TopicName";
//String INVALID_TOPIC_ALIAS = "Provided invalid TopicAlias";
String INVALID_PAYLOAD_FORMAT = "Provided invalid PayloadFormat";
String INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid MessageExpiryInterval";

String PROVIDED_INVALID_PAYLOAD_FORMAT = "Provided invalid PayloadFormat";
String PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid MessageExpiryInterval";
String PROVIDED_INVALID_SESSION_EXPIRY_INTERVAL = "Provided invalid 'Session Expiry Interval'";
String PROVIDED_INVALID_RECEIVED_MAX_PUBLISHES = "Provided invalid 'Receive Maximum'";
String PROVIDED_INVALID_MAX_QOS = "Provided invalid 'Maximum QoS'";
String PROVIDED_INVALID_RETAIN_AVAILABLE = "Provided invalid 'Retain Available'";
String PROVIDED_INVALID_MAX_MESSAGE_SIZE = "Provided invalid 'Maximum Packet Size'";
String PROVIDED_INVALID_TOPIC_ALIAS_MAX = "Provided invalid 'Topic Alias Maximum'";
String PROVIDED_INVALID_WILDCARD_SUBSCRIPTION_AVAILABLE = "Provided invalid 'Wildcard Subscription Available'";
String PROVIDED_INVALID_SUBSCRIPTION_IDENTIFIERS_AVAILABLE = "Provided invalid 'Subscription Identifiers Available'";
String PROVIDED_INVALID_SHARED_SUBSCRIPTION_AVAILABLE = "Provided invalid 'Shared Subscription Available'";
String PROVIDED_INVALID_SERVER_KEEP_ALIVE = "Provided invalid 'Server Keep Alive'";

String INVALID_RESPONSE_TOPIC_NAME = "Provided invalid ResponseTopicName";
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Provided unsupported 'QoS' or 'RetainHandling'";
String MISSED_REQUIRED_MESSAGE_ID = "'Packet Identifier' must be presented'";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,40 @@
package javasabr.mqtt.model.reason.code;

import java.util.stream.Stream;
import javasabr.rlib.common.util.ObjectUtils;
import javasabr.rlib.common.util.NumberedEnum;
import javasabr.rlib.common.util.NumberedEnumMap;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;

@Getter
@Accessors
@RequiredArgsConstructor
public enum AuthenticateReasonCode {
public enum AuthenticateReasonCode implements NumberedEnum<AuthenticateReasonCode>, ReasonCode {

/**
* Authentication is successful. Server.
*/
SUCCESS((byte) 0x00),
SUCCESS(0x00),
/**
* Continue the authentication with another step. Client or Server.
*/
CONTINUE_AUTHENTICATION((byte) 0x18),
CONTINUE_AUTHENTICATION(0x18),
/**
* Initiate a re-authentication. Client.
*/
RE_AUTHENTICATE((byte) 0x19);
RE_AUTHENTICATE(0x19);

private static final AuthenticateReasonCode[] VALUES;
private static final NumberedEnumMap<AuthenticateReasonCode> NUMBERED_MAP =
new NumberedEnumMap<>(AuthenticateReasonCode.class);

static {

var maxId = Stream
.of(values())
.mapToInt(AuthenticateReasonCode::value)
.max()
.orElse(0);

var values = new AuthenticateReasonCode[maxId + 1];

for (var value : values()) {
values[value.value] = value;
}

VALUES = values;
public static AuthenticateReasonCode ofCode(int code) {
return NUMBERED_MAP.require(code);
}

public static AuthenticateReasonCode of(int index) {
return ObjectUtils.notNull(
VALUES[index],
index,
arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg));
}
private final int code;

private @Getter
final byte value;
@Override
public int number() {
return code;
}
}
Loading
Loading