Skip to content

Commit da788c0

Browse files
committed
[broker-25] update mqtt mock client and test with it
1 parent e345640 commit da788c0

File tree

15 files changed

+286
-23
lines changed

15 files changed

+286
-23
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ allprojects {
5050
implementation "com.spaceshift:rlib.network:$rlibVersion"
5151
implementation "com.spaceshift:rlib.logger.slf4j:$rlibVersion"
5252
implementation "org.springframework.boot:spring-boot-starter:$springbootVersion"
53-
implementation "org.springframework.boot:spring-boot-starter-logging:$springbootVersion"
53+
implementation "org.springframework.boot:spring-boot-starter-log4j2:$springbootVersion"
5454
implementation "io.projectreactor:reactor-core:$projectReactorVersion"
5555

5656
compileOnly "org.jetbrains:annotations:$annotationVersion"
@@ -113,4 +113,5 @@ wrapper {
113113

114114
configurations.each {
115115
it.exclude group: "org.slf4j", module: "slf4j-log4j12"
116+
it.exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
116117
}

src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,14 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
4343
@NotNull byte[] correlationData,
4444
@NotNull Array<StringPair> userProperties
4545
) {
46-
return newPublish(packetId, qos, retained, duplicate, topicName, payload);
46+
return new Publish311OutPacket(
47+
packetId,
48+
qos,
49+
retained,
50+
duplicate,
51+
topicName,
52+
payload
53+
);
4754
}
4855

4956
@Override

src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
7474
retained,
7575
duplicate,
7676
topicName,
77-
topicAlias,
7877
payload,
78+
topicAlias,
7979
stringPayload,
8080
responseTopic,
8181
correlationData,

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
2020

2121
@Override
2222
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
23+
var clientId = client.getClientId();
2324
//noinspection unchecked
24-
return releaseImpl((T) client);
25+
return releaseImpl((T) client)
26+
.doOnNext(aVoid -> log.info("Client {} was released.", clientId));
2527
}
2628

2729
protected @NotNull Mono<?> releaseImpl(@NotNull T client) {

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public boolean hasInPending(int packetId) {
146146
@Override
147147
public void resendPendingPackets(@NotNull MqttClient mqttClient) {
148148
pendingOutPublishes.forEachInReadLock(mqttClient, (client, pending) -> {
149-
log.debug("Re-try to send publish {}", pending.publish);
149+
log.info("Re-try to send publish {}", pending.publish);
150150
pending.handler.resend(client, pending.publish, pending.packetId);
151151
});
152152
}

src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ protected byte getPacketFlags() {
5555
protected void writeVariableHeader(@NotNull ByteBuffer buffer) {
5656
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800412
5757
writeString(buffer, topicName);
58-
writeShort(buffer, packetId);
58+
if (qos.ordinal() > QoS.AT_MOST_ONCE_DELIVERY.ordinal()) {
59+
writeShort(buffer, packetId);
60+
}
5961
}
6062

6163
@Override
6264
protected void writePayload(@NotNull ByteBuffer buffer) {
6365
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800413
64-
writeBytes(buffer, payload);
66+
buffer.put(payload);
6567
}
6668
}

src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ public Publish5OutPacket(
148148
boolean retained,
149149
boolean duplicate,
150150
@NotNull String topicName,
151-
int topicAlias,
152151
@NotNull byte[] payload,
152+
int topicAlias,
153153
boolean stringPayload,
154154
@NotNull String responseTopic,
155155
@NotNull byte[] correlationData,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
authentication.allow.anonymous=false
2+

src/main/resources/log4j2.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<Configuration status="INFO">
3+
<Appenders>
4+
<Console name="Console" target="SYSTEM_OUT">
5+
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
6+
</Console>
7+
</Appenders>
8+
<Loggers>
9+
<Root level="INFO">
10+
<AppenderRef ref="Console"/>
11+
</Root>
12+
</Loggers>
13+
</Configuration>
Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,82 @@
11
package com.ss.mqtt.broker.test.integration
22

3+
import com.hivemq.client.mqtt.datatypes.MqttQos
4+
import com.ss.mqtt.broker.model.QoS
5+
import com.ss.mqtt.broker.model.SubscribeTopicFilter
6+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode
7+
import com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode
38
import com.ss.mqtt.broker.network.packet.in.ConnectAckInPacket
9+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket
10+
import com.ss.mqtt.broker.network.packet.in.SubscribeAckInPacket
411
import com.ss.mqtt.broker.network.packet.out.Connect311OutPacket
12+
import com.ss.mqtt.broker.network.packet.out.Subscribe311OutPacket
513
import com.ss.mqtt.broker.service.MqttSessionService
14+
import com.ss.rlib.common.util.array.Array
615
import org.springframework.beans.factory.annotation.Autowired
716

817
class PublishRetryTest extends IntegrationSpecification {
918

1019
@Autowired
1120
MqttSessionService mqttSessionService
1221

13-
def "mqtt5 client should be generate session with one pending packet"() {
22+
def "mqtt 3.1.1 client should be generate session with one pending packet"() {
1423
given:
15-
def client = buildMqtt311MockClient()
16-
def clientId = generateClientId()
24+
def publisher = buildClient()
25+
def subscriber = buildMqtt311MockClient()
26+
def subscriberId = generateClientId()
1727
when:
18-
client.connect()
19-
client.send(new Connect311OutPacket(clientId, keepAlive))
28+
29+
publisher.connect().join()
30+
31+
subscriber.connect()
32+
subscriber.send(new Connect311OutPacket(subscriberId, keepAlive))
33+
34+
def connectAck = subscriber.readNext() as ConnectAckInPacket
35+
2036
then:
21-
client.readNext() instanceof ConnectAckInPacket
37+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
38+
when:
39+
40+
subscriber.send(new Subscribe311OutPacket(
41+
Array.of(new SubscribeTopicFilter("/test/retry/$subscriberId", QoS.AT_LEAST_ONCE_DELIVERY)),
42+
1
43+
))
44+
45+
def subscribeAck = subscriber.readNext() as SubscribeAckInPacket
46+
47+
then:
48+
subscribeAck.reasonCodes.stream()
49+
.allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_1 })
50+
when:
51+
52+
publisher.publishWith()
53+
.topic("/test/retry/$subscriberId")
54+
.qos(MqttQos.AT_MOST_ONCE)
55+
.payload(publishPayload)
56+
.send()
57+
.join()
58+
59+
def receivedPublish = subscriber.readNext() as PublishInPacket
60+
61+
then:
62+
receivedPublish.payload == publishPayload
63+
when:
64+
65+
subscriber.close()
66+
67+
subscriber.connect()
68+
subscriber.send(new Connect311OutPacket(subscriberId, keepAlive))
69+
70+
connectAck = subscriber.readNext() as ConnectAckInPacket
71+
def receivedDupPublish = subscriber.readNext() as PublishInPacket
72+
73+
then:
74+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
75+
receivedDupPublish.duplicate
76+
receivedDupPublish.packetId == receivedPublish.packetId
77+
receivedDupPublish.payload == publishPayload
78+
cleanup:
79+
subscriber.close()
80+
publisher.disconnect().join()
2281
}
2382
}

0 commit comments

Comments
 (0)