Skip to content

Commit 37bab29

Browse files
committed
add java docs, fix publish issue, extend ConnectPublishSubscribeTest
1 parent 09c9e6e commit 37bab29

File tree

11 files changed

+129
-91
lines changed

11 files changed

+129
-91
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ private interface ChannelFactory extends
8383
}
8484

8585
@Bean
86-
@NotNull PublishingService publishingService() {
87-
return new SimplePublishingService(subscriptionService());
86+
@NotNull PublishingService publishingService(@NotNull SubscriptionService subscriptionService) {
87+
return new SimplePublishingService(subscriptionService);
8888
}
8989

9090
@Bean

src/main/java/com/ss/mqtt/broker/network/MqttClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private void onUnsubscribe(@NotNull UnsubscribeInPacket subscribe) {
115115
}
116116

117117
public void onPublish(@NotNull PublishInPacket publish) {
118-
var ackReasonCode = connection.getPublishingService().publish(connection.getClient(), publish);
118+
var ackReasonCode = connection.getPublishingService().publish(publish);
119119
connection.send(getPacketOutFactory().newPublishAck(
120120
connection.getClient(),
121121
publish.getPacketId(),

src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -173,26 +173,14 @@ protected void writeProperties(@NotNull ByteBuffer buffer) {
173173

174174
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc511988586
175175
writeProperty(buffer, PacketProperty.PAYLOAD_FORMAT_INDICATOR, stringPayload);
176-
writeProperty(
177-
buffer,
176+
writeProperty(buffer,
178177
PacketProperty.MESSAGE_EXPIRY_INTERVAL,
179178
0,
180179
MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT
181180
);
182-
if (topciAlias > MqttPropertyConstants.TOPIC_ALIAS_MIN && topciAlias < MqttPropertyConstants.TOPIC_ALIAS_MAX) {
183-
writeProperty(
184-
buffer,
185-
PacketProperty.TOPIC_ALIAS,
186-
topciAlias,
187-
MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DEFAULT
188-
);
189-
}
190-
if (responseTopic != null) {
191-
writeProperty(buffer, PacketProperty.RESPONSE_TOPIC, responseTopic);
192-
}
193-
if (correlationData != null) {
194-
writeProperty(buffer, PacketProperty.CORRELATION_DATA, correlationData);
195-
}
181+
writeProperty(buffer, PacketProperty.TOPIC_ALIAS, topciAlias, MqttPropertyConstants.TOPIC_ALIAS_DEFAULT);
182+
writeNotEmptyProperty(buffer, PacketProperty.RESPONSE_TOPIC, responseTopic);
183+
writeNotEmptyProperty(buffer, PacketProperty.CORRELATION_DATA, correlationData);
196184
writeStringPairProperties(buffer, PacketProperty.USER_PROPERTY, userProperties);
197185
}
198186

src/main/java/com/ss/mqtt/broker/service/PublishingService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,17 @@
66
import com.ss.rlib.common.util.array.Array;
77
import org.jetbrains.annotations.NotNull;
88

9+
/**
10+
* Publishing service
11+
*/
912
public interface PublishingService {
1013

11-
@NotNull PublishAckReasonCode publish(@NotNull MqttClient mqttClient, @NotNull PublishInPacket publish);
14+
/**
15+
* Sends publish packet to all subscribers
16+
*
17+
* @param publish publish packet to send
18+
* @return publish ack reason code
19+
*/
20+
@NotNull PublishAckReasonCode publish(@NotNull PublishInPacket publish);
1221

1322
}

src/main/java/com/ss/mqtt/broker/service/Subscriber.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,26 @@
44
import com.ss.mqtt.broker.model.SubscribeRetainHandling;
55
import com.ss.mqtt.broker.model.SubscribeTopicFilter;
66
import com.ss.mqtt.broker.network.MqttClient;
7+
import lombok.EqualsAndHashCode;
78
import lombok.Getter;
89
import org.jetbrains.annotations.NotNull;
910

10-
import java.util.Objects;
11-
1211
@Getter
12+
@EqualsAndHashCode(of = "mqttClient")
1313
public class Subscriber {
1414

15-
private final MqttClient mqttClient;
16-
private final QoS qos;
17-
private final SubscribeRetainHandling retainHandling;
15+
private final @NotNull MqttClient mqttClient;
16+
private final @NotNull QoS qos;
17+
private final @NotNull SubscribeRetainHandling retainHandling;
1818
private final boolean noLocal;
1919
private final boolean retainAsPublished;
2020

21+
/**
22+
* Creates subscriber
23+
*
24+
* @param mqttClient MQTT client which will become a subscriber
25+
* @param topicFilter topic filter that MQTT client subscribes to
26+
*/
2127
public Subscriber(@NotNull MqttClient mqttClient, @NotNull SubscribeTopicFilter topicFilter) {
2228
this.mqttClient = mqttClient;
2329
this.qos = topicFilter.getQos();
@@ -26,16 +32,4 @@ public Subscriber(@NotNull MqttClient mqttClient, @NotNull SubscribeTopicFilter
2632
this.noLocal = topicFilter.isNoLocal();
2733
}
2834

29-
@Override
30-
public boolean equals(Object o) {
31-
if (this == o) return true;
32-
if (o == null || getClass() != o.getClass()) return false;
33-
var that = (Subscriber) o;
34-
return Objects.equals(mqttClient, that.mqttClient);
35-
}
36-
37-
@Override
38-
public int hashCode() {
39-
return Objects.hash(mqttClient);
40-
}
4135
}

src/main/java/com/ss/mqtt/broker/service/SubscriptionService.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,40 @@
99

1010
import java.util.List;
1111

12+
/**
13+
* Subscription service
14+
*/
1215
public interface SubscriptionService {
1316

17+
/**
18+
* Adds MQTT client to topic filter subscribers
19+
*
20+
* @param mqttClient MQTT client to be added
21+
* @param topicFilters topic filters
22+
* @return array of subscribe ack reason codes
23+
*/
1424
@NotNull Array<SubscribeAckReasonCode> subscribe(
1525
@NotNull MqttClient mqttClient,
16-
@NotNull Array<SubscribeTopicFilter> topicFilter
26+
@NotNull Array<SubscribeTopicFilter> topicFilters
1727
);
1828

29+
/**
30+
* Removes MQTT client from subscribers by array of topic names
31+
*
32+
* @param mqttClient MQTT client to be removed
33+
* @param topicNames topic names
34+
* @return array of unsubscribe ack reason codes
35+
*/
1936
@NotNull Array<UnsubscribeAckReasonCode> unsubscribe(
2037
@NotNull MqttClient mqttClient,
21-
@NotNull Array<String> topicFilter
38+
@NotNull Array<String> topicNames
2239
);
2340

24-
@NotNull Array<MqttClient> getSubscribers(@NotNull String topic);
41+
/**
42+
* Returns subscribers by topic name
43+
*
44+
* @param topicName topic name
45+
* @return array of topic subscribers
46+
*/
47+
@NotNull Array<MqttClient> getSubscribers(@NotNull String topicName);
2548
}

src/main/java/com/ss/mqtt/broker/service/Subscriptions.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,40 @@
77
import com.ss.rlib.common.util.array.Array;
88
import org.jetbrains.annotations.NotNull;
99

10+
/**
11+
* Container of subscriptions
12+
*/
1013
public interface Subscriptions {
1114

1215
/**
13-
* Return full subscribers list
16+
* Returns array of subscribers by topic name
17+
*
18+
* @param topicName topic name on which subscribers should be returned
19+
* @return array of MQTT clients
1420
*/
15-
@NotNull Array<MqttClient> getSubscribers(@NotNull String topic);
21+
@NotNull Array<MqttClient> getSubscribers(@NotNull String topicName);
1622

1723
/**
18-
* Return true if subscription is added
24+
* Returns result of subscription adding
25+
*
26+
* @param topicFilter topic filter that MQTT client wants to subscribe to
27+
* @param mqttClient MQTT client which wants to subscribe
28+
* @return subscribe ack reason code
1929
*/
2030
@NotNull SubscribeAckReasonCode addSubscription(
2131
@NotNull SubscribeTopicFilter topicFilter,
2232
@NotNull MqttClient mqttClient
2333
);
2434

2535
/**
26-
* Return true if subscription is removed
36+
* Returns result of subscription removing
37+
*
38+
* @param topicName topic name that MQTT client wants to unsubscribe from
39+
* @param mqttClient MQTT client which wants to unsubscribe
40+
* @return unsubscribe ack reason code
2741
*/
2842
@NotNull UnsubscribeAckReasonCode removeSubscription(
29-
@NotNull String topicFilter,
43+
@NotNull String topicName,
3044
@NotNull MqttClient mqttClient
3145
);
3246
}

src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
66
import com.ss.mqtt.broker.service.PublishingService;
77
import com.ss.mqtt.broker.service.SubscriptionService;
8-
import com.ss.rlib.common.util.ArrayUtils;
9-
import com.ss.rlib.common.util.StringUtils;
108
import lombok.RequiredArgsConstructor;
119
import org.jetbrains.annotations.NotNull;
1210

11+
/**
12+
* Simple publishing service
13+
*/
1314
@RequiredArgsConstructor
1415
public class SimplePublishingService implements PublishingService {
1516

@@ -31,20 +32,23 @@ public class SimplePublishingService implements PublishingService {
3132
publish.getTopicAlias(),
3233
publish.getPayload(),
3334
publish.isPayloadFormatIndicator(),
34-
StringUtils.EMPTY, //publish.getResponseTopic(),
35-
ArrayUtils.EMPTY_BYTE_ARRAY,
35+
publish.getResponseTopic(),
36+
publish.getCorrelationData(),
3637
publish.getUserProperties()
3738
));
39+
// TODO this reason code only for QoS 0
3840
return PublishAckReasonCode.SUCCESS;
3941
}
4042

4143
@Override
42-
public @NotNull PublishAckReasonCode publish(@NotNull MqttClient mqttClient, @NotNull PublishInPacket publish) {
44+
public @NotNull PublishAckReasonCode publish(@NotNull PublishInPacket publish) {
4345
var subscribers = subscriptionService.getSubscribers(publish.getTopicName());
44-
// TODO choose correct PublishAckReasonCode
45-
return subscribers.stream()
46+
if (subscribers.isEmpty()) {
47+
return PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
48+
}
49+
var success = subscribers.stream()
4650
.map(targetMqttClient -> send(targetMqttClient, publish))
47-
.findFirst()
48-
.orElse(PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS);
51+
.allMatch(ackReasonCode -> ackReasonCode.equals(PublishAckReasonCode.SUCCESS));
52+
return success ? PublishAckReasonCode.SUCCESS : PublishAckReasonCode.UNSPECIFIED_ERROR;
4953
}
5054
}

src/main/java/com/ss/mqtt/broker/service/impl/SimpleSubscriptionService.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,36 @@
1414

1515
import java.util.List;
1616

17+
/**
18+
* Simple subscription service
19+
*/
1720
@RequiredArgsConstructor
1821
public class SimpleSubscriptionService implements SubscriptionService {
1922

20-
private final Subscriptions subscriptions;
23+
private final @NotNull Subscriptions subscriptions;
2124

2225
@Override
23-
public Array<SubscribeAckReasonCode> subscribe(
26+
public @NotNull Array<SubscribeAckReasonCode> subscribe(
2427
@NotNull MqttClient mqttClient,
25-
@NotNull Array<SubscribeTopicFilter> topicFilter
28+
@NotNull Array<SubscribeTopicFilter> topicFilters
2629
) {
27-
return topicFilter.stream()
30+
return topicFilters.stream()
2831
.map(subscribeTopicFilter -> subscriptions.addSubscription(subscribeTopicFilter, mqttClient))
2932
.collect(ArrayCollectors.toArray(SubscribeAckReasonCode.class));
3033
}
3134

3235
@Override
33-
public Array<UnsubscribeAckReasonCode> unsubscribe(
36+
public @NotNull Array<UnsubscribeAckReasonCode> unsubscribe(
3437
@NotNull MqttClient mqttClient,
35-
@NotNull Array<String> topicFilter
38+
@NotNull Array<String> topicNames
3639
) {
37-
return topicFilter.stream()
40+
return topicNames.stream()
3841
.map(subscribeTopicFilter -> subscriptions.removeSubscription(subscribeTopicFilter, mqttClient))
3942
.collect(ArrayCollectors.toArray(UnsubscribeAckReasonCode.class));
4043
}
4144

4245
@Override
43-
public @NotNull Array<MqttClient> getSubscribers(@NotNull String topic) {
44-
return subscriptions.getSubscribers(topic);
46+
public @NotNull Array<MqttClient> getSubscribers(@NotNull String topicName) {
47+
return subscriptions.getSubscribers(topicName);
4548
}
4649
}

src/main/java/com/ss/mqtt/broker/service/impl/SimpleSubscriptions.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,20 @@
1414
import java.util.HashMap;
1515
import java.util.Map;
1616

17+
/**
18+
* Simple container of subscriptions
19+
*/
1720
public class SimpleSubscriptions implements Subscriptions {
1821

19-
private final Map<String, Array<Subscriber>> subscriptions = new HashMap<>();
22+
private final @NotNull Map<String, Array<Subscriber>> subscriptions = new HashMap<>();
2023

21-
/**
22-
* Return full subscribers list
23-
*/
24-
public @NotNull Array<MqttClient> getSubscribers(@NotNull String topic) {
25-
return subscriptions.get(topic)
24+
public @NotNull Array<MqttClient> getSubscribers(@NotNull String topicName) {
25+
return subscriptions.get(topicName)
2626
.stream()
2727
.map(Subscriber::getMqttClient)
2828
.collect(ArrayCollectors.toArray(MqttClient.class));
2929
}
3030

31-
/**
32-
* Return true if subscription is added
33-
*/
3431
public @NotNull SubscribeAckReasonCode addSubscription(
3532
@NotNull SubscribeTopicFilter topicFilter,
3633
@NotNull MqttClient mqttClient
@@ -44,11 +41,11 @@ public class SimpleSubscriptions implements Subscriptions {
4441
return topicFilter.getQos().getSubscribeAckReasonCode();
4542
}
4643

47-
/**
48-
* Return true if subscription is removed
49-
*/
50-
public @NotNull UnsubscribeAckReasonCode removeSubscription(@NotNull String topicFilter, @NotNull MqttClient mqttClient) {
51-
var subscribers = subscriptions.getOrDefault(topicFilter, Array.empty());
44+
public @NotNull UnsubscribeAckReasonCode removeSubscription(
45+
@NotNull String topicName,
46+
@NotNull MqttClient mqttClient
47+
) {
48+
var subscribers = subscriptions.getOrDefault(topicName, Array.empty());
5249
if (subscribers.removeIf(subscriber -> mqttClient.equals(subscriber.getMqttClient()))) {
5350
return UnsubscribeAckReasonCode.SUCCESS;
5451
}

0 commit comments

Comments
 (0)