@@ -77,6 +77,11 @@ type UdpTx = mpsc::Sender<(Packet, SocketAddr)>;
7777/// key is a DHT key.
7878type DhtPkTx = mpsc:: UnboundedSender < ( PublicKey , PublicKey ) > ;
7979
80+ /// Shorthand for the transmit half of the message channel for sending a
81+ /// connection status when it becomes connected or disconnected. The key is a
82+ /// long term key of the connection.
83+ type ConnectionStatusTx = mpsc:: UnboundedSender < ( PublicKey , bool ) > ;
84+
8085/// Shorthand for the transmit half of the message channel for sending lossless
8186/// packets. The key is a long term public key of the peer that sent this
8287/// packet.
@@ -124,6 +129,9 @@ pub struct NetCrypto {
124129 /// `CryptoConnection` then `NetCrypto` module will send message to this
125130 /// sink.
126131 dht_pk_tx : Arc < RwLock < Option < DhtPkTx > > > ,
132+ /// Sink to send a connection status when it becomes connected or
133+ /// disconnected. The key is a long term key of the connection.
134+ connection_status_tx : Arc < RwLock < Option < ConnectionStatusTx > > > ,
127135 /// Sink to send lossless packets. The key is a long term public key of the
128136 /// peer that sent this packet.
129137 lossless_tx : LosslessTx ,
@@ -159,6 +167,7 @@ impl NetCrypto {
159167 NetCrypto {
160168 udp_tx : args. udp_tx ,
161169 dht_pk_tx : Default :: default ( ) ,
170+ connection_status_tx : Default :: default ( ) ,
162171 lossless_tx : args. lossless_tx ,
163172 lossy_tx : args. lossy_tx ,
164173 dht_pk : args. dht_pk ,
@@ -216,19 +225,36 @@ impl NetCrypto {
216225 }
217226 }
218227
228+ /// Send connection status to the appropriate sink when it becomes connected
229+ /// or disconnected.
230+ fn send_connection_status ( & self , connection : & CryptoConnection , status : bool ) -> impl Future < Item = ( ) , Error = mpsc:: SendError < ( PublicKey , bool ) > > {
231+ if connection. is_established ( ) != status {
232+ if let Some ( ref connection_status_tx) = * self . connection_status_tx . read ( ) {
233+ Either :: A ( send_to ( connection_status_tx, ( connection. peer_real_pk , status) ) )
234+ } else {
235+ Either :: B ( future:: ok ( ( ) ) )
236+ }
237+ } else {
238+ Either :: B ( future:: ok ( ( ) ) )
239+ }
240+ }
241+
219242 /// Kill a connection sending `PACKET_ID_KILL` packet and removing it from
220243 /// the connections list.
221244 pub fn kill_connection ( & self , real_pk : PublicKey ) -> impl Future < Item = ( ) , Error = KillConnectionError > {
222245 if let Some ( connection) = self . connections . write ( ) . remove ( & real_pk) {
223246 let mut connection = connection. write ( ) ;
224247 self . clear_keys_by_addr ( & connection) ;
225- if connection. is_established ( ) || connection. is_not_confirmed ( ) {
248+ let status_future = self . send_connection_status ( & connection, false )
249+ . map_err ( |e| e. context ( KillConnectionErrorKind :: SendToConnectionStatus ) . into ( ) ) ;
250+ let kill_future = if connection. is_established ( ) || connection. is_not_confirmed ( ) {
226251 let packet_number = connection. send_array . buffer_end ;
227252 Either :: A ( self . send_data_packet ( & mut connection, vec ! [ PACKET_ID_KILL ] , packet_number)
228253 . map_err ( |e| e. context ( KillConnectionErrorKind :: SendTo ) . into ( ) ) )
229254 } else {
230255 Either :: B ( future:: ok ( ( ) ) )
231- }
256+ } ;
257+ Either :: A ( kill_future. join ( status_future) . map ( |_| ( ) ) )
232258 } else {
233259 Either :: B ( future:: err ( KillConnectionErrorKind :: NoConnection . into ( ) ) )
234260 }
@@ -672,17 +698,21 @@ impl NetCrypto {
672698
673699 if packet_id == PACKET_ID_KILL {
674700 // Kill the connection
701+ let status_future = Box :: new ( self . send_connection_status ( & connection, false )
702+ . map_err ( |e| e. context ( HandlePacketErrorKind :: SendToConnectionStatus ) . into ( ) ) )
703+ as Box < dyn Future < Item = _ , Error = _ > + Send > ;
675704 self . connections . write ( ) . remove ( & connection. peer_real_pk ) ;
676705 self . clear_keys_by_addr ( & connection) ;
677- return Box :: new ( future :: ok ( ( ) ) ) ;
706+ return status_future ;
678707 }
679708
680709 // Update nonce if diff is big enough
681710 if diff > NONCE_DIFF_THRESHOLD * 2 {
682711 increment_nonce_number ( & mut received_nonce, u64:: from ( NONCE_DIFF_THRESHOLD ) ) ;
683712 }
684713
685- // TODO: connection status notification
714+ let status_future = self . send_connection_status ( & connection, true )
715+ . map_err ( |e| e. context ( HandlePacketErrorKind :: SendToConnectionStatus ) . into ( ) ) ;
686716
687717 connection. status = ConnectionStatus :: Established {
688718 sent_nonce,
@@ -725,7 +755,7 @@ impl NetCrypto {
725755 }
726756 }
727757
728- result
758+ Box :: new ( result. join ( status_future ) . map ( |_| ( ) ) )
729759 }
730760
731761 /// Handle `CryptoData` packet received from UDP socket
@@ -844,6 +874,12 @@ impl NetCrypto {
844874 keys_by_addr. remove ( & ( addr. ip ( ) , addr. port ( ) ) ) ;
845875 }
846876
877+ if connection. is_established ( ) {
878+ let status_future = self . send_connection_status ( & connection, false )
879+ . map_err ( |e| e. context ( SendDataErrorKind :: SendToConnectionStatus ) . into ( ) ) ;
880+ futures. push ( Box :: new ( status_future) ) ;
881+ }
882+
847883 if connection. is_established ( ) || connection. is_not_confirmed ( ) {
848884 let packet_number = connection. send_array . buffer_end ;
849885 futures. push ( Box :: new ( self . send_data_packet ( & mut connection, vec ! [ PACKET_ID_KILL ] , packet_number) ) ) ;
@@ -911,6 +947,12 @@ impl NetCrypto {
911947 pub fn set_dht_pk_sink ( & self , dht_pk_tx : DhtPkTx ) {
912948 * self . dht_pk_tx . write ( ) = Some ( dht_pk_tx) ;
913949 }
950+
951+ /// Set sink to send a connection status when it becomes connected or
952+ /// disconnected.
953+ pub fn set_connection_status_sink ( & self , connection_status_tx : ConnectionStatusTx ) {
954+ * self . connection_status_tx . write ( ) = Some ( connection_status_tx) ;
955+ }
914956}
915957
916958#[ cfg( test) ]
0 commit comments