Skip to content

Commit df48366

Browse files
committed
refactor(tcp_server): release state lock before sending packets
Currently we send tcp packets while holding the state lock. At the same time we use bounded queues for sending of packets. This means that a single stuck client can block the whole server until the timeout happens. This change makes the server release the state lock before sending any packet.
1 parent bcb1868 commit df48366

File tree

2 files changed

+178
-178
lines changed

2 files changed

+178
-178
lines changed

tox_core/src/relay/server/client.rs

Lines changed: 5 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,20 @@
33

44
use tox_crypto::*;
55
use tox_packet::relay::*;
6-
use tox_packet::relay::connection_id::ConnectionId;
76
use crate::relay::links::Links;
8-
use tox_packet::onion::InnerOnionResponse;
97
use crate::time::*;
108
use crate::utils::*;
119

12-
use std::io::{Error, ErrorKind};
1310
use std::net::IpAddr;
1411
use std::time::{Instant, Duration};
1512

1613
use futures::channel::mpsc;
17-
use futures::SinkExt;
1814
use rand::thread_rng;
1915

2016
/// Interval of time for sending TCP PingRequest
2117
pub const TCP_PING_FREQUENCY: Duration = Duration::from_secs(30);
2218
/// Interval of time for waiting response of PingRequest sent
2319
pub const TCP_PING_TIMEOUT: Duration = Duration::from_secs(10);
24-
/// Interval of time for packet sending
25-
pub const TCP_SEND_TIMEOUT: Duration = Duration::from_secs(1);
2620

2721
/** Structure that represents how Server keeps connected clients. A write-only socket with
2822
human interface. A client cannot send a message directly to another client, whereas server can.
@@ -125,93 +119,16 @@ impl Client {
125119
&mut self.links
126120
}
127121

128-
/** Send a packet. This method does not ignore IO error
129-
*/
130-
async fn send(&self, packet: Packet) -> Result<(), Error> {
131-
let mut tx = self.tx.clone();
132-
133-
let timeout = tokio::time::timeout(
134-
TCP_SEND_TIMEOUT,
135-
tx.send(packet)
136-
);
137-
138-
match timeout.await {
139-
Err(e) => Err(Error::new(
140-
ErrorKind::Other,
141-
format!("Failed to send packet: {:?}", e)
142-
)),
143-
Ok(Err(e)) => Err(Error::new(
144-
ErrorKind::Other,
145-
format!("Failed to send packet: {:?}", e)
146-
)),
147-
Ok(_) => Ok(())
148-
}
149-
}
150-
/** Send a packet. This method ignores IO error
151-
*/
152-
async fn send_ignore_error(&self, packet: Packet) {
153-
// ignore if somehow failed to send it
154-
self.send(packet).await.ok();
155-
}
156-
/** Construct RouteResponse and send it to Client
157-
*/
158-
pub async fn send_route_response(&self, pk: PublicKey, connection_id: ConnectionId) -> Result<(), Error> {
159-
self.send(
160-
Packet::RouteResponse(RouteResponse { connection_id, pk })
161-
).await
162-
}
163-
/** Construct ConnectNotification and send it to Client ignoring IO error
164-
*/
165-
pub async fn send_connect_notification(&self, connection_id: ConnectionId) {
166-
self.send_ignore_error(
167-
Packet::ConnectNotification(ConnectNotification { connection_id })
168-
).await;
169-
}
170-
/** Construct DisconnectNotification and send it to Client ignoring IO error
171-
*/
172-
pub async fn send_disconnect_notification(&self, connection_id: ConnectionId) {
173-
self.send_ignore_error(
174-
Packet::DisconnectNotification(DisconnectNotification { connection_id })
175-
).await;
122+
pub fn tx(&self) -> mpsc::Sender<Packet> {
123+
self.tx.clone()
176124
}
177-
/** Construct PongResponse and send it to Client
178-
*/
179-
pub async fn send_pong_response(&self, ping_id: u64) -> Result<(), Error> {
180-
self.send(
181-
Packet::PongResponse(PongResponse { ping_id })
182-
).await
183-
}
184-
/** Construct OobReceive and send it to Client ignoring IO error
185-
*/
186-
pub async fn send_oob(&self, sender_pk: PublicKey, data: Vec<u8>) {
187-
self.send_ignore_error(
188-
Packet::OobReceive(OobReceive { sender_pk, data })
189-
).await;
190-
}
191-
/** Construct OnionResponse and send it to Client
192-
*/
193-
pub async fn send_onion_response(&self, payload: InnerOnionResponse) -> Result<(), Error> {
194-
self.send(
195-
Packet::OnionResponse(OnionResponse { payload })
196-
).await
197-
}
198-
/** Construct Data and send it to Client
199-
*/
200-
pub async fn send_data(&self, connection_id: ConnectionId, data: DataPayload) -> Result<(), Error> {
201-
self.send(
202-
Packet::Data(Data { connection_id, data })
203-
).await
204-
}
205-
/** Construct PingRequest and send it to Client
206-
*/
207-
pub async fn send_ping_request(&mut self) -> Result<(), Error> {
125+
126+
pub fn new_ping_id(&mut self) -> u64 {
208127
let ping_id = gen_ping_id(&mut thread_rng());
209128

210129
self.last_pinged = Instant::now();
211130
self.ping_id = ping_id;
212131

213-
self.send(
214-
Packet::PingRequest(PingRequest { ping_id })
215-
).await
132+
ping_id
216133
}
217134
}

0 commit comments

Comments
 (0)