|
| 1 | +use crate::ui::console::{self, ConsoleWindow}; |
| 2 | +use crate::ui::helpers::get_size_label; |
| 3 | +use crate::ui::types::{StatsUpdateParams, TestResults}; |
| 4 | +use std::time::Instant; |
| 5 | +use tokio::sync::mpsc; |
| 6 | + |
| 7 | +pub fn handle_stats_update( |
| 8 | + stats_rx: &mut mpsc::Receiver<(f64, u64, f64, usize, f64)>, |
| 9 | + params: &mut StatsUpdateParams<'_>, |
| 10 | + results: &TestResults, |
| 11 | + console: &ConsoleWindow, |
| 12 | +) -> bool { |
| 13 | + let mut stats_closed = false; |
| 14 | + |
| 15 | + while let Some(update) = try_recv(stats_rx) { |
| 16 | + match update { |
| 17 | + StatsUpdate::Data(data) => process_stats(data, params, results, console), |
| 18 | + StatsUpdate::Closed => { |
| 19 | + stats_closed = true; |
| 20 | + break; |
| 21 | + } |
| 22 | + StatsUpdate::Pending => break, |
| 23 | + } |
| 24 | + } |
| 25 | + |
| 26 | + if stats_closed { |
| 27 | + finalize_current_chunk(params); |
| 28 | + } |
| 29 | + |
| 30 | + stats_closed |
| 31 | +} |
| 32 | + |
| 33 | +struct StatsData { |
| 34 | + throughput: f64, |
| 35 | + reads_per_sec: u64, |
| 36 | + read_size: usize, |
| 37 | + latency_us: f64, |
| 38 | +} |
| 39 | + |
| 40 | +enum StatsUpdate { |
| 41 | + Data(StatsData), |
| 42 | + Closed, |
| 43 | + Pending, |
| 44 | +} |
| 45 | + |
| 46 | +fn try_recv(stats_rx: &mut mpsc::Receiver<(f64, u64, f64, usize, f64)>) -> Option<StatsUpdate> { |
| 47 | + match stats_rx.try_recv() { |
| 48 | + Ok((throughput, reads_per_sec, _elapsed_secs, read_size, latency_us)) => { |
| 49 | + Some(StatsUpdate::Data(StatsData { |
| 50 | + throughput, |
| 51 | + reads_per_sec, |
| 52 | + read_size, |
| 53 | + latency_us, |
| 54 | + })) |
| 55 | + } |
| 56 | + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Some(StatsUpdate::Pending), |
| 57 | + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => Some(StatsUpdate::Closed), |
| 58 | + } |
| 59 | +} |
| 60 | + |
| 61 | +fn process_stats( |
| 62 | + data: StatsData, |
| 63 | + params: &mut StatsUpdateParams<'_>, |
| 64 | + results: &TestResults, |
| 65 | + console: &ConsoleWindow, |
| 66 | +) { |
| 67 | + record_chunk_completion(params, data.read_size); |
| 68 | + |
| 69 | + *params.current_throughput = data.throughput; |
| 70 | + *params.current_reads = data.reads_per_sec; |
| 71 | + *params.current_latency = data.latency_us; |
| 72 | + |
| 73 | + if params.current_test_size.as_ref().copied() != Some(data.read_size) { |
| 74 | + *params.test_start_time = Some(Instant::now()); |
| 75 | + *params.current_test_size = Some(data.read_size); |
| 76 | + } |
| 77 | + |
| 78 | + let max_throughput = (*params.max_throughput).max(data.throughput); |
| 79 | + *params.max_throughput = max_throughput; |
| 80 | + |
| 81 | + log_stats(console, &data); |
| 82 | + update_results(results, params, &data); |
| 83 | +} |
| 84 | + |
| 85 | +fn record_chunk_completion(params: &mut StatsUpdateParams<'_>, read_size: usize) { |
| 86 | + if let Some(current_size) = (*params.current_test_size) |
| 87 | + && current_size != read_size |
| 88 | + && !params |
| 89 | + .completed_chunks |
| 90 | + .iter() |
| 91 | + .any(|(size, _)| *size == current_size) |
| 92 | + && let Some(start_time) = *params.test_start_time |
| 93 | + { |
| 94 | + let completion_time = start_time.elapsed().as_secs_f64(); |
| 95 | + params |
| 96 | + .completed_chunks |
| 97 | + .push((current_size, completion_time)); |
| 98 | + } |
| 99 | +} |
| 100 | + |
| 101 | +fn log_stats(console: &ConsoleWindow, data: &StatsData) { |
| 102 | + console::log_to_console( |
| 103 | + console, |
| 104 | + &format!( |
| 105 | + "Throughput: {:.2} MB/s, Reads/s: {}, Latency: {:.1} μs, Size: {}", |
| 106 | + data.throughput, |
| 107 | + data.reads_per_sec, |
| 108 | + data.latency_us, |
| 109 | + get_size_label(data.read_size) |
| 110 | + ), |
| 111 | + ); |
| 112 | +} |
| 113 | + |
| 114 | +fn update_results(results: &TestResults, params: &StatsUpdateParams<'_>, data: &StatsData) { |
| 115 | + if let Ok(mut results_guard) = results.lock() { |
| 116 | + let entry_index = results_guard |
| 117 | + .iter() |
| 118 | + .position(|(size, _)| *size == data.read_size); |
| 119 | + |
| 120 | + if entry_index.is_none() { |
| 121 | + results_guard.push((data.read_size, (Vec::new(), Vec::new(), Vec::new()))); |
| 122 | + } |
| 123 | + |
| 124 | + if let Some(entry) = results_guard |
| 125 | + .iter_mut() |
| 126 | + .find(|(size, _)| *size == data.read_size) |
| 127 | + { |
| 128 | + let elapsed_secs = (*params.test_start_time) |
| 129 | + .map(|t| t.elapsed().as_secs_f64()) |
| 130 | + .unwrap_or(0.0); |
| 131 | + let reads_per_sec = data.reads_per_sec as f64; |
| 132 | + |
| 133 | + entry.1.0.push((elapsed_secs, data.throughput)); |
| 134 | + entry.1.1.push((elapsed_secs, reads_per_sec)); |
| 135 | + entry.1.2.push((elapsed_secs, data.latency_us)); |
| 136 | + } |
| 137 | + } |
| 138 | +} |
| 139 | + |
| 140 | +fn finalize_current_chunk(params: &mut StatsUpdateParams<'_>) { |
| 141 | + if let Some(current_size) = (*params.current_test_size) |
| 142 | + && !params |
| 143 | + .completed_chunks |
| 144 | + .iter() |
| 145 | + .any(|(s, _)| *s == current_size) |
| 146 | + && let Some(start_time) = *params.test_start_time |
| 147 | + { |
| 148 | + let completion_time = start_time.elapsed().as_secs_f64(); |
| 149 | + params |
| 150 | + .completed_chunks |
| 151 | + .push((current_size, completion_time)); |
| 152 | + } |
| 153 | +} |
0 commit comments