Skip to content

Commit ef72894

Browse files
author
Roman Proskuryakov
authored
Merge pull request #365 from tox-rs/timeouts
Refactor timeouts
2 parents c559674 + 12ea971 commit ef72894

File tree

5 files changed

+49
-58
lines changed

5 files changed

+49
-58
lines changed

src/toxcore/dht/lan_discovery.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures::sync::mpsc;
1010
use get_if_addrs;
1111
use get_if_addrs::IfAddr;
1212
use tokio::timer::Interval;
13-
use tokio::timer::timeout::Error as TimeoutError;
13+
use tokio::util::FutureExt;
1414

1515
use crate::toxcore::crypto_core::*;
1616
use crate::toxcore::io_tokio::*;
@@ -48,9 +48,6 @@ pub const END_PORT: u16 = 33546;
4848
/// Interval in seconds between `LanDiscovery` packet sending.
4949
pub const LAN_DISCOVERY_INTERVAL: u64 = 10;
5050

51-
/// Timeout in seconds for packet sending
52-
pub const LAN_DISCOVERY_SEND_TIMEOUT: u64 = 1;
53-
5451
/// Shorthand for the transmit half of the message channel.
5552
type Tx = mpsc::Sender<(Packet, SocketAddr)>;
5653

@@ -130,7 +127,7 @@ impl LanDiscoverySender {
130127
}
131128

