Skip to content

Commit 56db1ce

Browse files
authored
Merge pull request #29 from ThinkParQ/rb/handler-overhaul
Refactor: Overhaul and restructure message handlers for testing in isolation
2 parents a894f3d + cc90056 commit 56db1ce

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+3816
-3042
lines changed

mgmtd/src/app.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
//! Interfaces and implementations for in-app interaction between tasks or threads.
2+
3+
mod runtime;
4+
#[cfg(test)]
5+
pub(crate) mod test;
6+
7+
use crate::StaticInfo;
8+
use crate::license::LicensedFeature;
9+
use anyhow::Result;
10+
use protobuf::license::GetCertDataResult;
11+
pub(crate) use runtime::RuntimeApp;
12+
use rusqlite::{Connection, Transaction};
13+
use shared::bee_msg::Msg;
14+
use shared::bee_serde::{Deserializable, Serializable};
15+
use shared::types::{NodeId, NodeType, Uid};
16+
use std::fmt::Debug;
17+
use std::future::Future;
18+
use std::net::SocketAddr;
19+
use std::path::Path;
20+
use std::sync::Arc;
21+
22+
pub(crate) trait App: Debug + Clone + Send + 'static {
23+
/// Return a borrow to the applications static, immutable config and derived info
24+
fn static_info(&self) -> &StaticInfo;
25+
26+
// Database access
27+
28+
/// DB Read transaction
29+
fn read_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
30+
&self,
31+
op: T,
32+
) -> impl Future<Output = Result<R>> + Send;
33+
34+
/// DB write transaction
35+
fn write_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
36+
&self,
37+
op: T,
38+
) -> impl Future<Output = Result<R>> + Send;
39+
40+
/// DB write transaction without fsync
41+
fn write_tx_no_sync<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
42+
&self,
43+
op: T,
44+
) -> impl Future<Output = Result<R>> + Send;
45+
46+
/// Provides access to a DB connection handle, no transaction
47+
fn db_conn<T: Send + 'static + FnOnce(&mut Connection) -> Result<R>, R: Send + 'static>(
48+
&self,
49+
op: T,
50+
) -> impl Future<Output = Result<R>> + Send;
51+
52+
// BeeMsg communication
53+
54+
/// Send a [Msg] to a node via TCP and receive the response
55+
fn request<M: Msg + Serializable, R: Msg + Deserializable>(
56+
&self,
57+
node_uid: Uid,
58+
msg: &M,
59+
) -> impl Future<Output = Result<R>> + Send;
60+
61+
/// Send a [Msg] to all nodes of a type via UDP
62+
fn send_notifications<M: Msg + Serializable>(
63+
&self,
64+
node_types: &'static [NodeType],
65+
msg: &M,
66+
) -> impl Future<Output = ()> + Send;
67+
68+
/// Replace all stored BeeMsg network addresses of a node in the store
69+
fn replace_node_addrs(&self, node_uid: Uid, new_addrs: impl Into<Arc<[SocketAddr]>>);
70+
71+
// Run state
72+
73+
/// Check if management is in pre shutdown state
74+
fn is_pre_shutdown(&self) -> bool;
75+
/// Notify the runtime control that a particular client pulled states of a particular node type
76+
fn notify_client_pulled_state(&self, node_type: NodeType, node_id: NodeId);
77+
78+
// Licensing control
79+
80+
/// Load and verify a license certificate
81+
fn load_and_verify_license_cert(
82+
&self,
83+
cert_path: &Path,
84+
) -> impl Future<Output = Result<String>> + Send;
85+
86+
/// Get license certificate data
87+
fn get_license_cert_data(&self) -> Result<GetCertDataResult>;
88+
89+
/// Get licensed number of machines
90+
fn get_licensed_machines(&self) -> Result<u32>;
91+
92+
/// Verify a feature is licensed
93+
fn verify_licensed_feature(&self, feature: LicensedFeature) -> Result<()>;
94+
}

