Skip to content

Commit f7d9182

Browse files
author
Roman Proskuryakov
authored
Merge pull request #380 from tox-rs/friend_request
Handle friend requests inside onion client
2 parents 76840d9 + 5afddd2 commit f7d9182

File tree

5 files changed

+87
-45
lines changed

5 files changed

+87
-45
lines changed

src/toxcore/dht/server/mod.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ pub struct Server {
118118
/// Tx split of a channel to send packets to this peer via UDP socket.
119119
pub tx: Tx,
120120
/// Sink to send friend's `SocketAddr` when it gets known.
121-
friend_saddr_sink: Arc<RwLock<Option<mpsc::UnboundedSender<PackedNode>>>>,
121+
friend_saddr_sink: Arc<RwLock<OptionalSink<mpsc::UnboundedSender<PackedNode>>>>,
122122
/// Struct that stores and manages requests IDs and timeouts.
123123
request_queue: Arc<RwLock<RequestQueue<PublicKey>>>,
124124
/// Close nodes list which contains nodes close to own DHT `PublicKey`.
@@ -759,13 +759,9 @@ impl Server {
759759
for friend in friends.values_mut() {
760760
friend.try_add_to_close(node);
761761
}
762-
if let Some(ref friend_saddr_sink) = *self.friend_saddr_sink.read() {
763-
if friends.contains_key(&node.pk) {
764-
Either::A(send_to(friend_saddr_sink, node)
765-
.map_err(|e| e.context(HandlePacketErrorKind::FriendSaddr).into()))
766-
} else {
767-
Either::B(future::ok(()))
768-
}
762+
if friends.contains_key(&node.pk) {
763+
Either::A(send_to(&*self.friend_saddr_sink.read(), node)
764+
.map_err(|e| e.context(HandlePacketErrorKind::FriendSaddr).into()))
769765
} else {
770766
Either::B(future::ok(()))
771767
}
@@ -1467,7 +1463,7 @@ impl Server {
14671463

14681464
/// Set sink to send friend's `SocketAddr` when it gets known.
14691465
pub fn set_friend_saddr_sink(&self, friend_saddr_sink: mpsc::UnboundedSender<PackedNode>) {
1470-
*self.friend_saddr_sink.write() = Some(friend_saddr_sink);
1466+
self.friend_saddr_sink.write().set(friend_saddr_sink);
14711467
}
14721468

14731469
/// Get `PrecomputedKey`s cache.

src/toxcore/io_tokio.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::fmt::Debug;
44
use std::io::{Error as IoError};
55

6-
use futures::{Future, Sink, Stream};
6+
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
77

88
/// A convenience typedef around a `Future` whose error component is `io::Error`
99
pub type IoFuture<T> = Box<Future<Item = T, Error = IoError> + Send>;
@@ -32,3 +32,51 @@ pub fn send_all_to<T: Send + 'static, S, Tx, E: Debug>(tx: &Tx, s: S) -> impl Fu
3232
.map(|_tx| ()) // ignore tx because it was cloned
3333
}
3434

35+
/// `Sink` type that can either be empty or store inner `Sink`. When it's empty
36+
/// it drops all sent elements.
37+
#[derive(Debug, Clone)]
38+
pub struct OptionalSink<S: Sink>(Option<S>);
39+
40+
impl<S: Sink> OptionalSink<S> {
41+
/// Create new `OptionalSink`.
42+
pub fn new() -> Self {
43+
OptionalSink(None)
44+
}
45+
46+
/// Set a sink to `OptionalSink`.
47+
pub fn set(&mut self, sink: S) {
48+
self.0 = Some(sink);
49+
}
50+
}
51+
52+
impl<S: Sink> Default for OptionalSink<S> {
53+
fn default() -> Self {
54+
OptionalSink::new()
55+
}
56+
}
57+
58+
impl<S: Sink> Sink for OptionalSink<S> {
59+
type SinkItem = S::SinkItem;
60+
type SinkError = S::SinkError;
61+
fn start_send(&mut self, item: S::SinkItem) -> StartSend<S::SinkItem, S::SinkError> {
62+
if let Some(ref mut sink) = self.0 {
63+
sink.start_send(item)
64+
} else {
65+
Ok(AsyncSink::Ready)
66+
}
67+
}
68+
fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
69+
if let Some(ref mut sink) = self.0 {
70+
sink.poll_complete()
71+
} else {
72+
Ok(Async::Ready(()))
73+
}
74+
}
75+
fn close(&mut self) -> Poll<(), S::SinkError> {
76+
if let Some(ref mut sink) = self.0 {
77+
sink.close()
78+
} else {
79+
Ok(Async::Ready(()))
80+
}
81+
}
82+
}

src/toxcore/net_crypto/mod.rs

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,10 @@ pub struct NetCrypto {
128128
/// packet. If key from `Cookie` is not equal to saved key inside
129129
/// `CryptoConnection` then `NetCrypto` module will send message to this
130130
/// sink.
131-
dht_pk_tx: Arc<RwLock<Option<DhtPkTx>>>,
131+
dht_pk_tx: Arc<RwLock<OptionalSink<DhtPkTx>>>,
132132
/// Sink to send a connection status when it becomes connected or
133133
/// disconnected. The key is a long term key of the connection.
134-
connection_status_tx: Arc<RwLock<Option<ConnectionStatusTx>>>,
134+
connection_status_tx: Arc<RwLock<OptionalSink<ConnectionStatusTx>>>,
135135
/// Sink to send lossless packets. The key is a long term public key of the
136136
/// peer that sent this packet.
137137
lossless_tx: LosslessTx,
@@ -229,11 +229,7 @@ impl NetCrypto {
229229
/// or disconnected.
230230
fn send_connection_status(&self, connection: &CryptoConnection, status: bool) -> impl Future<Item = (), Error = mpsc::SendError<(PublicKey, bool)>> {
231231
if connection.is_established() != status {
232-
if let Some(ref connection_status_tx) = *self.connection_status_tx.read() {
233-
Either::A(send_to(connection_status_tx, (connection.peer_real_pk, status)))
234-
} else {
235-
Either::B(future::ok(()))
236-
}
232+
Either::A(send_to(&*self.connection_status_tx.read(), (connection.peer_real_pk, status)))
237233
} else {
238234
Either::B(future::ok(()))
239235
}
@@ -452,12 +448,8 @@ impl NetCrypto {
452448
return Box::new(future::err(HandlePacketError::from(HandlePacketErrorKind::InvalidRealPk)))
453449
}
454450
if cookie.dht_pk != connection.peer_dht_pk {
455-
let dht_pk_future = if let Some(ref dht_pk_tx) = *self.dht_pk_tx.read() {
456-
Either::A(send_to(dht_pk_tx, (connection.peer_real_pk, cookie.dht_pk))
457-
.map_err(|e| e.context(HandlePacketErrorKind::SendToDhtpk).into()))
458-
} else {
459-
Either::B(future::ok(()))
460-
};
451+
let dht_pk_future = send_to(&*self.dht_pk_tx.read(), (connection.peer_real_pk, cookie.dht_pk))
452+
.map_err(|e| e.context(HandlePacketErrorKind::SendToDhtpk).into());
461453
return Box::new(dht_pk_future
462454
.and_then(|()| future::err(HandlePacketError::from(HandlePacketErrorKind::InvalidDhtPk)))
463455
)
@@ -561,12 +553,8 @@ impl NetCrypto {
561553
let connection = Arc::new(RwLock::new(connection));
562554
connections.insert(cookie.real_pk, connection);
563555

564-
let dht_pk_future = if let Some(ref dht_pk_tx) = *self.dht_pk_tx.read() {
565-
Either::A(send_to(dht_pk_tx, (cookie.real_pk, cookie.dht_pk))
566-
.map_err(|e| e.context(HandlePacketErrorKind::SendToDhtpk).into()))
567-
} else {
568-
Either::B(future::ok(()))
569-
};
556+
let dht_pk_future = send_to(&*self.dht_pk_tx.read(), (cookie.real_pk, cookie.dht_pk))
557+
.map_err(|e| e.context(HandlePacketErrorKind::SendToDhtpk).into());
570558

571559
Either::B(kill_future.join(dht_pk_future).map(|_| ()))
572560
}
@@ -986,13 +974,13 @@ impl NetCrypto {
986974

987975
/// Set sink to send DHT `PublicKey` when it gets known.
988976
pub fn set_dht_pk_sink(&self, dht_pk_tx: DhtPkTx) {
989-
*self.dht_pk_tx.write() = Some(dht_pk_tx);
977+
self.dht_pk_tx.write().set(dht_pk_tx);
990978
}
991979

992980
/// Set sink to send a connection status when it becomes connected or
993981
/// disconnected.
994982
pub fn set_connection_status_sink(&self, connection_status_tx: ConnectionStatusTx) {
995-
*self.connection_status_tx.write() = Some(connection_status_tx);
983+
self.connection_status_tx.write().set(connection_status_tx);
996984
}
997985
}
998986

src/toxcore/onion/client/errors.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ error_kind! {
6464
#[doc = "Failed to handle DHT `PublicKey` announce."]
6565
#[fail(display = "Failed to handle DHT PublicKey announce")]
6666
DhtPkAnnounce,
67+
#[doc = "Failed to send a friend request."]
68+
#[fail(display = "Failed to send a friend request")]
69+
FriendRequest,
6770
}
6871
}
6972

src/toxcore/onion/client/mod.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ use crate::toxcore::time::*;
4040
/// key is a DHT key.
4141
type DhtPkTx = mpsc::UnboundedSender<(PublicKey, PublicKey)>;
4242

43+
/// Shorthand for the transmit half of the message channel for sending friend
44+
/// requests when we get them. The key is a long term key.
45+
type FriendRequestTx = mpsc::UnboundedSender<(PublicKey, FriendRequest)>;
46+
4347
/// Number of friend's close nodes to store.
4448
const MAX_ONION_FRIEND_NODES: u8 = 8;
4549

@@ -302,7 +306,10 @@ struct OnionClientState {
302306
friends: HashMap<PublicKey, OnionFriend>,
303307
/// Sink to send DHT `PublicKey` when it gets known. The first key is a long
304308
/// term key, the second key is a DHT key.
305-
dht_pk_tx: Option<DhtPkTx>,
309+
dht_pk_tx: OptionalSink<DhtPkTx>,
310+
/// Sink to send friend requests when we get them. The key is a long term
311+
/// key.
312+
friend_request_tx: OptionalSink<FriendRequestTx>,
306313
}
307314

308315
impl OnionClientState {
@@ -312,7 +319,8 @@ impl OnionClientState {
312319
announce_list: Kbucket::new(MAX_ONION_ANNOUNCE_NODES),
313320
announce_requests: RequestQueue::new(ANNOUNCE_TIMEOUT),
314321
friends: HashMap::new(),
315-
dht_pk_tx: None,
322+
dht_pk_tx: OptionalSink::new(),
323+
friend_request_tx: OptionalSink::new(),
316324
}
317325
}
318326
}
@@ -360,7 +368,7 @@ impl OnionClient {
360368

361369
/// Set sink to send DHT `PublicKey` when it gets known.
362370
pub fn set_dht_pk_sink(&self, dht_pk_tx: DhtPkTx) {
363-
self.state.lock().dht_pk_tx = Some(dht_pk_tx);
371+
self.state.lock().dht_pk_tx.set(dht_pk_tx);
364372
}
365373

366374
/// Check if a node was pinged recently.
@@ -511,11 +519,7 @@ impl OnionClient {
511519
friend.dht_pk = Some(dht_pk_announce.dht_pk);
512520
friend.last_seen = Some(clock_now());
513521

514-
let dht_pk_future = if let Some(ref dht_pk_tx) = state.dht_pk_tx {
515-
Either::A(send_to(dht_pk_tx, (friend_pk, dht_pk_announce.dht_pk)))
516-
} else {
517-
Either::B(future::ok(()))
518-
};
522+
let dht_pk_future = send_to(&state.dht_pk_tx, (friend_pk, dht_pk_announce.dht_pk));
519523

520524
let futures = dht_pk_announce.nodes.into_iter().map(|node| match node.ip_port.protocol {
521525
ProtocolType::UDP => {
@@ -545,13 +549,15 @@ impl OnionClient {
545549
Ok(payload) => payload,
546550
Err(e) => return Either::A(future::err(e.context(HandleDataResponseErrorKind::InvalidInnerPayload).into()))
547551
};
548-
match iner_payload {
552+
let future = match iner_payload {
549553
OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce) =>
550-
Either::B(self.handle_dht_pk_announce(payload.real_pk, dht_pk_announce)
554+
Either::A(self.handle_dht_pk_announce(payload.real_pk, dht_pk_announce)
551555
.map_err(|e| e.context(HandleDataResponseErrorKind::DhtPkAnnounce).into())),
552-
OnionDataResponseInnerPayload::FriendRequest(_) =>
553-
Either::A(future::ok(()))
554-
}
556+
OnionDataResponseInnerPayload::FriendRequest(friend_request) =>
557+
Either::B(send_to(&self.state.lock().friend_request_tx, (payload.real_pk, friend_request))
558+
.map_err(|e| e.context(HandleDataResponseErrorKind::FriendRequest).into()))
559+
};
560+
Either::B(future)
555561
}
556562

557563
/// Add new node to random nodes pool to use them to build random paths.
@@ -991,7 +997,8 @@ mod tests {
991997
announce_list: Kbucket::new(8),
992998
announce_requests: RequestQueue::new(Duration::from_secs(42)),
993999
friends: HashMap::new(),
994-
dht_pk_tx: None,
1000+
dht_pk_tx: OptionalSink::new(),
1001+
friend_request_tx: OptionalSink::new(),
9951002
};
9961003

9971004
let _onion_client_state_c = onion_client_state.clone();

0 commit comments

Comments
 (0)