132129
/// Send `LanDiscovery` packets.
133-
fn send(&mut self) -> impl Future<Item=(), Error=TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
130+
fn send(&mut self) -> impl Future<Item=(), Error=mpsc::SendError<(Packet, SocketAddr)>> + Send {
134131
let addrs = self.get_broadcast_socket_addrs();
135132
let lan_packet = Packet::LanDiscovery(LanDiscovery {
136133
pk: self.dht_pk,
@@ -140,7 +137,7 @@ impl LanDiscoverySender {
140137
addrs.into_iter().map(move |addr| (lan_packet.clone(), addr))
141138
);
142139

143-
send_all_to_bounded(&self.tx, stream, Duration::from_secs(LAN_DISCOVERY_SEND_TIMEOUT))
140+
send_all_to(&self.tx, stream)
144141
}
145142

146143
/// Run LAN discovery periodically. Result future will never be completed
@@ -152,7 +149,7 @@ impl LanDiscoverySender {
152149
.map_err(|e| e.context(LanDiscoveryErrorKind::Wakeup).into())
153150
.for_each(move |_instant| {
154151
trace!("LAN discovery sender wake up");
155-
self.send().then(|r| {
152+
self.send().timeout(interval).then(|r| {
156153
if let Err(e) = r {
157154
warn!("Failed to send LAN discovery packets: {}", e);
158155
if let Some(e) = e.into_inner() {

src/toxcore/dht/server/mod.rs

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use futures::future::{Either, join_all};
1212
use futures::sync::mpsc;
1313
use parking_lot::RwLock;
1414
use tokio::timer::Interval;
15-
use tokio::timer::timeout::Error as TimeoutError;
15+
use tokio::util::FutureExt;
1616

1717
use std::collections::HashMap;
1818
use std::net::SocketAddr;
@@ -70,8 +70,6 @@ pub const FAKE_FRIENDS_NUMBER: usize = 2;
7070
/// Maximum number of entry in Lru cache for precomputed keys.
7171
pub const PRECOMPUTED_LRU_CACHE_SIZE: usize = KBUCKET_DEFAULT_SIZE as usize * KBUCKET_MAX_ENTRIES as usize + // For KTree.
7272
KBUCKET_DEFAULT_SIZE as usize * (2 + 10); // For friend's close_nodes of 2 fake friends + 10 friends reserved
73-
/// Timeout in seconds for packet sending
74-
pub const DHT_SEND_TIMEOUT: u64 = 1;
7573
/// How often DHT main loop should be called.
7674
const MAIN_LOOP_INTERVAL: u64 = 1;
7775

@@ -382,15 +380,22 @@ impl Server {
382380
.map_err(|e| e.context(RunErrorKind::Wakeup).into())
383381
.for_each(move |_instant| {
384382
trace!("Bootstrap wake up");
385-
self.send_bootstrap_requests()
386-
.map_err(|e| e.context(RunErrorKind::SendTo).into())
383+
self.send_bootstrap_requests().timeout(interval).then(|r| {
384+
if let Err(e) = r {
385+
warn!("Failed to send initial bootstrap packets: {}", e);
386+
if let Some(e) = e.into_inner() {
387+
return future::err(e.context(RunErrorKind::SendTo).into())
388+
}
389+
}
390+
future::ok(())
391+
})
387392
})
388393
}
389394

390395
/// Check if all nodes in Ktree are discarded (including the case when
391396
/// it's empty) and if so then send `NodesRequest` packet to nodes from
392397
/// initial bootstrap list and from Ktree.
393-
fn send_bootstrap_requests(&self) -> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
398+
fn send_bootstrap_requests(&self) -> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
394399
let mut request_queue = self.request_queue.write();
395400
let close_nodes = self.close_nodes.read();
396401

@@ -417,9 +422,12 @@ impl Server {
417422
.map_err(|e| e.context(RunErrorKind::Wakeup).into())
418423
.for_each(move |_instant| {
419424
trace!("DHT server wake up");
420-
self.dht_main_loop().then(|res| {
425+
self.dht_main_loop().timeout(interval).then(|res| {
421426
if let Err(e) = res {
422427
warn!("Failed to send DHT periodical packets: {}", e);
428+
if let Some(e) = e.into_inner() {
429+
return future::err(e.context(RunErrorKind::SendTo).into())
430+
}
423431
}
424432
future::ok(())
425433
})
@@ -455,7 +463,7 @@ impl Server {
455463
}
456464

457465
/// Send `PingRequest` packets to nodes from `nodes_to_ping` list.
458-
fn send_pings(&self) -> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
466+
fn send_pings(&self) -> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
459467
let nodes_to_ping = mem::replace(
460468
&mut *self.nodes_to_ping.write(),
461469
Kbucket::<PackedNode>::new(MAX_TO_PING)
@@ -478,7 +486,7 @@ impl Server {
478486
/// a friend and we don't know it's address then this method will send
479487
/// `PingRequest` immediately instead of adding to a `nodes_to_ping`
480488
/// list.
481-
fn ping_add(&self, node: &PackedNode) -> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
489+
fn ping_add(&self, node: &PackedNode) -> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
482490
let close_nodes = self.close_nodes.read();
483491

484492
if !close_nodes.can_add(&node) {
@@ -502,7 +510,7 @@ impl Server {
502510
/// necessary to check whether node is alive before adding it to close
503511
/// nodes lists.
504512
fn ping_nodes_to_bootstrap(&self, request_queue: &mut RequestQueue<PublicKey>, nodes_to_bootstrap: &mut Kbucket<PackedNode>, pk: PublicKey)
505-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
513+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
506514
let capacity = nodes_to_bootstrap.capacity() as u8;
507515
let nodes_to_bootstrap = mem::replace(nodes_to_bootstrap, Kbucket::new(capacity));
508516

@@ -516,7 +524,7 @@ impl Server {
516524
/// Iterate over nodes from close nodes list and send `NodesRequest` packets
517525
/// to them if necessary.
518526
fn ping_close_nodes<'a, T>(&self, request_queue: &mut RequestQueue<PublicKey>, nodes: T, pk: PublicKey)
519-
-> Box<dyn Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send>
527+
-> Box<dyn Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send>
520528
where T: Iterator<Item = &'a mut DhtNode> // if change to impl Future the result will be dependent on nodes lifetime
521529
{
522530
let futures = nodes
@@ -539,7 +547,7 @@ impl Server {
539547
/// it was sent less than `NODES_REQ_INTERVAL`. This function should be
540548
/// called every second.
541549
fn send_nodes_req_random<'a, T>(&self, request_queue: &mut RequestQueue<PublicKey>, nodes: T, pk: PublicKey)
542-
-> Box<dyn Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send>
550+
-> Box<dyn Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send>
543551
where T: Iterator<Item = &'a DhtNode> // if change to impl Future the result will be dependent on nodes lifetime
544552
{
545553
let good_nodes = nodes
@@ -572,7 +580,7 @@ impl Server {
572580

573581
/// Send `PingRequest` packet to the node.
574582
fn send_ping_req(&self, node: &PackedNode, request_queue: &mut RequestQueue<PublicKey>)
575-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
583+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
576584
let payload = PingRequestPayload {
577585
id: request_queue.new_ping_id(node.pk),
578586
};
@@ -586,7 +594,7 @@ impl Server {
586594

587595
/// Send `NodesRequest` packet to the node.
588596
fn send_nodes_req(&self, node: &PackedNode, request_queue: &mut RequestQueue<PublicKey>, search_pk: PublicKey)
589-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
597+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
590598
// Check if packet is going to be sent to ourselves.
591599
if self.pk == node.pk {
592600
trace!("Attempt to send NodesRequest to ourselves.");
@@ -607,7 +615,7 @@ impl Server {
607615

608616
/// Send `NatPingRequest` packet to all friends and try to punch holes.
609617
fn send_nat_ping_req(&self, request_queue: &mut RequestQueue<PublicKey>, friends: &mut HashMap<PublicKey, DhtFriend>)
610-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
618+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
611619
let futures = friends.values_mut()
612620
.filter(|friend| !friend.is_addr_known())
613621
.map(|friend| {
@@ -645,7 +653,7 @@ impl Server {
645653

646654
/// Try to punch holes to specified friend.
647655
fn punch_holes(&self, request_queue: &mut RequestQueue<PublicKey>, friend: &mut DhtFriend, returned_addrs: &[SocketAddr])
648-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
656+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
649657
let punch_addrs = friend.hole_punch.next_punch_addrs(returned_addrs);
650658

651659
let packets = punch_addrs.into_iter().map(|addr| {
@@ -661,13 +669,13 @@ impl Server {
661669
(packet, addr)
662670
}).collect::<Vec<_>>();
663671

664-
send_all_to_bounded(&self.tx, stream::iter_ok(packets), Duration::from_secs(DHT_SEND_TIMEOUT))
672+
send_all_to(&self.tx, stream::iter_ok(packets))
665673
}
666674

667675
/// Send `NatPingRequest` packet to all close nodes of friend in the hope
668676
/// that they will redirect it to this friend.
669677
fn send_nat_ping_req_inner(&self, friend: &DhtFriend, nat_ping_req_packet: DhtRequest)
670-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
678+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
671679
let packet = Packet::DhtRequest(nat_ping_req_packet);
672680
let futures = friend.close_nodes.nodes
673681
.iter()
@@ -713,8 +721,8 @@ impl Server {
713721

714722
/// Send UDP packet to specified address.
715723
fn send_to(&self, addr: SocketAddr, packet: Packet)
716-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
717-
send_to_bounded(&self.tx, (packet, addr), Duration::from_secs(DHT_SEND_TIMEOUT))
724+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
725+
send_to(&self.tx, (packet, addr))
718726
}
719727

720728
/// Handle received `PingRequest` packet and response with `PingResponse`
@@ -1335,7 +1343,7 @@ impl Server {
13351343
/// Handle `OnionRequest` from TCP relay and send `OnionRequest1` packet
13361344
/// to the next node in the onion path.
13371345
pub fn handle_tcp_onion_request(&self, packet: OnionRequest, addr: SocketAddr)
1338-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
1346+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
13391347
let onion_symmetric_key = self.onion_symmetric_key.read();
13401348

13411349
let onion_return = OnionReturn::new(

src/toxcore/io_tokio.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,8 @@
22
33
use std::fmt::Debug;
44
use std::io::{Error as IoError};
5-
use std::time::Duration;
65

76
use futures::{Future, Sink, Stream};
8-
use tokio::util::FutureExt;
9-
use tokio::timer::timeout::Error as TimeoutError;
107

118
/// A convenience typedef around a `Future` whose error component is `io::Error`
129
pub type IoFuture<T> = Box<Future<Item = T, Error = IoError> + Send>;
@@ -24,14 +21,6 @@ pub fn send_to<T: Send + 'static, Tx, E: Debug>(tx: &Tx, v: T) -> impl Future<It
2421
.map(|_tx| ()) // ignore tx because it was cloned
2522
}
2623

27-
/// Send item to a sink using reference with timeout
28-
pub fn send_to_bounded<T: Send + 'static, Tx, E: Debug>(tx: &Tx, v: T, timeout: Duration) -> impl Future<Item=(), Error=TimeoutError<E>> + Send
29-
where Tx: Sink<SinkItem = T, SinkError = E> + Send + Clone + 'static
30-
{
31-
send_to(tx, v)
32-
.timeout(timeout)
33-
}
34-
3524
/// Send item to a sink using reference
3625
pub fn send_all_to<T: Send + 'static, S, Tx, E: Debug>(tx: &Tx, s: S) -> impl Future<Item=(), Error=E> + Send
3726
where S: Stream<Item = T, Error = E> + Send + 'static,
@@ -43,11 +32,3 @@ pub fn send_all_to<T: Send + 'static, S, Tx, E: Debug>(tx: &Tx, s: S) -> impl Fu
4332
.map(|_tx| ()) // ignore tx because it was cloned
4433
}
4534

46-
/// Send item to a sink using reference with timeout
47-
pub fn send_all_to_bounded<T: Send + 'static, S, Tx, E: Debug>(tx: &Tx, s: S, timeout: Duration) -> impl Future<Item=(), Error=TimeoutError<E>> + Send
48-
where S: Stream<Item = T, Error = E> + Send + 'static,
49-
Tx: Sink<SinkItem = T, SinkError = E> + Send + Clone + 'static
50-
{
51-
send_all_to(tx, s)
52-
.timeout(timeout)
53-
}

src/toxcore/net_crypto/mod.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use futures::future::Either;
3131
use futures::sync::mpsc;
3232
use parking_lot::RwLock;
3333
use tokio::timer::Interval;
34-
use tokio::timer::timeout::Error as TimeoutError;
34+
use tokio::util::FutureExt;
3535

3636
use crate::toxcore::binary_io::*;
3737
use crate::toxcore::crypto_core::*;
@@ -68,9 +68,6 @@ const PACKET_ID_LOSSY_RANGE_START: u8 = 192;
6868
/// `PACKET_ID_LOSSY_RANGE_END` are considered lossy packets.
6969
const PACKET_ID_LOSSY_RANGE_END: u8 = 254;
7070

71-
/// Timeout for packet sending
72-
const NET_CRYPTO_SEND_TIMEOUT: Duration = Duration::from_millis(50);
73-
7471
/// Shorthand for the transmit half of the message channel for sending DHT
7572
/// packets.
7673
type UdpTx = mpsc::Sender<(Packet, SocketAddr)>;
@@ -292,8 +289,8 @@ impl NetCrypto {
292289
}
293290

294291
/// Send `Packet` packet to UDP socket
295-
fn send_to_udp(&self, addr: SocketAddr, packet: Packet) -> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
296-
send_to_bounded(&self.udp_tx, (packet, addr), NET_CRYPTO_SEND_TIMEOUT)
292+
fn send_to_udp(&self, addr: SocketAddr, packet: Packet) -> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
293+
send_to(&self.udp_tx, (packet, addr))
297294
}
298295

299296
/// Get long term `PublicKey` of the peer by its UDP address
@@ -747,7 +744,7 @@ impl NetCrypto {
747744

748745
/// Send packet to crypto connection choosing TCP or UDP protocol
749746
fn send_packet(&self, packet: Packet, connection: &mut CryptoConnection)
750-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
747+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
751748
// TODO: can backpressure be used instead of congestion control? It
752749
// seems it's possible to implement wrapper for bounded sender with
753750
// priority queue and just send packets there
@@ -778,7 +775,7 @@ impl NetCrypto {
778775
/// Send `CookieRequest` or `CryptoHandshake` packet if needed depending on
779776
/// connection status and update sent counter
780777
fn send_status_packet(&self, connection: &mut CryptoConnection)
781-
-> impl Future<Item = (), Error = TimeoutError<mpsc::SendError<(Packet, SocketAddr)>>> + Send {
778+
-> impl Future<Item = (), Error = mpsc::SendError<(Packet, SocketAddr)>> + Send {
782779
match connection.packet_to_send() {
783780
Some(packet) => Either::A(self.send_packet(packet, connection)),
784781
None => Either::B(future::ok(())),
@@ -900,8 +897,15 @@ impl NetCrypto {
900897

901898
wakeups
902899
.map_err(|e| e.context(RunErrorKind::Wakeup).into())
903-
.for_each(move |_instant| self.main_loop()
904-
.map_err(|e| e.context(RunErrorKind::SendData).into())
900+
.for_each(move |_instant| self.main_loop().timeout(PACKET_COUNTER_AVERAGE_INTERVAL).then(|r| {
901+
if let Err(e) = r {
902+
warn!("Failed to send net crypto packets: {}", e);
903+
if let Some(e) = e.into_inner() {
904+
return future::err(e.context(RunErrorKind::SendData).into())
905+
}
906+
}
907+
future::ok(())
908+
})
905909
)
906910
}
907911
}

src/toxcore/tcp/server/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::time::{Instant, Duration};
1616

1717
use futures::Future;
1818
use futures::sync::mpsc;
19+
use tokio::util::FutureExt;
1920

2021
/// Interval in seconds for sending TCP PingRequest
2122
pub const TCP_PING_FREQUENCY: u64 = 30;
@@ -128,7 +129,7 @@ impl Client {
128129
/** Send a packet. This method does not ignore IO error
129130
*/
130131
fn send(&self, packet: Packet) -> impl Future<Item = (), Error = Error> + Send {
131-
send_to_bounded(&self.tx, packet, Duration::from_secs(TCP_SEND_TIMEOUT)).map_err(|e|
132+
send_to(&self.tx, packet).timeout(Duration::from_secs(TCP_SEND_TIMEOUT)).map_err(|e|
132133
Error::new(ErrorKind::Other,
133134
format!("Failed to send packet: {:?}", e)
134135
))

0 commit comments

Comments
 (0)