mgmtd/src/app/runtime.rs

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
use super::*;
2+
use crate::ClientPulledStateNotification;
3+
use crate::bee_msg::dispatch_request;
4+
use crate::license::LicenseVerifier;
5+
use anyhow::Result;
6+
use protobuf::license::GetCertDataResult;
7+
use rusqlite::{Connection, Transaction};
8+
use shared::conn::msg_dispatch::{DispatchRequest, Request};
9+
use shared::conn::outgoing::Pool;
10+
use shared::run_state::WeakRunStateHandle;
11+
use sqlite::Connections;
12+
use std::fmt::Debug;
13+
use std::ops::Deref;
14+
use tokio::sync::mpsc;
15+
16+
/// A collection of Handles used for interacting and accessing the different components of the app.
17+
///
18+
/// This is the actual runtime object that can be shared between tasks. Interfaces should, however,
19+
/// accept any implementation of the AppContext trait instead.
20+
#[derive(Clone, Debug)]
21+
pub(crate) struct RuntimeApp(Arc<InnerAppHandles>);
22+
23+
/// Stores the actual handles.
24+
#[derive(Debug)]
25+
pub(crate) struct InnerAppHandles {
26+
pub conn: Pool,
27+
pub db: Connections,
28+
pub license: LicenseVerifier,
29+
pub info: &'static StaticInfo,
30+
pub run_state: WeakRunStateHandle,
31+
shutdown_client_id: mpsc::Sender<ClientPulledStateNotification>,
32+
}
33+
34+
impl RuntimeApp {
35+
/// Creates a new AppHandles object.
36+
///
37+
/// Takes all the stored handles.
38+
pub(crate) fn new(
39+
conn: Pool,
40+
db: Connections,
41+
license: LicenseVerifier,
42+
info: &'static StaticInfo,
43+
run_state: WeakRunStateHandle,
44+
shutdown_client_id: mpsc::Sender<ClientPulledStateNotification>,
45+
) -> Self {
46+
Self(Arc::new(InnerAppHandles {
47+
conn,
48+
db,
49+
license,
50+
info,
51+
run_state,
52+
shutdown_client_id,
53+
}))
54+
}
55+
}
56+
57+
/// Derefs to InnerAppHandle which stores all the handles.
58+
///
59+
/// Allows transparent access.
60+
impl Deref for RuntimeApp {
61+
type Target = InnerAppHandles;
62+
63+
fn deref(&self) -> &Self::Target {
64+
&self.0
65+
}
66+
}
67+
68+
/// Adds BeeMsg dispatching functionality to AppHandles
69+
impl DispatchRequest for RuntimeApp {
70+
async fn dispatch_request(&self, req: impl Request) -> Result<()> {
71+
dispatch_request(self, req).await
72+
}
73+
}
74+
75+
impl App for RuntimeApp {
76+
fn static_info(&self) -> &StaticInfo {
77+
self.info
78+
}
79+
80+
async fn read_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
81+
&self,
82+
op: T,
83+
) -> Result<R> {
84+
Connections::read_tx(&self.db, op).await
85+
}
86+
87+
async fn write_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
88+
&self,
89+
op: T,
90+
) -> Result<R> {
91+
Connections::write_tx(&self.db, op).await
92+
}
93+
94+
async fn write_tx_no_sync<
95+
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
96+
R: Send + 'static,
97+
>(
98+
&self,
99+
op: T,
100+
) -> Result<R> {
101+
Connections::write_tx_no_sync(&self.db, op).await
102+
}
103+
104+
async fn db_conn<
105+
T: Send + 'static + FnOnce(&mut Connection) -> Result<R>,
106+
R: Send + 'static,
107+
>(
108+
&self,
109+
op: T,
110+
) -> Result<R> {
111+
Connections::conn(&self.db, op).await
112+
}
113+
114+
async fn request<M: Msg + Serializable, R: Msg + Deserializable>(
115+
&self,
116+
node_uid: Uid,
117+
msg: &M,
118+
) -> Result<R> {
119+
Pool::request(&self.conn, node_uid, msg).await
120+
}
121+
122+
async fn send_notifications<M: Msg + Serializable>(
123+
&self,
124+
node_types: &'static [NodeType],
125+
msg: &M,
126+
) {
127+
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");
128+
129+
for t in node_types {
130+
if let Err(err) = async {
131+
let nodes = self
132+
.read_tx(move |tx| crate::db::node::get_with_type(tx, *t))
133+
.await?;
134+
135+
self.conn
136+
.broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
137+
.await?;
138+
139+
Ok(()) as Result<_>
140+
}
141+
.await
142+
{
143+
log::error!("Notification could not be sent to all {t} nodes: {err:#}");
144+
}
145+
}
146+
}
147+
148+
fn replace_node_addrs(&self, node_uid: Uid, new_addrs: impl Into<Arc<[SocketAddr]>>) {
149+
Pool::replace_node_addrs(&self.conn, node_uid, new_addrs)
150+
}
151+
152+
fn is_pre_shutdown(&self) -> bool {
153+
WeakRunStateHandle::pre_shutdown(&self.run_state)
154+
}
155+
156+
fn notify_client_pulled_state(&self, node_type: NodeType, node_id: NodeId) {
157+
if self.run_state.pre_shutdown() {
158+
let tx = self.shutdown_client_id.clone();
159+
160+
// We don't want to block the task calling this and are not interested by the results
161+
tokio::spawn(async move {
162+
let _ = tx.send((node_type, node_id)).await;
163+
});
164+
}
165+
}
166+
167+
async fn load_and_verify_license_cert(&self, cert_path: &Path) -> Result<String> {
168+
LicenseVerifier::load_and_verify_license_cert(&self.license, cert_path).await
169+
}
170+
171+
fn get_license_cert_data(&self) -> Result<GetCertDataResult> {
172+
LicenseVerifier::get_license_cert_data(&self.license)
173+
}
174+
175+
fn get_licensed_machines(&self) -> Result<u32> {
176+
LicenseVerifier::get_licensed_machines(&self.license)
177+
}
178+
179+
fn verify_licensed_feature(&self, feature: LicensedFeature) -> Result<()> {
180+
self.license.verify_licensed_feature(feature)
181+
}
182+
}

0 commit comments

Comments
 (0)