Skip to content

Commit 1464ea5

Browse files
authored
Merge pull request #28 from JavaSaBr/feature-broker-22
[broker-22] implement QoS 2
2 parents e6f9fa7 + da0be17 commit 1464ea5

19 files changed

+491
-188
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ private interface ChannelFactory extends
111111
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
112112
handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler();
113113
handlers[PacketType.PUBLISH_ACK.ordinal()] = new PublishAckInPacketHandler();
114+
handlers[PacketType.PUBLISH_RECEIVED.ordinal()] = new PublishReceiveInPacketHandler();
115+
handlers[PacketType.PUBLISH_RELEASED.ordinal()] = new PublishReleaseInPacketHandler();
116+
handlers[PacketType.PUBLISH_COMPLETED.ordinal()] = new PublishCompleteInPacketHandler();
114117

115118
return handlers;
116119
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
package com.ss.mqtt.broker.exception;
22

3+
import lombok.NoArgsConstructor;
4+
5+
@NoArgsConstructor
36
public class MalformedPacketMqttException extends MqttException {}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
4+
import com.ss.mqtt.broker.network.packet.HasPacketId;
5+
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
@RequiredArgsConstructor
10+
public class PendingOutResponseInPacketHandler<R extends MqttReadablePacket & HasPacketId> extends
11+
AbstractPacketHandler<UnsafeMqttClient, R> {
12+
13+
@Override
14+
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull R packet) {
15+
var session = client.getSession();
16+
if (session != null) {
17+
session.updateOutPendingPacket(client, packet);
18+
}
19+
}
20+
}
Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,7 @@
11
package com.ss.mqtt.broker.handler.packet.in;
22

3-
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
43
import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket;
54
import lombok.RequiredArgsConstructor;
6-
import org.jetbrains.annotations.NotNull;
75

86
@RequiredArgsConstructor
9-
public class PublishAckInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishAckInPacket> {
10-
11-
@Override
12-
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishAckInPacket packet) {
13-
client.getSession().updatePendingPacket(client, packet);
14-
}
15-
}
7+
public class PublishAckInPacketHandler extends PendingOutResponseInPacketHandler<PublishAckInPacket> {}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.packet.in.PublishCompleteInPacket;
4+
import lombok.RequiredArgsConstructor;
5+
6+
@RequiredArgsConstructor
7+
public class PublishCompleteInPacketHandler extends PendingOutResponseInPacketHandler<PublishCompleteInPacket> {}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.packet.in.PublishReceivedInPacket;
4+
import lombok.RequiredArgsConstructor;
5+
6+
@RequiredArgsConstructor
7+
public class PublishReceiveInPacketHandler extends PendingOutResponseInPacketHandler<PublishReceivedInPacket> {}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
4+
import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket;
5+
import com.ss.mqtt.broker.network.packet.in.PublishReleaseInPacket;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
@RequiredArgsConstructor
10+
public class PublishReleaseInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishReleaseInPacket> {
11+
12+
@Override
13+
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishReleaseInPacket packet) {
14+
var session = client.getSession();
15+
if (session != null) {
16+
session.updateInPendingPacket(client, packet);
17+
}
18+
}
19+
}

src/main/java/com/ss/mqtt/broker/handler/publish/in/AbstractPublishInHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ abstract class AbstractPublishInHandler implements PublishInHandler {
1717
protected final @NotNull PublishOutHandler[] publishOutHandlers;
1818

1919
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20-
var result = subscriptionService.forEachTopicSubscriber(
20+
handleResult(client, packet, subscriptionService.forEachTopicSubscriber(
2121
packet.getTopicName(),
2222
packet,
23-
this::publish
24-
);
25-
handleResult(client, packet, result);
23+
this::sendToSubscriber
24+
));
2625
}
2726

