From 6103e0773242aa267d42dad160e6ea8f6ab78e9c Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 30 Oct 2025 21:31:46 +0100 Subject: [PATCH 1/2] Tedge flows statistics are published over MQTT Signed-off-by: Didier Wenzek --- crates/extensions/tedge_flows/src/actor.rs | 10 +- crates/extensions/tedge_flows/src/lib.rs | 5 + crates/extensions/tedge_flows/src/runtime.rs | 5 +- crates/extensions/tedge_flows/src/stats.rs | 98 ++++++++++++++++--- .../extensions/tedge_flows/tests/interval.rs | 1 + 5 files changed, 103 insertions(+), 16 deletions(-) diff --git a/crates/extensions/tedge_flows/src/actor.rs b/crates/extensions/tedge_flows/src/actor.rs index acd4b5e4ee6..a5b0ad66862 100644 --- a/crates/extensions/tedge_flows/src/actor.rs +++ b/crates/extensions/tedge_flows/src/actor.rs @@ -6,6 +6,7 @@ use crate::flow::Message; use crate::flow::SourceTag; use crate::registry::FlowRegistryExt; use crate::runtime::MessageProcessor; +use crate::stats::MqttStatsPublisher; use crate::InputMessage; use crate::Tick; use async_trait::async_trait; @@ -48,6 +49,7 @@ pub struct FlowsMapper { pub(super) watched_commands: HashSet, pub(super) processor: MessageProcessor, pub(super) next_dump: Instant, + pub(super) stats_publisher: MqttStatsPublisher, } #[async_trait] @@ -258,7 +260,13 @@ impl FlowsMapper { let timestamp = SystemTime::now(); if self.next_dump <= now { self.processor.dump_memory_stats().await; - self.processor.dump_processing_stats().await; + for record in self + .processor + .dump_processing_stats(&self.stats_publisher) + .await + { + self.mqtt_sender.send(record).await?; + } self.next_dump = now + STATS_DUMP_INTERVAL; } for messages in self.processor.on_interval(timestamp, now).await { diff --git a/crates/extensions/tedge_flows/src/lib.rs b/crates/extensions/tedge_flows/src/lib.rs index 2853ea72f54..a3f60bf34c7 100644 --- a/crates/extensions/tedge_flows/src/lib.rs +++ b/crates/extensions/tedge_flows/src/lib.rs @@ -18,6 +18,7 @@ pub use crate::flow::*; pub use crate::registry::BaseFlowRegistry; pub use crate::registry::FlowRegistryExt; pub use crate::runtime::MessageProcessor; +use crate::stats::MqttStatsPublisher; use camino::Utf8Path; use std::collections::HashSet; use std::convert::Infallible; @@ -131,6 +132,9 @@ impl Builder for FlowsMapperBuilder { fn build(self) -> FlowsMapper { let subscriptions = self.topics().clone(); let watched_commands = HashSet::new(); + let stats_publisher = MqttStatsPublisher { + topic_prefix: "te/device/main/service/tedge-flows/stats".to_string(), + }; FlowsMapper { messages: self.message_box.build(), mqtt_sender: self.mqtt_sender, @@ -139,6 +143,7 @@ impl Builder for FlowsMapperBuilder { watched_commands, processor: self.processor, next_dump: Instant::now() + STATS_DUMP_INTERVAL, + stats_publisher, } } } diff --git a/crates/extensions/tedge_flows/src/runtime.rs b/crates/extensions/tedge_flows/src/runtime.rs index e4c8c3816f0..f6309369c4c 100644 --- a/crates/extensions/tedge_flows/src/runtime.rs +++ b/crates/extensions/tedge_flows/src/runtime.rs @@ -5,6 +5,7 @@ use crate::js_runtime::JsRuntime; use crate::registry::BaseFlowRegistry; use crate::registry::FlowRegistryExt; use crate::stats::Counter; +use crate::stats::StatsPublisher; use crate::LoadError; use camino::Utf8Path; use camino::Utf8PathBuf; @@ -135,8 +136,8 @@ impl MessageProcessor { out_messages } - pub async fn dump_processing_stats(&self) { - self.stats.dump_processing_stats(); + pub async fn dump_processing_stats(&self, publisher: &P) -> Vec { + self.stats.dump_processing_stats(publisher) } pub async fn dump_memory_stats(&self) { diff --git a/crates/extensions/tedge_flows/src/stats.rs b/crates/extensions/tedge_flows/src/stats.rs index b73027965e7..49533e9db2a 100644 --- a/crates/extensions/tedge_flows/src/stats.rs +++ b/crates/extensions/tedge_flows/src/stats.rs @@ -1,7 +1,10 @@ +use serde_json::Value; use std::collections::HashMap; use std::fmt::Display; use std::time::Duration; use std::time::Instant; +use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::Topic; #[derive(Default)] pub struct Counter { @@ -113,11 +116,12 @@ impl Counter { self.from_start.entry(dim).or_default().add(sample); } - pub fn dump_processing_stats(&self) { + pub fn dump_processing_stats(&self, publisher: &P) -> Vec { tracing::info!(target: "flows", "Processing statistics:"); - for (dim, stats) in &self.from_start { - stats.dump_statistics(dim) - } + self.from_start + .iter() + .filter_map(|(dim, stats)| stats.dump_statistics(dim, publisher)) + .collect() } } @@ -140,15 +144,27 @@ impl Stats { } } - pub fn dump_statistics(&self, dim: &Dimension) { - tracing::info!(target: "flows", " - {dim}"); - tracing::info!(target: "flows", " - input count: {}", self.messages_in); - tracing::info!(target: "flows", " - output count: {}", self.messages_out); - tracing::info!(target: "flows", " - error count: {}", self.error_raised); - if let Some(duration_stats) = &self.processing_time { - tracing::info!(target: "flows", " - min processing time: {:?}", duration_stats.min); - tracing::info!(target: "flows", " - max processing time: {:?}", duration_stats.max); - } + pub fn dump_statistics( + &self, + dim: &Dimension, + publisher: &P, + ) -> Option { + let stats = match self.processing_time.as_ref() { + None => serde_json::json!({ + "input": self.messages_in, + "output": self.messages_out, + "error": self.error_raised, + }), + Some(duration_stats) => serde_json::json!({ + "input": self.messages_in, + "output": self.messages_out, + "error": self.error_raised, + "cpu-min": format!("{:?}", duration_stats.min), + "cpu-max": format!("{:?}", duration_stats.max), + }), + }; + + publisher.publish(dim, stats) } } @@ -192,3 +208,59 @@ impl Dimension { } } } + +pub trait StatsPublisher { + type Record; + + fn publish(&self, dim: &Dimension, stats: serde_json::Value) -> Option; +} + +pub struct TracingStatsPublisher; + +impl StatsPublisher for TracingStatsPublisher { + type Record = (); + + fn publish(&self, dim: &Dimension, stats: Value) -> Option<()> { + tracing::info!(target: "flows", " - {dim}"); + if let Some(stats) = stats.as_object() { + for (k, v) in stats { + tracing::info!(target: "flows", " - {k}: {v}"); + } + } + None + } +} + +pub struct MqttStatsPublisher { + pub topic_prefix: String, +} + +impl StatsPublisher for MqttStatsPublisher { + type Record = MqttMessage; + + fn publish(&self, dim: &Dimension, stats: Value) -> Option { + match dim { + Dimension::Flow(path) | Dimension::OnMessage(path) => { + self.topic_for(path).map(|topic| { + let payload = stats.to_string(); + MqttMessage::new(&topic, payload) + }) + } + + Dimension::Runtime => self.topic_for("runtime").map(|topic| { + let payload = stats.to_string(); + MqttMessage::new(&topic, payload) + }), + + _ => None, + } + } +} + +impl MqttStatsPublisher { + pub fn topic_for(&self, path: &str) -> Option { + let name = path.split('/').last().unwrap(); + let topic = format!("{}/{}", self.topic_prefix, name); + Topic::new(&topic).ok() + } +} diff --git a/crates/extensions/tedge_flows/tests/interval.rs b/crates/extensions/tedge_flows/tests/interval.rs index b6f426747cc..73553e28d90 100644 --- a/crates/extensions/tedge_flows/tests/interval.rs +++ b/crates/extensions/tedge_flows/tests/interval.rs @@ -370,6 +370,7 @@ async fn interval_executes_when_time_exceeds_interval() { let count = || { captured_messages .retain(|msg| !msg.topic.as_ref().contains("status")) + .retain(|msg| !msg.topic.as_ref().contains("stats")) .count() }; From 65eab4643fcd75eecf84b43e899fa0cda168b377 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 21 Nov 2025 17:01:01 +0100 Subject: [PATCH 2/2] Fix hard-coded MQTT topics for flows status and statistics Signed-off-by: Didier Wenzek --- crates/core/tedge_mapper/src/flows/mod.rs | 16 ++++++++--- crates/extensions/tedge_flows/src/actor.rs | 13 ++++----- crates/extensions/tedge_flows/src/lib.rs | 27 ++++++++++++++++--- .../extensions/tedge_flows/tests/interval.rs | 12 ++++++--- .../tedge_flows/tests/stats_dump.rs | 24 ++++++++--------- 5 files changed, 64 insertions(+), 28 deletions(-) diff --git a/crates/core/tedge_mapper/src/flows/mod.rs b/crates/core/tedge_mapper/src/flows/mod.rs index 6f3e41a152d..6214180bcd9 100644 --- a/crates/core/tedge_mapper/src/flows/mod.rs +++ b/crates/core/tedge_mapper/src/flows/mod.rs @@ -1,8 +1,11 @@ use crate::core::mapper::start_basic_actors; use crate::TEdgeComponent; +use mqtt_channel::Topic; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_config::TEdgeConfig; use tedge_file_system_ext::FsWatchActorBuilder; use tedge_flows::FlowsMapperBuilder; +use tedge_flows::FlowsMapperConfig; use tedge_watch_ext::WatchActorBuilder; pub struct GenMapper; @@ -14,11 +17,18 @@ impl TEdgeComponent for GenMapper { tedge_config: TEdgeConfig, config_dir: &tedge_config::Path, ) -> Result<(), anyhow::Error> { - let (mut runtime, mut mqtt_actor) = - start_basic_actors("tedge-flows", &tedge_config).await?; + let service_name = "tedge-flows"; + let te = &tedge_config.mqtt.topic_root; + let service_id = EntityTopicId::default_main_service(service_name).unwrap(); + let service_config = FlowsMapperConfig { + statistics_topic: Topic::new(&format!("{te}/{service_id}/statistics"))?, + status_topic: Topic::new(&format!("{te}/{service_id}/status"))?, + }; + let flows_dir = config_dir.join("flows"); + let (mut runtime, mut mqtt_actor) = start_basic_actors(service_name, &tedge_config).await?; let mut fs_actor = FsWatchActorBuilder::new(); - let mut flows_mapper = FlowsMapperBuilder::try_new(config_dir.join("flows")).await?; + let mut flows_mapper = FlowsMapperBuilder::try_new(service_config, flows_dir).await?; flows_mapper.connect(&mut mqtt_actor); flows_mapper.connect_fs(&mut fs_actor); diff --git a/crates/extensions/tedge_flows/src/actor.rs b/crates/extensions/tedge_flows/src/actor.rs index a5b0ad66862..975d78b8bb1 100644 --- a/crates/extensions/tedge_flows/src/actor.rs +++ b/crates/extensions/tedge_flows/src/actor.rs @@ -7,6 +7,7 @@ use crate::flow::SourceTag; use crate::registry::FlowRegistryExt; use crate::runtime::MessageProcessor; use crate::stats::MqttStatsPublisher; +use crate::FlowsMapperConfig; use crate::InputMessage; use crate::Tick; use async_trait::async_trait; @@ -27,7 +28,6 @@ use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::SubscriptionDiff; -use tedge_mqtt_ext::Topic; use tedge_mqtt_ext::TopicFilter; use tedge_watch_ext::WatchEvent; use tedge_watch_ext::WatchRequest; @@ -42,6 +42,7 @@ use tracing::warn; pub const STATS_DUMP_INTERVAL: Duration = Duration::from_secs(300); pub struct FlowsMapper { + pub(super) config: FlowsMapperConfig, pub(super) messages: SimpleMessageBox, pub(super) mqtt_sender: DynSender, pub(super) watch_request_sender: DynSender, @@ -179,7 +180,7 @@ impl FlowsMapper { let status = "enabled"; let now = OffsetDateTime::now_utc(); for flow in self.processor.registry.flows() { - let status = Self::flow_status(flow.name(), status, &now); + let status = self.flow_status(flow.name(), status, &now); self.mqtt_sender.send(status).await?; } Ok(()) @@ -192,19 +193,19 @@ impl FlowsMapper { } else { "removed" }; - let status = Self::flow_status(flow, status, &now); + let status = self.flow_status(flow, status, &now); self.mqtt_sender.send(status).await?; Ok(()) } - fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage { - let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows"); + fn flow_status(&self, flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage { + let topic = &self.config.status_topic; let payload = json!({ "flow": flow, "status": status, "time": time.unix_timestamp(), }); - MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce) + MqttMessage::new(topic, payload.to_string()).with_qos(QoS::AtLeastOnce) } async fn on_source_poll(&mut self) -> Result<(), RuntimeError> { diff --git a/crates/extensions/tedge_flows/src/lib.rs b/crates/extensions/tedge_flows/src/lib.rs index a3f60bf34c7..bbcd95bb1b4 100644 --- a/crates/extensions/tedge_flows/src/lib.rs +++ b/crates/extensions/tedge_flows/src/lib.rs @@ -39,18 +39,34 @@ use tedge_mqtt_ext::DynSubscriptions; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::MqttRequest; use tedge_mqtt_ext::SubscriptionDiff; +use tedge_mqtt_ext::Topic; use tedge_mqtt_ext::TopicFilter; use tedge_watch_ext::WatchEvent; use tedge_watch_ext::WatchRequest; use tokio::time::Instant; use tracing::error; +pub struct FlowsMapperConfig { + pub statistics_topic: Topic, + pub status_topic: Topic, +} + +impl Default for FlowsMapperConfig { + fn default() -> Self { + FlowsMapperConfig { + statistics_topic: Topic::new("te/device/main/service/tedge-flows/statistics").unwrap(), + status_topic: Topic::new("te/device/main/service/tedge-flows/status").unwrap(), + } + } +} + fan_in_message_type!(InputMessage[MqttMessage, WatchEvent, FsWatchEvent, Tick]: Clone, Debug, Eq, PartialEq); #[derive(Debug, Clone, Copy, PartialEq, Eq)] struct Tick; pub struct FlowsMapperBuilder { + config: FlowsMapperConfig, message_box: SimpleMessageBoxBuilder, mqtt_sender: DynSender, watch_request_sender: DynSender, @@ -58,8 +74,11 @@ pub struct FlowsMapperBuilder { } impl FlowsMapperBuilder { - pub async fn try_new(config_dir: impl AsRef) -> Result { - let registry = ConnectedFlowRegistry::new(config_dir); + pub async fn try_new( + config: FlowsMapperConfig, + flows_dir: impl AsRef, + ) -> Result { + let registry = ConnectedFlowRegistry::new(flows_dir); let mut processor = MessageProcessor::try_new(registry).await?; let message_box = SimpleMessageBoxBuilder::new("TedgeFlows", 16); let mqtt_sender = NullSender.into(); @@ -68,6 +87,7 @@ impl FlowsMapperBuilder { processor.load_all_flows().await; Ok(FlowsMapperBuilder { + config, message_box, mqtt_sender, watch_request_sender, @@ -133,9 +153,10 @@ impl Builder for FlowsMapperBuilder { let subscriptions = self.topics().clone(); let watched_commands = HashSet::new(); let stats_publisher = MqttStatsPublisher { - topic_prefix: "te/device/main/service/tedge-flows/stats".to_string(), + topic_prefix: self.config.statistics_topic.to_string(), }; FlowsMapper { + config: self.config, messages: self.message_box.build(), mqtt_sender: self.mqtt_sender, watch_request_sender: self.watch_request_sender, diff --git a/crates/extensions/tedge_flows/tests/interval.rs b/crates/extensions/tedge_flows/tests/interval.rs index 73553e28d90..bcc7beb1815 100644 --- a/crates/extensions/tedge_flows/tests/interval.rs +++ b/crates/extensions/tedge_flows/tests/interval.rs @@ -10,6 +10,7 @@ use tedge_actors::MessageSource; use tedge_actors::NoConfig; use tedge_actors::SimpleMessageBoxBuilder; use tedge_flows::FlowsMapperBuilder; +use tedge_flows::FlowsMapperConfig; use tedge_mqtt_ext::DynSubscriptions; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::MqttRequest; @@ -370,7 +371,7 @@ async fn interval_executes_when_time_exceeds_interval() { let count = || { captured_messages .retain(|msg| !msg.topic.as_ref().contains("status")) - .retain(|msg| !msg.topic.as_ref().contains("stats")) + .retain(|msg| !msg.topic.as_ref().contains("statistics")) .count() }; @@ -411,9 +412,12 @@ async fn tick(duration: Duration) { type ActorHandle = tokio::task::JoinHandle>; async fn spawn_flows_actor(config_dir: &TempDir, mqtt: &mut MockMqtt) -> ActorHandle { - let mut flows_builder = FlowsMapperBuilder::try_new(config_dir.path().to_str().unwrap()) - .await - .expect("Failed to create FlowsMapper"); + let mut flows_builder = FlowsMapperBuilder::try_new( + FlowsMapperConfig::default(), + config_dir.path().to_str().unwrap(), + ) + .await + .expect("Failed to create FlowsMapper"); flows_builder.connect(mqtt); let flows_actor = flows_builder.build(); diff --git a/crates/extensions/tedge_flows/tests/stats_dump.rs b/crates/extensions/tedge_flows/tests/stats_dump.rs index 8207ec0cf72..4e5f97d5edf 100644 --- a/crates/extensions/tedge_flows/tests/stats_dump.rs +++ b/crates/extensions/tedge_flows/tests/stats_dump.rs @@ -1,5 +1,6 @@ use camino::Utf8Path; use std::convert::Infallible; +use std::path::Path; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::time::Duration; @@ -14,6 +15,7 @@ use tedge_actors::NoConfig; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; use tedge_flows::FlowsMapperBuilder; +use tedge_flows::FlowsMapperConfig; use tedge_mqtt_ext::DynSubscriptions; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::MqttRequest; @@ -90,10 +92,7 @@ async fn stats_are_dumped_when_no_interval_handlers_registered() { "#; std::fs::write(config_dir.join("mqtt_only_flow.toml"), config).expect("Failed to write config"); - let mut flows_builder = FlowsMapperBuilder::try_new(Utf8Path::from_path(config_dir).unwrap()) - .await - .expect("Failed to create FlowsMapperBuilder"); - + let mut flows_builder = flows_builder(config_dir).await; let mut mock_mqtt = MockMqttBuilder::new(); flows_builder.connect(&mut mock_mqtt); @@ -155,10 +154,7 @@ async fn stats_dumped_when_interval_handlers_present() { "#; std::fs::write(config_dir.join("interval_flow.toml"), config).expect("Failed to write config"); - let mut flows_builder = FlowsMapperBuilder::try_new(Utf8Path::from_path(config_dir).unwrap()) - .await - .expect("Failed to create FlowsMapperBuilder"); - + let mut flows_builder = flows_builder(config_dir).await; let mut mock_mqtt = MockMqttBuilder::new(); flows_builder.connect(&mut mock_mqtt); @@ -215,10 +211,7 @@ async fn stats_not_dumped_before_300_seconds() { "#; std::fs::write(config_dir.join("mqtt_only_flow.toml"), config).expect("Failed to write config"); - let mut flows_builder = FlowsMapperBuilder::try_new(Utf8Path::from_path(config_dir).unwrap()) - .await - .expect("Failed to create FlowsMapperBuilder"); - + let mut flows_builder = flows_builder(config_dir).await; let mut mock_mqtt = MockMqttBuilder::new(); flows_builder.connect(&mut mock_mqtt); @@ -248,6 +241,13 @@ async fn stats_not_dumped_before_300_seconds() { ); } +async fn flows_builder(config_dir: &Path) -> FlowsMapperBuilder { + let config = FlowsMapperConfig::default(); + FlowsMapperBuilder::try_new(config, Utf8Path::from_path(config_dir).unwrap()) + .await + .expect("Failed to create FlowsMapperBuilder") +} + type MockMqttActor = SimpleMessageBox; struct MockMqttBuilder {