Skip to content

Commit 4c79d99

Browse files
committed
Simplify tftpd-targz.rs example
1 parent 8343f2a commit 4c79d99

File tree

2 files changed

+32
-95
lines changed

2 files changed

+32
-95
lines changed

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,19 @@ futures-util = "0.3.5"
3131

3232
[dev-dependencies]
3333
anyhow = "1.0.32"
34-
log = "0.4.11"
3534
structopt = "0.3.16"
3635
rand = { version = "0.7.3", features = ["small_rng"] }
3736
md5 = "0.7.0"
3837
tempfile = "3.1.0"
3938
matches = "0.1.8"
4039
fern = "0.6.0"
41-
piper = "0.1.3"
42-
tar = "0.4.29"
43-
flate2 = "1.0.16"
44-
threadpool = "1.8.1"
4540
async-executor = "0.1.2"
4641
async-channel = "1.4.0"
4742

43+
# deps for tftpd-targz.rs
44+
async-std = "1.6.3"
45+
async-tar = "0.3.0"
46+
async-compression = { version = "0.3.5", features = ["gzip", "futures-io"] }
47+
4848
[features]
4949
external-client-tests = []

examples/tftpd-targz.rs

Lines changed: 27 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,25 @@
11
use anyhow::Result;
22
use structopt::StructOpt;
33

4+
use async_compression::futures::bufread::GzipDecoder;
5+
use async_std::fs::File;
6+
use async_std::io::{BufReader, Sink};
7+
use async_std::path::{Path, PathBuf};
8+
use async_std::stream::StreamExt;
9+
use async_std::task::block_on;
10+
use async_tar::{Archive, Entry};
411
use async_tftp::packet;
512
use async_tftp::server::{Handler, TftpServerBuilder};
6-
use flate2::read::GzDecoder;
7-
use futures_lite::future::block_on;
8-
use futures_lite::io::{AsyncWrite, AsyncWriteExt, Sink};
9-
use std::fs::File;
10-
use std::io::Read;
1113
use std::net::SocketAddr;
12-
use std::path::{Path, PathBuf};
13-
use tar::Archive;
14-
use threadpool::ThreadPool;
1514

1615
struct TftpdTarGzHandler {
1716
archive_path: PathBuf,
18-
thread_pool: ThreadPool,
1917
}
2018

2119
impl TftpdTarGzHandler {
2220
fn new(path: impl AsRef<Path>) -> Self {
2321
TftpdTarGzHandler {
2422
archive_path: path.as_ref().to_owned(),
25-
thread_pool: ThreadPool::new(5),
2623
}
2724
}
2825
}
@@ -32,108 +29,48 @@ fn strip_path_prefixes(path: &Path) -> &Path {
3229
path.strip_prefix("/").or_else(|_| path.strip_prefix("./")).unwrap_or(path)
3330
}
3431

35-
fn io_copy(
36-
mut src: impl Read,
37-
mut dest: impl AsyncWrite + Unpin,
38-
) -> std::io::Result<()> {
39-
let mut buf = [0u8; 1024];
40-
41-
loop {
42-
match src.read(&mut buf[..]) {
43-
Ok(0) => break,
44-
Ok(len) => block_on(dest.write_all(&buf[..len]))?,
45-
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
46-
Err(e) => return Err(e),
47-
}
48-
}
49-
50-
Ok(())
51-
}
52-
53-
// This macro sends `Err` over a channel, it is used within `thread_pool`.
54-
macro_rules! try_or_send {
55-
($e:expr, $tx:ident) => {{
56-
match $e {
57-
Ok(x) => x,
58-
Err(e) => {
59-
let _ = $tx.send(Err(e.into()));
60-
return;
61-
}
62-
}
63-
}};
64-
}
65-
6632
#[async_tftp::async_trait]
6733
impl Handler for TftpdTarGzHandler {
68-
type Reader = piper::Reader;
34+
type Reader = Entry<Archive<GzipDecoder<BufReader<File>>>>;
6935
type Writer = Sink;
7036

7137
async fn read_req_open(
7238
&mut self,
7339
_client: &SocketAddr,
74-
path: &Path,
40+
path: &std::path::Path,
7541
) -> Result<(Self::Reader, Option<u64>), packet::Error> {
76-
let req_path = strip_path_prefixes(path).to_owned();
77-
let archive_path = self.archive_path.clone();
78-
79-
let (pipe_r, pipe_w) = piper::pipe(65536);
80-
let (open_res_tx, open_res_rx) = async_channel::bounded(1);
42+
let req_path = strip_path_prefixes(path.into()).to_owned();
8143

82-
// We need to use our own thread pool to handle blocking IO
83-
// of `tar::Entry`.
84-
self.thread_pool.execute(move || {
85-
let file = try_or_send!(File::open(archive_path), open_res_tx);
44+
let file = File::open(self.archive_path.clone()).await?;
45+
let archive = Archive::new(GzipDecoder::new(BufReader::new(file)));
8646

87-
let mut archive = Archive::new(GzDecoder::new(file));
88-
let entries = try_or_send!(archive.entries(), open_res_tx);
47+
let mut entries = archive.entries()?;
8948

90-
for entry in entries {
91-
let entry = try_or_send!(entry, open_res_tx);
49+
while let Some(Ok(entry)) = entries.next().await {
50+
if entry
51+
.path()
52+
.map(|p| strip_path_prefixes(&*p) == req_path)
53+
.unwrap_or(false)
54+
{
55+
// We manage to find the entry.
9256

93-
// If entry path is the same with requested path.
94-
if entry
95-
.path()
96-
.map(|p| strip_path_prefixes(&p) == req_path)
97-
.unwrap_or(false)
57+
// Check if it is a regular file.
58+
if entry.header().entry_type() != async_tar::EntryType::Regular
9859
{
99-
// We manage to find the entry.
100-
101-
// Check if it is a regular file.
102-
if entry.header().entry_type() != tar::EntryType::Regular {
103-
break;
104-
}
105-
106-
// Inform handler to continue on serving the data.
107-
if open_res_tx.try_send(Ok(())).is_err() {
108-
// Do not transfer data if handler task is canceled.
109-
return;
110-
}
111-
112-
// Forward data to handler.
113-
let _ = io_copy(entry, pipe_w);
114-
115-
return;
60+
break;
11661
}
11762

118-
// Requested path not found within the archive.
119-
let _ = open_res_tx.send(Err(packet::Error::FileNotFound));
63+
return Ok((entry, None));
12064
}
121-
});
122-
123-
// Wait for the above task to find the requested path and
124-
// starts transferring data.
125-
open_res_rx
126-
.recv()
127-
.await
128-
.unwrap_or_else(|_| Err(packet::Error::FileNotFound))?;
65+
}
12966

130-
Ok((pipe_r, None))
67+
Err(packet::Error::FileNotFound)
13168
}
13269

13370
async fn write_req_open(
13471
&mut self,
13572
_client: &SocketAddr,
136-
_path: &Path,
73+
_path: &std::path::Path,
13774
_size: Option<u64>,
13875
) -> Result<Self::Writer, packet::Error> {
13976
Err(packet::Error::IllegalOperation)

0 commit comments

Comments
 (0)