28-
private @NotNull ActionResult publish(
27+
private @NotNull ActionResult sendToSubscriber(
2928
@NotNull Subscriber subscriber,
3029
@NotNull PublishInPacket packet
3130
) {

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos1PublishInHandler.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,44 @@ public Qos1PublishInHandler(
1717
super(subscriptionService, publishOutHandlers);
1818
}
1919

20+
@Override
21+
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
22+
23+
var session = client.getSession();
24+
25+
// it means this client was already closed
26+
if (session == null) {
27+
return;
28+
}
29+
30+
super.handle(client, packet);
31+
}
32+
2033
@Override
2134
protected void handleResult(
2235
@NotNull MqttClient client,
2336
@NotNull PublishInPacket packet,
2437
@NotNull ActionResult result
2538
) {
26-
var reasonCode = switch (result) {
27-
case EMPTY -> PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
28-
case SUCCESS -> PublishAckReasonCode.SUCCESS;
29-
default -> PublishAckReasonCode.UNSPECIFIED_ERROR;
30-
};
31-
var ackPacket = client.getPacketOutFactory().newPublishAck(
39+
40+
PublishAckReasonCode reasonCode;
41+
42+
switch (result) {
43+
case EMPTY:
44+
reasonCode = PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
45+
break;
46+
case SUCCESS:
47+
reasonCode = PublishAckReasonCode.SUCCESS;
48+
break;
49+
default:
50+
reasonCode = PublishAckReasonCode.UNSPECIFIED_ERROR;
51+
break;
52+
}
53+
54+
client.send(client.getPacketOutFactory().newPublishAck(
3255
client,
3356
packet.getPacketId(),
3457
reasonCode
35-
);
36-
client.send(ackPacket);
58+
));
3759
}
3860
}

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos2PublishInHandler.java

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package com.ss.mqtt.broker.handler.publish.in;
22

33
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.ActionResult;
5+
import com.ss.mqtt.broker.model.MqttSession;
6+
import com.ss.mqtt.broker.model.reason.code.PublishCompletedReasonCode;
7+
import com.ss.mqtt.broker.model.reason.code.PublishReceivedReasonCode;
48
import com.ss.mqtt.broker.network.client.MqttClient;
9+
import com.ss.mqtt.broker.network.packet.HasPacketId;
510
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
11+
import com.ss.mqtt.broker.network.packet.in.PublishReleaseInPacket;
612
import com.ss.mqtt.broker.service.SubscriptionService;
713
import org.jetbrains.annotations.NotNull;
814

9-
public class Qos2PublishInHandler extends AbstractPublishInHandler {
15+
public class Qos2PublishInHandler extends AbstractPublishInHandler implements MqttSession.PendingPacketHandler {
1016

1117
public Qos2PublishInHandler(
1218
@NotNull SubscriptionService subscriptionService,
@@ -17,6 +23,77 @@ public Qos2PublishInHandler(
1723

1824
@Override
1925
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20-
throw new UnsupportedOperationException();
26+
27+
var session = client.getSession();
28+
29+
// it means this client was already closed
30+
if (session == null) {
31+
return;
32+
}
33+
34+
// if this packet is re-try from client
35+
if (packet.isDuplicate()) {
36+
// if this packet was accepted before then we can skip it
37+
if (session.hasInPending(packet.getPacketId())) {
38+
return;
39+
}
40+
}
41+
42+
super.handle(client, packet);
43+
}
44+
45+
@Override
46+
protected void handleResult(
47+
@NotNull MqttClient client,
48+
@NotNull PublishInPacket packet,
49+
@NotNull ActionResult result
50+
) {
51+
52+
// because it was checked
53+
final MqttSession session = client.getSession();
54+
55+
// it means this client was already closed
56+
if (session == null) {
57+
return;
58+
}
59+
60+
PublishReceivedReasonCode reasonCode;
61+
62+
switch (result) {
63+
case EMPTY:
64+
reasonCode = PublishReceivedReasonCode.NO_MATCHING_SUBSCRIBERS;
65+
break;
66+
case SUCCESS:
67+
reasonCode = PublishReceivedReasonCode.SUCCESS;
68+
break;
69+
default:
70+
reasonCode = PublishReceivedReasonCode.UNSPECIFIED_ERROR;
71+
break;
72+
}
73+
74+
session.registerInPublish(packet, this, packet.getPacketId());
75+
76+
client.send(client.getPacketOutFactory().newPublishReceived(
77+
client,
78+
packet.getPacketId(),
79+
reasonCode
80+
));
81+
}
82+
83+
@Override
84+
public boolean handleResponse(@NotNull MqttClient client, @NotNull HasPacketId response) {
85+
86+
if (!(response instanceof PublishReleaseInPacket)) {
87+
throw new IllegalStateException("Unexpected response " + response);
88+
}
89+
90+
var packetOutFactory = client.getPacketOutFactory();
91+
client.send(packetOutFactory.newPublishCompleted(
92+
client,
93+
response.getPacketId(),
94+
PublishCompletedReasonCode.SUCCESS
95+
));
96+
97+
return true;
2198
}
2299
}

0 commit comments

Comments
 (0)