Skip to content

Commit 88ef60b

Browse files
committed
[broker-14] fix filter by qos, fix tests
1 parent 946dc50 commit 88ef60b

File tree

6 files changed

+26
-22
lines changed

6 files changed

+26
-22
lines changed

src/main/java/com/ss/mqtt/broker/model/TopicSubscriber.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
import com.ss.rlib.common.util.dictionary.ObjectDictionary;
1111
import org.jetbrains.annotations.NotNull;
1212

13-
import java.util.Collection;
14-
1513
public class TopicSubscriber {
1614

17-
private static boolean filterSubscriber(@NotNull Array<Subscriber> resultArray, @NotNull Subscriber subscriber) {
18-
var found = resultArray.findAny(subscriber, Subscriber::equals);
19-
if (found != null && found.getQos().ordinal() < subscriber.getQos().ordinal()) {
20-
resultArray.remove(found);
15+
private static boolean filterByQos(@NotNull Array<Subscriber> subscribers, @NotNull Subscriber candidate) {
16+
var existed = subscribers.findAny(candidate, Subscriber::equals);
17+
if (existed == null) {
2118
return true;
19+
}
20+
if (existed.getQos().ordinal() < candidate.getQos().ordinal()) {
21+
return subscribers.remove(existed);
2222
} else {
2323
return false;
2424
}
@@ -33,7 +33,7 @@ private static TopicSubscriber collectSubscribers(
3333
if (ts != null) {
3434
long stamp = ts.subscribers.readLock();
3535
try {
36-
ts.subscribers.forEachFiltered(resultSubscribers, TopicSubscriber::filterSubscriber, Collection::add);
36+
ts.subscribers.forEachFiltered(resultSubscribers, TopicSubscriber::filterByQos, Array::add);
3737
} finally {
3838
ts.subscribers.readUnlock(stamp);
3939
}
@@ -50,7 +50,7 @@ public void addSubscriber(@NotNull TopicFilter topicFilter, @NotNull Subscriber
5050
}
5151

5252
private void addSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull Subscriber subscriber) {
53-
if (level == topicFilter.levels.length - 1) {
53+
if (level == topicFilter.levels.length) {
5454
subscribers.runInWriteLock(subscriber, ConcurrentArray::add);
5555
} else {
5656
var topicSubscriber = topicSubscribers.getInWriteLock(
@@ -67,7 +67,7 @@ public boolean removeSubscriber(@NotNull TopicFilter topicFilter, @NotNull MqttC
6767
}
6868

6969
private boolean removeSubscriber(int level, @NotNull TopicFilter topicFilter, @NotNull MqttClient mqttClient) {
70-
if (level == topicFilter.levels.length - 1) {
70+
if (level == topicFilter.levels.length) {
7171
return subscribers.removeIfInWriteLock(mqttClient, (client, subscriber) -> client.equals(subscriber.getMqttClient()));
7272
} else {
7373
var topicSubscriber = topicSubscribers.getInReadLock(topicFilter.levels[level], ObjectDictionary::get);

src/main/java/com/ss/mqtt/broker/service/SubscriptionService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.ss.mqtt.broker.service;
22

33
import com.ss.mqtt.broker.model.*;
4+
import com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode;
5+
import com.ss.mqtt.broker.model.reason.code.UnsubscribeAckReasonCode;
46
import com.ss.mqtt.broker.network.client.MqttClient;
57
import com.ss.rlib.common.function.NotNullNullableBiFunction;
68
import com.ss.rlib.common.util.array.Array;

src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.ss.mqtt.broker.service.impl;
22

3-
import com.ss.mqtt.broker.model.PublishAckReasonCode;
43
import com.ss.mqtt.broker.model.QoS;
54
import com.ss.mqtt.broker.model.Subscriber;
5+
import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode;
66
import com.ss.mqtt.broker.network.client.MqttClient;
77
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
88
import com.ss.mqtt.broker.service.PublishingService;

src/main/java/com/ss/mqtt/broker/service/impl/SimpleSubscriptionService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import static com.ss.mqtt.broker.model.ActionResult.FAILED;
44
import static com.ss.mqtt.broker.model.ActionResult.SUCCESS;
55
import com.ss.mqtt.broker.model.*;
6+
import com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode;
7+
import com.ss.mqtt.broker.model.reason.code.UnsubscribeAckReasonCode;
68
import com.ss.mqtt.broker.network.client.MqttClient;
79
import com.ss.mqtt.broker.service.SubscriptionService;
810
import com.ss.rlib.common.function.NotNullNullableBiFunction;

src/test/groovy/com/ss/mqtt/broker/test/network/in/SubscribeInPacketTest.groovy

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ class SubscribeInPacketTest extends BaseInPacketTest {
2929
result
3030
packet.topicFilters.size() == 2
3131
packet.topicFilters.get(0).getQos() == QoS.AT_LEAST_ONCE_DELIVERY
32-
packet.topicFilters.get(0).getTopicFilter() == topicFilter
32+
packet.topicFilters.get(0).getTopicFilter().toString() == topicFilter
3333
packet.topicFilters.get(0).isNoLocal()
3434
!packet.topicFilters.get(0).isRetainAsPublished()
3535
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
3636
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
37-
packet.topicFilters.get(1).getTopicFilter() == topicFilter2
37+
packet.topicFilters.get(1).getTopicFilter().toString() == topicFilter2
3838
packet.topicFilters.get(1).isNoLocal()
3939
!packet.topicFilters.get(1).isRetainAsPublished()
4040
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
@@ -69,12 +69,12 @@ class SubscribeInPacketTest extends BaseInPacketTest {
6969
result
7070
packet.topicFilters.size() == 2
7171
packet.topicFilters.get(0).getQos() == QoS.AT_LEAST_ONCE_DELIVERY
72-
packet.topicFilters.get(0).getTopicFilter() == topicFilter
72+
packet.topicFilters.get(0).getTopicFilter().toString() == topicFilter
7373
!packet.topicFilters.get(0).isNoLocal()
7474
packet.topicFilters.get(0).isRetainAsPublished()
7575
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
7676
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
77-
packet.topicFilters.get(1).getTopicFilter() == topicFilter2
77+
packet.topicFilters.get(1).getTopicFilter().toString() == topicFilter2
7878
packet.topicFilters.get(1).isNoLocal()
7979
!packet.topicFilters.get(1).isRetainAsPublished()
8080
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_SUBSCRIBE_ONLY_IF_THE_SUBSCRIPTION_DOES_NOT_CURRENTLY_EXIST
@@ -98,12 +98,12 @@ class SubscribeInPacketTest extends BaseInPacketTest {
9898
result
9999
packet.topicFilters.size() == 2
100100
packet.topicFilters.get(0).getQos() == QoS.AT_LEAST_ONCE_DELIVERY
101-
packet.topicFilters.get(0).getTopicFilter() == topicFilter
101+
packet.topicFilters.get(0).getTopicFilter().toString() == topicFilter
102102
!packet.topicFilters.get(0).isNoLocal()
103103
!packet.topicFilters.get(0).isRetainAsPublished()
104104
packet.topicFilters.get(0).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE
105105
packet.topicFilters.get(1).getQos() == QoS.EXACTLY_ONCE_DELIVERY
106-
packet.topicFilters.get(1).getTopicFilter() == topicFilter2
106+
packet.topicFilters.get(1).getTopicFilter().toString() == topicFilter2
107107
!packet.topicFilters.get(1).isNoLocal()
108108
!packet.topicFilters.get(1).isRetainAsPublished()
109109
packet.topicFilters.get(1).getRetainHandling() == SubscribeRetainHandling.SEND_AT_THE_TIME_OF_SUBSCRIBE

src/test/groovy/com/ss/mqtt/broker/test/network/in/UnsubscribeInPacketTest.groovy

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ class UnsubscribeInPacketTest extends BaseInPacketTest {
2323
then:
2424
result
2525
packet.topicFilters.size() == 2
26-
packet.topicFilters.get(0) == topicFilter
27-
packet.topicFilters.get(1) == topicFilter2
26+
packet.topicFilters.get(0).toString() == topicFilter
27+
packet.topicFilters.get(1).toString() == topicFilter2
2828
packet.packetId == packetId
2929
packet.userProperties == Array.empty()
3030
}
@@ -51,8 +51,8 @@ class UnsubscribeInPacketTest extends BaseInPacketTest {
5151
then:
5252
result
5353
packet.topicFilters.size() == 2
54-
packet.topicFilters.get(0) == topicFilter
55-
packet.topicFilters.get(1) == topicFilter2
54+
packet.topicFilters.get(0).toString() == topicFilter
55+
packet.topicFilters.get(1).toString() == topicFilter2
5656
packet.packetId == packetId
5757
packet.userProperties == userProperties
5858
when:
@@ -69,8 +69,8 @@ class UnsubscribeInPacketTest extends BaseInPacketTest {
6969
then:
7070
result
7171
packet.topicFilters.size() == 2
72-
packet.topicFilters.get(0) == topicFilter
73-
packet.topicFilters.get(1) == topicFilter2
72+
packet.topicFilters.get(0).toString() == topicFilter
73+
packet.topicFilters.get(1).toString() == topicFilter2
7474
packet.packetId == packetId
7575
packet.userProperties == Array.empty()
7676
}

0 commit comments

Comments
 (0)