Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package javasabr.mqtt.service.acl

import groovy.transform.Field

import javasabr.mqtt.model.acl.Operation
import javasabr.mqtt.model.acl.rule.Rule
import javasabr.mqtt.model.exception.AclConfigurationException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import static javasabr.mqtt.model.acl.Action.ALLOW;
import static javasabr.mqtt.model.acl.Operation.SUBSCRIBE;

import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.acl.Action;
import javasabr.mqtt.model.acl.Operation;
import javasabr.mqtt.model.acl.condition.MqttUserCondition;
import javasabr.mqtt.model.acl.condition.TopicCondition;
import javasabr.mqtt.model.topic.AbstractTopic;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode(callSuper = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import static javasabr.mqtt.model.acl.Action.DENY;
import static javasabr.mqtt.model.acl.Operation.PUBLISH;

import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.acl.Action;
import javasabr.mqtt.model.acl.Operation;
import javasabr.mqtt.model.acl.condition.MqttUserCondition;
import javasabr.mqtt.model.acl.condition.TopicCondition;
import javasabr.mqtt.model.topic.AbstractTopic;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode(callSuper = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import static javasabr.mqtt.model.acl.Action.DENY;
import static javasabr.mqtt.model.acl.Operation.SUBSCRIBE;

import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.acl.Action;
import javasabr.mqtt.model.acl.Operation;
import javasabr.mqtt.model.acl.condition.MqttUserCondition;
import javasabr.mqtt.model.acl.condition.TopicCondition;
import javasabr.mqtt.model.topic.AbstractTopic;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode(callSuper = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
env.getProperty(
"mqtt.external.connection.topic.alias.maximum",
int.class,
MqttProperties.TOPIC_ALIAS_MAX_DEFAULT),
0),
env.getProperty(
"mqtt.external.connection.default.session.expiration.time",
long.class,
Expand All @@ -319,15 +319,15 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
env.getProperty(
"mqtt.external.connection.retain.available",
boolean.class,
MqttProperties.RETAIN_AVAILABLE_DEFAULT),
false), // set false because currently it's not implemented and we should not allow for clients to use it
env.getProperty(
"mqtt.external.connection.wildcard.subscription.available",
boolean.class,
MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT),
env.getProperty(
"mqtt.external.connection.subscription.id.available",
boolean.class,
MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT),
false), // set false because currently it's not implemented and we should not allow for clients to use it
env.getProperty(
"mqtt.external.connection.shared.subscription.available",
boolean.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package javasabr.mqtt.service;

import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.MutableArray;

/**
* Subscription service
*/
public interface SubscriptionService {

NetworkMqttUser resolveClient(Subscriber subscriber);


default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
}
Expand All @@ -32,7 +29,7 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
* @param subscriptions the list of request to subscribe topics
* @return array of subscribe ack reason codes
*/
Array<SubscribeAckReasonCode> subscribe(NetworkMqttUser user, MqttSession session, Array<Subscription> subscriptions);
Array<SubscribeAckReasonCode> subscribe(MqttUser user, MqttSession session, Array<Subscription> subscriptions);

/**
* Removes MQTT client from listening to the topics.
Expand All @@ -41,9 +38,9 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
* @param topicFilters topic filters
* @return array of unsubscribe ack reason codes
*/
Array<UnsubscribeAckReasonCode> unsubscribe(NetworkMqttUser user, MqttSession session, Array<TopicFilter> topicFilters);
Array<UnsubscribeAckReasonCode> unsubscribe(MqttUser user, MqttSession session, Array<TopicFilter> topicFilters);

void cleanSubscriptions(NetworkMqttUser user, MqttSession session);
void cleanSubscriptions(MqttUser user, MqttSession session);

void restoreSubscriptions(NetworkMqttUser user, MqttSession session);
void restoreSubscriptions(MqttUser user, MqttSession session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.SUCCESS;

import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
import javasabr.mqtt.model.session.ActiveSubscriptions;
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.SharedTopicFilter;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.ArrayFactory;
Expand All @@ -37,14 +36,6 @@ public InMemorySubscriptionService() {
this.subscriberTree = new ConcurrentSubscriberTree();
}

@Override
public NetworkMqttUser resolveClient(Subscriber subscriber) {
if (subscriber instanceof SingleSubscriber single) {
return (NetworkMqttUser) single.user();
}
throw new IllegalArgumentException("Unexpected subscriber: " + subscriber);
}

@Override
public Array<SingleSubscriber> findSubscribersTo(MutableArray<SingleSubscriber> container, TopicName topicName) {
Array<SingleSubscriber> matched = subscriberTree.matches(topicName);
Expand All @@ -54,7 +45,7 @@ public Array<SingleSubscriber> findSubscribersTo(MutableArray<SingleSubscriber>

@Override
public Array<SubscribeAckReasonCode> subscribe(
NetworkMqttUser user,
MqttUser user,
MqttSession session,
Array<Subscription> subscriptions) {

Expand All @@ -69,7 +60,7 @@ public Array<SubscribeAckReasonCode> subscribe(
return subscribeResults;
}

private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttSession session, Subscription subscription) {
private SubscribeAckReasonCode addSubscription(MqttUser user, MqttSession session, Subscription subscription) {
MqttClientConnectionConfig connectionConfig = user.connectionConfig();
TopicFilter topicFilter = subscription.topicFilter();
if (topicFilter.isInvalid()) {
Expand All @@ -90,7 +81,7 @@ private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttSession

@Override
public Array<UnsubscribeAckReasonCode> unsubscribe(
NetworkMqttUser user,
MqttUser user,
MqttSession session,
Array<TopicFilter> topicFilters) {

Expand All @@ -105,7 +96,7 @@ public Array<UnsubscribeAckReasonCode> unsubscribe(
return unsubscribeResults;
}

private UnsubscribeAckReasonCode removeSubscription(NetworkMqttUser user, MqttSession session, TopicFilter topicFilter) {
private UnsubscribeAckReasonCode removeSubscription(MqttUser user, MqttSession session, TopicFilter topicFilter) {
if (topicFilter.isInvalid()) {
return UnsubscribeAckReasonCode.TOPIC_FILTER_INVALID;
} else if (subscriberTree.unsubscribe(user, topicFilter)) {
Expand All @@ -119,7 +110,7 @@ private UnsubscribeAckReasonCode removeSubscription(NetworkMqttUser user, MqttSe
}

@Override
public void cleanSubscriptions(NetworkMqttUser user, MqttSession session) {
public void cleanSubscriptions(MqttUser user, MqttSession session) {
Array<Subscription> subscriptions = session
.activeSubscriptions()
.subscriptions();
Expand All @@ -129,7 +120,7 @@ public void cleanSubscriptions(NetworkMqttUser user, MqttSession session) {
}

@Override
public void restoreSubscriptions(NetworkMqttUser user, MqttSession session) {
public void restoreSubscriptions(MqttUser user, MqttSession session) {
Array<Subscription> subscriptions = session
.activeSubscriptions()
.subscriptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected void malformedProtocolError(MqttConnection connection, U user, Excepti
MqttOutMessage feedback = messageOutFactoryService
.resolveFactory(user)
.newDisconnect(user, DisconnectReasonCode.MALFORMED_PACKET, exception.getMessage());
user.send(feedback)
user.sendAsync(feedback)
.thenAccept(_ -> connection.close());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected void processValidMessage(
}

private void reject(ExternalNetworkMqttUser user, ConnectAckReasonCode connectAckReasonCode) {
user.sendAsync(messageOutFactoryService
user.sendInBackground(messageOutFactoryService
.resolveFactory(user)
.newConnectAck(user, connectAckReasonCode));
}
Expand Down Expand Up @@ -209,7 +209,7 @@ private Mono<Boolean> onConnected(
subscriptionService.restoreSubscriptions(user, session);

return Mono.fromFuture(user
.send(connectAck)
.sendAsync(connectAck)
.thenApply(result -> onSentConnAck(user, session, result)));
}

Expand All @@ -220,7 +220,7 @@ private boolean onSentConnAck(ConfigurableNetworkMqttUser user, NetworkMqttSessi
return false;
}

session.resendPendingPackets(user);
session.resendNotConfirmedPublishesTo(user);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package javasabr.mqtt.service.message.handler.impl;

import javasabr.mqtt.model.message.TrackableMqttMessage;
import javasabr.mqtt.model.session.ProcessingPublishes;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.network.message.in.MqttInMessage;
import javasabr.mqtt.network.session.NetworkMqttSession;
import javasabr.mqtt.service.MessageOutFactoryService;

public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttInMessage & TrackableMqttMessage>
extends AbstractMqttInMessageHandler<ExternalNetworkMqttUser, P> {
public abstract class ProcessingOutPublishesMqttInMessageHandler<M extends MqttInMessage & TrackableMqttMessage>
extends AbstractMqttInMessageHandler<ExternalNetworkMqttUser, M> {

protected PendingOutResponseMqttInMessageHandler(
Class<P> expectedNetworkPacket,
protected ProcessingOutPublishesMqttInMessageHandler(
Class<M> expectedNetworkPacket,
MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, expectedNetworkPacket, messageOutFactoryService);
}
Expand All @@ -21,7 +22,8 @@ protected void processValidMessage(
MqttConnection connection,
ExternalNetworkMqttUser user,
NetworkMqttSession session,
P message) {
session.updateOutPendingPacket(user, message);
M message) {
ProcessingPublishes processingPublishes = session.outProcessingPublishes();
processingPublishes.apply(user, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import javasabr.mqtt.network.message.in.PublishAckMqttInMessage;
import javasabr.mqtt.service.MessageOutFactoryService;

public class PublishAckMqttInMessageHandler extends PendingOutResponseMqttInMessageHandler<PublishAckMqttInMessage> {
public class PublishAckMqttInMessageHandler extends
ProcessingOutPublishesMqttInMessageHandler<PublishAckMqttInMessage> {

public PublishAckMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(PublishAckMqttInMessage.class, messageOutFactoryService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import javasabr.mqtt.service.MessageOutFactoryService;

public class PublishCompleteMqttInMessageHandler extends
PendingOutResponseMqttInMessageHandler<PublishCompleteMqttInMessage> {
ProcessingOutPublishesMqttInMessageHandler<PublishCompleteMqttInMessage> {

public PublishCompleteMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(PublishCompleteMqttInMessage.class, messageOutFactoryService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import javasabr.mqtt.service.MessageOutFactoryService;

public class PublishReceiveMqttInMessageHandler extends
PendingOutResponseMqttInMessageHandler<PublishReceivedMqttInMessage> {
ProcessingOutPublishesMqttInMessageHandler<PublishReceivedMqttInMessage> {

public PublishReceiveMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(PublishReceivedMqttInMessage.class, messageOutFactoryService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected void processValidMessage(
}

private void handleUnknownMessageId(ExternalNetworkMqttUser client, int messageId) {
client.sendAsync(messageOutFactoryService
client.sendInBackground(messageOutFactoryService
.resolveFactory(client)
.newPublishCompleted(messageId, PublishCompletedReasonCode.PACKET_IDENTIFIER_NOT_FOUND));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void handleMessageIdIsInUse(
Array<SubscribeAckReasonCode> subscribeResults = Array.repeated(
SubscribeAckReasonCode.PACKET_IDENTIFIER_IN_USE,
subscribeMessage.subscriptionsCount());
user.sendAsync(messageOutFactoryService
user.sendInBackground(messageOutFactoryService
.resolveFactory(user)
.newSubscribeAck(subscribeMessage.messageId(), subscribeResults));
}
Expand All @@ -156,7 +156,7 @@ private void handleSubscriptionIdNotSupported(
MqttOutMessage response = messageOutFactoryService
.resolveFactory(user)
.newSubscribeAck(messageId, subscribeResults);
user.send(response)
user.sendAsync(response)
.thenAccept(_ -> session
.inMessageTracker()
.remove(messageId));
Expand All @@ -171,7 +171,7 @@ private void sendSubscribeResults(
MqttOutMessage response = messageOutFactoryService
.resolveFactory(user)
.newSubscribeAck(messageId, subscribeResults);
user.send(response)
user.sendAsync(response)
.thenAccept(_ -> session
.inMessageTracker()
.remove(messageId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void processValidMessage(
.newUnsubscribeAck(unsubscribeMessage.messageId(), unsubscribeResults);

user
.send(response)
.sendAsync(response)
.thenAccept(_ -> session
.inMessageTracker()
.remove(messageId));
Expand All @@ -83,7 +83,7 @@ private void handleMessageIdIsInUse(
Array<UnsubscribeAckReasonCode> unsubscribeResults = Array.repeated(
UnsubscribeAckReasonCode.PACKET_IDENTIFIER_IN_USE,
unsubscribeMessage.topicFiltersCount());
user.sendAsync(messageOutFactoryService
user.sendInBackground(messageOutFactoryService
.resolveFactory(user)
.newUnsubscribeAck(unsubscribeMessage.messageId(), unsubscribeResults));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public enum PublishHandlingResult {

// CUSTOM
NOT_EXPECTED_CLIENT(
true,
PublishAckReasonCode.UNSPECIFIED_ERROR,
PublishReceivedReasonCode.UNSPECIFIED_ERROR),
SESSION_IS_ALREADY_CLOSED(
true,
PublishAckReasonCode.UNSPECIFIED_ERROR,
PublishReceivedReasonCode.UNSPECIFIED_ERROR);
Expand Down
Loading
Loading