Skip to content

Commit 9d75d21

Browse files
committed
[broker-15] working on publishing with QoS 1
1 parent d05de40 commit 9d75d21

33 files changed

+496
-113
lines changed

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package com.ss.mqtt.broker.config;
22

3+
import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler;
4+
import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
35
import com.ss.mqtt.broker.handler.packet.in.*;
6+
import com.ss.mqtt.broker.handler.publish.in.PublishInHandler;
7+
import com.ss.mqtt.broker.handler.publish.in.Qos0PublishInHandler;
8+
import com.ss.mqtt.broker.handler.publish.in.Qos1PublishInHandler;
9+
import com.ss.mqtt.broker.handler.publish.in.Qos2PublishInHandler;
10+
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
11+
import com.ss.mqtt.broker.handler.publish.out.Qos0PublishOutHandler;
12+
import com.ss.mqtt.broker.handler.publish.out.Qos1PublishOutHandler;
13+
import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler;
414
import com.ss.mqtt.broker.model.MqttPropertyConstants;
515
import com.ss.mqtt.broker.model.QoS;
616
import com.ss.mqtt.broker.network.MqttConnection;
7-
import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
8-
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
917
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
10-
import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler;
18+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1119
import com.ss.mqtt.broker.network.packet.PacketType;
1220
import com.ss.mqtt.broker.service.*;
1321
import com.ss.mqtt.broker.service.impl.*;
@@ -80,6 +88,11 @@ private interface ChannelFactory extends
8088
);
8189
}
8290

91+
@Bean
92+
@NotNull PacketIdGenerator packetIdGenerator() {
93+
return new DefaultPacketIdGenerator();
94+
}
95+
8396
@Bean
8497
PacketInHandler @NotNull [] devicePacketHandlers(
8598
@NotNull AuthenticationService authenticationService,
@@ -99,6 +112,7 @@ private interface ChannelFactory extends
99112
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
100113
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
101114
handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler();
115+
handlers[PacketType.PUBLISH_ACK.ordinal()] = new PublishAckInPacketHandler();
102116

103117
return handlers;
104118
}
@@ -150,8 +164,29 @@ private interface ChannelFactory extends
150164
}
151165

152166
@Bean
153-
@NotNull PublishingService publishingService(@NotNull SubscriptionService subscriptionService) {
154-
return new SimplePublishingService(subscriptionService);
167+
@NotNull PublishOutHandler[] publishOutHandlers(@NotNull PacketIdGenerator packetIdGenerator) {
168+
return new PublishOutHandler[] {
169+
new Qos0PublishOutHandler(),
170+
new Qos1PublishOutHandler(packetIdGenerator),
171+
new Qos2PublishOutHandler(),
172+
};
173+
}
174+
175+
@Bean
176+
@NotNull PublishInHandler[] publishInHandlers(
177+
@NotNull SubscriptionService subscriptionService,
178+
@NotNull PublishOutHandler[] publishOutHandlers
179+
) {
180+
return new PublishInHandler[] {
181+
new Qos0PublishInHandler(subscriptionService, publishOutHandlers),
182+
new Qos1PublishInHandler(subscriptionService, publishOutHandlers),
183+
new Qos2PublishInHandler(subscriptionService, publishOutHandlers),
184+
};
185+
}
186+
187+
@Bean
188+
@NotNull PublishingService publishingService(@NotNull PublishInHandler[] publishInHandlers) {
189+
return new DefaultPublishingService(publishInHandlers);
155190
}
156191

157192
@Bean

src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
3030
}
3131

