33import com .ss .mqtt .broker .model .MqttPropertyConstants ;
44import com .ss .mqtt .broker .model .QoS ;
55import com .ss .mqtt .broker .network .MqttConnection ;
6+ import com .ss .mqtt .broker .network .client .MqttClientReleaseHandler ;
67import com .ss .mqtt .broker .network .client .UnsafeMqttClient ;
78import com .ss .mqtt .broker .network .client .impl .DeviceMqttClient ;
9+ import com .ss .mqtt .broker .network .client .impl .DeviceMqttClientReleaseHandler ;
810import com .ss .mqtt .broker .network .packet .PacketType ;
911import com .ss .mqtt .broker .network .packet .in .handler .*;
1012import com .ss .mqtt .broker .service .*;
2022import org .jetbrains .annotations .NotNull ;
2123import org .springframework .context .annotation .Bean ;
2224import org .springframework .context .annotation .Configuration ;
23- import org .springframework .context .annotation .PropertySource ;
2425import org .springframework .core .env .Environment ;
2526
2627import java .net .InetSocketAddress ;
@@ -48,14 +49,9 @@ private interface ChannelFactory extends
4849 return new DefaultBufferAllocator (networkConfig );
4950 }
5051
51- @ Bean
52- @ NotNull ClientService clientService () {
53- return new DefaultClientService ();
54- }
55-
5652 @ Bean
5753 @ NotNull ClientIdRegistry clientIdRegistry () {
58- return new SimpleClientIdRegistry (
54+ return new InMemoryClientIdRegistry (
5955 env .getProperty (
6056 "client.id.available.chars" ,
6157 "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-"
@@ -64,16 +60,20 @@ private interface ChannelFactory extends
6460 );
6561 }
6662
63+ @ Bean
64+ @ NotNull MqttSessionService mqttSessionService () {
65+ return new InMemoryMqttSessionService (
66+ env .getProperty ("sessions.clean.thread.interval" , int .class , 60000 )
67+ );
68+ }
69+
6770 @ Bean
6871 @ NotNull CredentialSource credentialSource () {
6972 return new FileCredentialsSource (env .getProperty ("credentials.source.file.name" , "credentials" ));
7073 }
7174
7275 @ Bean
73- @ NotNull AuthenticationService authenticationService (
74- @ NotNull CredentialSource credentialSource ,
75- @ NotNull ClientIdRegistry clientIdRegistry
76- ) {
76+ @ NotNull AuthenticationService authenticationService (@ NotNull CredentialSource credentialSource ) {
7777 return new SimpleAuthenticationService (
7878 credentialSource ,
7979 env .getProperty ("authentication.allow.anonymous" , boolean .class , false )
@@ -85,31 +85,47 @@ private interface ChannelFactory extends
8585 @ NotNull AuthenticationService authenticationService ,
8686 @ NotNull ClientIdRegistry clientIdRegistry ,
8787 @ NotNull SubscriptionService subscriptionService ,
88- @ NotNull PublishingService publishingService
88+ @ NotNull PublishingService publishingService ,
89+ @ NotNull MqttSessionService mqttSessionService
8990 ) {
9091
9192 var handlers = new PacketInHandler [PacketType .INVALID .ordinal ()];
92- handlers [PacketType .CONNECT .ordinal ()] = new ConnectInPacketHandler (clientIdRegistry , authenticationService );
93+ handlers [PacketType .CONNECT .ordinal ()] = new ConnectInPacketHandler (
94+ clientIdRegistry ,
95+ authenticationService ,
96+ mqttSessionService
97+ );
9398 handlers [PacketType .SUBSCRIBE .ordinal ()] = new SubscribeInPacketHandler (subscriptionService );
9499 handlers [PacketType .UNSUBSCRIBE .ordinal ()] = new UnsubscribeInPacketHandler (subscriptionService );
95100 handlers [PacketType .PUBLISH .ordinal ()] = new PublishInPacketHandler (publishingService );
101+ handlers [PacketType .DISCONNECT .ordinal ()] = new DisconnetInPacketHandler ();
96102
97103 return handlers ;
98104 }
99105
106+ @ Bean
107+ @ NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler (
108+ @ NotNull ClientIdRegistry clientIdRegistry ,
109+ @ NotNull MqttSessionService mqttSessionService
110+ ) {
111+ return new DeviceMqttClientReleaseHandler (clientIdRegistry , mqttSessionService );
112+ }
113+
100114 @ Bean
101115 @ NotNull ServerNetwork <@ NotNull MqttConnection > deviceNetwork (
102116 @ NotNull ServerNetworkConfig networkConfig ,
103117 @ NotNull BufferAllocator bufferAllocator ,
104118 @ NotNull MqttConnectionConfig deviceConnectionConfig ,
105- PacketInHandler @ NotNull [] devicePacketHandlers
119+ PacketInHandler @ NotNull [] devicePacketHandlers ,
120+ @ NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler
106121 ) {
107122 return NetworkFactory .newServerNetwork (
108123 networkConfig ,
109124 deviceConnectionFactory (
110125 bufferAllocator ,
111126 deviceConnectionConfig ,
112- devicePacketHandlers
127+ devicePacketHandlers ,
128+ deviceMqttClientReleaseHandler
113129 )
114130 );
115131 }
@@ -212,7 +228,8 @@ private interface ChannelFactory extends
212228 private @ NotNull ChannelFactory deviceConnectionFactory (
213229 @ NotNull BufferAllocator bufferAllocator ,
214230 @ NotNull MqttConnectionConfig connectionConfig ,
215- PacketInHandler @ NotNull [] packetHandlers
231+ PacketInHandler @ NotNull [] packetHandlers ,
232+ @ NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler
216233 ) {
217234 return (network , channel ) -> new MqttConnection (
218235 network ,
@@ -221,7 +238,7 @@ private interface ChannelFactory extends
221238 100 ,
222239 packetHandlers ,
223240 connectionConfig ,
224- DeviceMqttClient :: new
241+ mqttConnection -> new DeviceMqttClient ( mqttConnection , deviceMqttClientReleaseHandler )
225242 );
226243 }
227244}
0 commit comments