3232
@Override
33-
public @NotNull MqttWritablePacket newPublish(
33+
public @NotNull PublishOutPacket newPublish(
3434
@NotNull MqttClient client,
3535
int packetId,
3636
@NotNull QoS qos,

src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
4444
}
4545

4646
@Override
47-
public @NotNull MqttWritablePacket newPublish(
47+
public @NotNull PublishOutPacket newPublish(
4848
@NotNull MqttClient client,
4949
int packetId,
5050
@NotNull QoS qos,

src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.ss.mqtt.broker.model.reason.code.*;
66
import com.ss.mqtt.broker.network.client.MqttClient;
77
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
8+
import com.ss.mqtt.broker.network.packet.out.PublishOutPacket;
89
import com.ss.rlib.common.util.ArrayUtils;
910
import com.ss.rlib.common.util.StringUtils;
1011
import com.ss.rlib.common.util.array.Array;
@@ -76,7 +77,7 @@ public abstract class MqttPacketOutFactory {
7677
}
7778

7879

79-
public @NotNull MqttWritablePacket newPublish(
80+
public @NotNull PublishOutPacket newPublish(
8081
@NotNull MqttClient client,
8182
int packetId,
8283
@NotNull QoS qos,
@@ -101,7 +102,7 @@ public abstract class MqttPacketOutFactory {
101102
);
102103
}
103104

104-
public abstract @NotNull MqttWritablePacket newPublish(
105+
public abstract @NotNull PublishOutPacket newPublish(
105106
@NotNull MqttClient client,
106107
int packetId,
107108
@NotNull QoS qos,
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.ss.mqtt.broker.handler.packet.in;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
4+
import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket;
5+
import lombok.RequiredArgsConstructor;
6+
import org.jetbrains.annotations.NotNull;
7+
8+
@RequiredArgsConstructor
9+
public class PublishAckInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishAckInPacket> {
10+
11+
@Override
12+
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishAckInPacket packet) {
13+
client.getSession().unregisterPendingPacket(client, packet);
14+
}
15+
}

src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,6 @@ public class PublishInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie
1313

1414
@Override
1515
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishInPacket packet) {
16-
17-
var ackReasonCode = publishingService.publish(packet);
18-
19-
client.send(client.getPacketOutFactory().newPublishAck(
20-
client,
21-
packet.getPacketId(),
22-
ackReasonCode
23-
));
16+
publishingService.publish(client, packet);
2417
}
2518
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.ss.mqtt.broker.handler.publish.in;
2+
3+
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.QoS;
5+
import com.ss.mqtt.broker.service.SubscriptionService;
6+
import lombok.RequiredArgsConstructor;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
@RequiredArgsConstructor
10+
abstract class AbstractPublishInHandler implements PublishInHandler {
11+
12+
protected final @NotNull SubscriptionService subscriptionService;
13+
protected final @NotNull PublishOutHandler[] publishOutHandlers;
14+
15+
protected @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
16+
return publishOutHandlers[qos.ordinal()];
17+
}
18+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.ss.mqtt.broker.handler.publish.in;
2+
3+
import com.ss.mqtt.broker.network.client.MqttClient;
4+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
/**
8+
* Interface to handle incoming publish packets.
9+
*/
10+
public interface PublishInHandler {
11+
12+
void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet);
13+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.ss.mqtt.broker.handler.publish.in;
2+
3+
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.network.client.MqttClient;
5+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
6+
import com.ss.mqtt.broker.service.SubscriptionService;
7+
import org.jetbrains.annotations.NotNull;
8+
9+
public class Qos0PublishInHandler extends AbstractPublishInHandler {
10+
11+
public Qos0PublishInHandler(
12+
@NotNull SubscriptionService subscriptionService,
13+
@NotNull PublishOutHandler[] publishOutHandlers
14+
) {
15+
super(subscriptionService, publishOutHandlers);
16+
}
17+
18+
@Override
19+
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
20+
21+
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
22+
23+
for (var subscriber : subscribers) {
24+
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
25+
}
26+
}
27+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.ss.mqtt.broker.handler.publish.in;
2+
3+
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
4+
import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode;
5+
import com.ss.mqtt.broker.network.client.MqttClient;
6+
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
7+
import com.ss.mqtt.broker.service.SubscriptionService;
8+
import org.jetbrains.annotations.NotNull;
9+
10+
public class Qos1PublishInHandler extends AbstractPublishInHandler {
11+
12+
public Qos1PublishInHandler(
13+
@NotNull SubscriptionService subscriptionService,
14+
@NotNull PublishOutHandler[] publishOutHandlers
15+
) {
16+
super(subscriptionService, publishOutHandlers);
17+
}
18+
19+
@Override
20+
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {
21+
22+
var subscribers = subscriptionService.getSubscribers(packet.getTopicName());
23+
24+
for (var subscriber : subscribers) {
25+
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
26+
}
27+
28+
var reasonCode = subscribers.isEmpty() ?
29+
PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS : PublishAckReasonCode.SUCCESS;
30+
31+
var ackPacket = client.getPacketOutFactory().newPublishAck(
32+
client,
33+
packet.getPacketId(),
34+
reasonCode
35+
);
36+
37+
client.send(ackPacket);
38+
}
39+
}

0 commit comments

Comments
 (0)