Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions crates/core/tedge_mapper/src/flows/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::core::mapper::start_basic_actors;
use crate::TEdgeComponent;
use mqtt_channel::Topic;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_config::TEdgeConfig;
use tedge_file_system_ext::FsWatchActorBuilder;
use tedge_flows::FlowsMapperBuilder;
use tedge_flows::FlowsMapperConfig;
use tedge_watch_ext::WatchActorBuilder;

pub struct GenMapper;
Expand All @@ -14,11 +17,18 @@ impl TEdgeComponent for GenMapper {
tedge_config: TEdgeConfig,
config_dir: &tedge_config::Path,
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors("tedge-flows", &tedge_config).await?;
let service_name = "tedge-flows";
let te = &tedge_config.mqtt.topic_root;
let service_id = EntityTopicId::default_main_service(service_name).unwrap();
let service_config = FlowsMapperConfig {
statistics_topic: Topic::new(&format!("{te}/{service_id}/statistics"))?,
status_topic: Topic::new(&format!("{te}/{service_id}/status"))?,
};
Comment on lines +23 to +26
Copy link
Contributor Author

@didier-wenzek didier-wenzek Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's now time to discuss the appropriate topics for flows status and statistics #3846 (comment)

I think that it would make sense to publish the statistics as measurements under {te}/{service_id}/m/statistics. But then these stats will be pushed by default to the cloud.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah publishing under the standard thin-edge.io mqtt topic structure would be more consistent, but we might need to use a different telemetry endpoint to publish this information to avoid sending too much information to the cloud by default.

We could create a convention where some local/internal information which is meant for local/on-device consumption (rather than intended for the cloud):

{te}/{service_id}/local/statistics

Alternatively, we could use flows as the type since the flows statistics maybe published on serveral services in the future, e.g. tedge-mapper-c8y, tedge-mapper-az, tedge-flows etc.

{te}/{service_id}/flows/statistics

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new proposal for the topic would be:

{te}/{service_id}/status/metrics

let flows_dir = config_dir.join("flows");

let (mut runtime, mut mqtt_actor) = start_basic_actors(service_name, &tedge_config).await?;
let mut fs_actor = FsWatchActorBuilder::new();
let mut flows_mapper = FlowsMapperBuilder::try_new(config_dir.join("flows")).await?;
let mut flows_mapper = FlowsMapperBuilder::try_new(service_config, flows_dir).await?;
flows_mapper.connect(&mut mqtt_actor);
flows_mapper.connect_fs(&mut fs_actor);

Expand Down
23 changes: 16 additions & 7 deletions crates/extensions/tedge_flows/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crate::flow::Message;
use crate::flow::SourceTag;
use crate::registry::FlowRegistryExt;
use crate::runtime::MessageProcessor;
use crate::stats::MqttStatsPublisher;
use crate::FlowsMapperConfig;
use crate::InputMessage;
use crate::Tick;
use async_trait::async_trait;
Expand All @@ -26,7 +28,6 @@ use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::SubscriptionDiff;
use tedge_mqtt_ext::Topic;
use tedge_mqtt_ext::TopicFilter;
use tedge_watch_ext::WatchEvent;
use tedge_watch_ext::WatchRequest;
Expand All @@ -41,13 +42,15 @@ use tracing::warn;
pub const STATS_DUMP_INTERVAL: Duration = Duration::from_secs(300);

pub struct FlowsMapper {
pub(super) config: FlowsMapperConfig,
pub(super) messages: SimpleMessageBox<InputMessage, SubscriptionDiff>,
pub(super) mqtt_sender: DynSender<MqttMessage>,
pub(super) watch_request_sender: DynSender<WatchRequest>,
pub(super) subscriptions: TopicFilter,
pub(super) watched_commands: HashSet<String>,
pub(super) processor: MessageProcessor<ConnectedFlowRegistry>,
pub(super) next_dump: Instant,
pub(super) stats_publisher: MqttStatsPublisher,
}

#[async_trait]
Expand Down Expand Up @@ -177,7 +180,7 @@ impl FlowsMapper {
let status = "enabled";
let now = OffsetDateTime::now_utc();
for flow in self.processor.registry.flows() {
let status = Self::flow_status(flow.name(), status, &now);
let status = self.flow_status(flow.name(), status, &now);
self.mqtt_sender.send(status).await?;
}
Ok(())
Expand All @@ -190,19 +193,19 @@ impl FlowsMapper {
} else {
"removed"
};
let status = Self::flow_status(flow, status, &now);
let status = self.flow_status(flow, status, &now);
self.mqtt_sender.send(status).await?;
Ok(())
}

fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage {
let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows");
fn flow_status(&self, flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage {
let topic = &self.config.status_topic;
let payload = json!({
"flow": flow,
"status": status,
"time": time.unix_timestamp(),
});
MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce)
MqttMessage::new(topic, payload.to_string()).with_qos(QoS::AtLeastOnce)
}

async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
Expand Down Expand Up @@ -258,7 +261,13 @@ impl FlowsMapper {
let timestamp = SystemTime::now();
if self.next_dump <= now {
self.processor.dump_memory_stats().await;
self.processor.dump_processing_stats().await;
for record in self
.processor
.dump_processing_stats(&self.stats_publisher)
.await
{
self.mqtt_sender.send(record).await?;
}
self.next_dump = now + STATS_DUMP_INTERVAL;
}
for messages in self.processor.on_interval(timestamp, now).await {
Expand Down
30 changes: 28 additions & 2 deletions crates/extensions/tedge_flows/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use crate::flow::*;
pub use crate::registry::BaseFlowRegistry;
pub use crate::registry::FlowRegistryExt;
pub use crate::runtime::MessageProcessor;
use crate::stats::MqttStatsPublisher;
use camino::Utf8Path;
use std::collections::HashSet;
use std::convert::Infallible;
Expand All @@ -38,27 +39,46 @@ use tedge_mqtt_ext::DynSubscriptions;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::MqttRequest;
use tedge_mqtt_ext::SubscriptionDiff;
use tedge_mqtt_ext::Topic;
use tedge_mqtt_ext::TopicFilter;
use tedge_watch_ext::WatchEvent;
use tedge_watch_ext::WatchRequest;
use tokio::time::Instant;
use tracing::error;

pub struct FlowsMapperConfig {
pub statistics_topic: Topic,
pub status_topic: Topic,
}

impl Default for FlowsMapperConfig {
fn default() -> Self {
FlowsMapperConfig {
statistics_topic: Topic::new("te/device/main/service/tedge-flows/statistics").unwrap(),
status_topic: Topic::new("te/device/main/service/tedge-flows/status").unwrap(),
}
}
}

fan_in_message_type!(InputMessage[MqttMessage, WatchEvent, FsWatchEvent, Tick]: Clone, Debug, Eq, PartialEq);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Tick;

pub struct FlowsMapperBuilder {
config: FlowsMapperConfig,
message_box: SimpleMessageBoxBuilder<InputMessage, SubscriptionDiff>,
mqtt_sender: DynSender<MqttMessage>,
watch_request_sender: DynSender<WatchRequest>,
processor: MessageProcessor<ConnectedFlowRegistry>,
}

impl FlowsMapperBuilder {
pub async fn try_new(config_dir: impl AsRef<Utf8Path>) -> Result<Self, LoadError> {
let registry = ConnectedFlowRegistry::new(config_dir);
pub async fn try_new(
config: FlowsMapperConfig,
flows_dir: impl AsRef<Utf8Path>,
) -> Result<Self, LoadError> {
let registry = ConnectedFlowRegistry::new(flows_dir);
let mut processor = MessageProcessor::try_new(registry).await?;
let message_box = SimpleMessageBoxBuilder::new("TedgeFlows", 16);
let mqtt_sender = NullSender.into();
Expand All @@ -67,6 +87,7 @@ impl FlowsMapperBuilder {
processor.load_all_flows().await;

Ok(FlowsMapperBuilder {
config,
message_box,
mqtt_sender,
watch_request_sender,
Expand Down Expand Up @@ -131,14 +152,19 @@ impl Builder<FlowsMapper> for FlowsMapperBuilder {
fn build(self) -> FlowsMapper {
let subscriptions = self.topics().clone();
let watched_commands = HashSet::new();
let stats_publisher = MqttStatsPublisher {
topic_prefix: self.config.statistics_topic.to_string(),
};
FlowsMapper {
config: self.config,
messages: self.message_box.build(),
mqtt_sender: self.mqtt_sender,
watch_request_sender: self.watch_request_sender,
subscriptions,
watched_commands,
processor: self.processor,
next_dump: Instant::now() + STATS_DUMP_INTERVAL,
stats_publisher,
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions crates/extensions/tedge_flows/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::js_runtime::JsRuntime;
use crate::registry::BaseFlowRegistry;
use crate::registry::FlowRegistryExt;
use crate::stats::Counter;
use crate::stats::StatsPublisher;
use crate::LoadError;
use camino::Utf8Path;
use camino::Utf8PathBuf;
Expand Down Expand Up @@ -135,8 +136,8 @@ impl<Registry: FlowRegistryExt + Send> MessageProcessor<Registry> {
out_messages
}

pub async fn dump_processing_stats(&self) {
self.stats.dump_processing_stats();
pub async fn dump_processing_stats<P: StatsPublisher>(&self, publisher: &P) -> Vec<P::Record> {
self.stats.dump_processing_stats(publisher)
}

pub async fn dump_memory_stats(&self) {
Expand Down
98 changes: 85 additions & 13 deletions crates/extensions/tedge_flows/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use serde_json::Value;
use std::collections::HashMap;
use std::fmt::Display;
use std::time::Duration;
use std::time::Instant;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;

#[derive(Default)]
pub struct Counter {
Expand Down Expand Up @@ -113,11 +116,12 @@ impl Counter {
self.from_start.entry(dim).or_default().add(sample);
}

pub fn dump_processing_stats(&self) {
pub fn dump_processing_stats<P: StatsPublisher>(&self, publisher: &P) -> Vec<P::Record> {
tracing::info!(target: "flows", "Processing statistics:");
for (dim, stats) in &self.from_start {
stats.dump_statistics(dim)
}
self.from_start
.iter()
.filter_map(|(dim, stats)| stats.dump_statistics(dim, publisher))
.collect()
}
}

Expand All @@ -140,15 +144,27 @@ impl Stats {
}
}

pub fn dump_statistics(&self, dim: &Dimension) {
tracing::info!(target: "flows", " - {dim}");
tracing::info!(target: "flows", " - input count: {}", self.messages_in);
tracing::info!(target: "flows", " - output count: {}", self.messages_out);
tracing::info!(target: "flows", " - error count: {}", self.error_raised);
if let Some(duration_stats) = &self.processing_time {
tracing::info!(target: "flows", " - min processing time: {:?}", duration_stats.min);
tracing::info!(target: "flows", " - max processing time: {:?}", duration_stats.max);
}
pub fn dump_statistics<P: StatsPublisher>(
&self,
dim: &Dimension,
publisher: &P,
) -> Option<P::Record> {
let stats = match self.processing_time.as_ref() {
None => serde_json::json!({
"input": self.messages_in,
"output": self.messages_out,
"error": self.error_raised,
}),
Some(duration_stats) => serde_json::json!({
"input": self.messages_in,
"output": self.messages_out,
"error": self.error_raised,
"cpu-min": format!("{:?}", duration_stats.min),
"cpu-max": format!("{:?}", duration_stats.max),
}),
};

publisher.publish(dim, stats)
}
}

Expand Down Expand Up @@ -192,3 +208,59 @@ impl Dimension {
}
}
}

pub trait StatsPublisher {
type Record;

fn publish(&self, dim: &Dimension, stats: serde_json::Value) -> Option<Self::Record>;
}

pub struct TracingStatsPublisher;

impl StatsPublisher for TracingStatsPublisher {
type Record = ();

fn publish(&self, dim: &Dimension, stats: Value) -> Option<()> {
tracing::info!(target: "flows", " - {dim}");
if let Some(stats) = stats.as_object() {
for (k, v) in stats {
tracing::info!(target: "flows", " - {k}: {v}");
}
}
None
}
}

pub struct MqttStatsPublisher {
pub topic_prefix: String,
}

impl StatsPublisher for MqttStatsPublisher {
type Record = MqttMessage;

fn publish(&self, dim: &Dimension, stats: Value) -> Option<Self::Record> {
match dim {
Dimension::Flow(path) | Dimension::OnMessage(path) => {
self.topic_for(path).map(|topic| {
let payload = stats.to_string();
MqttMessage::new(&topic, payload)
})
}

Dimension::Runtime => self.topic_for("runtime").map(|topic| {
let payload = stats.to_string();
MqttMessage::new(&topic, payload)
}),

_ => None,
}
}
}

impl MqttStatsPublisher {
pub fn topic_for(&self, path: &str) -> Option<Topic> {
let name = path.split('/').last().unwrap();
let topic = format!("{}/{}", self.topic_prefix, name);
Topic::new(&topic).ok()
}
}
11 changes: 8 additions & 3 deletions crates/extensions/tedge_flows/tests/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_flows::FlowsMapperBuilder;
use tedge_flows::FlowsMapperConfig;
use tedge_mqtt_ext::DynSubscriptions;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::MqttRequest;
Expand Down Expand Up @@ -370,6 +371,7 @@ async fn interval_executes_when_time_exceeds_interval() {
let count = || {
captured_messages
.retain(|msg| !msg.topic.as_ref().contains("status"))
.retain(|msg| !msg.topic.as_ref().contains("statistics"))
.count()
};

Expand Down Expand Up @@ -410,9 +412,12 @@ async fn tick(duration: Duration) {
type ActorHandle = tokio::task::JoinHandle<Result<(), tedge_actors::RuntimeError>>;

async fn spawn_flows_actor(config_dir: &TempDir, mqtt: &mut MockMqtt) -> ActorHandle {
let mut flows_builder = FlowsMapperBuilder::try_new(config_dir.path().to_str().unwrap())
.await
.expect("Failed to create FlowsMapper");
let mut flows_builder = FlowsMapperBuilder::try_new(
FlowsMapperConfig::default(),
config_dir.path().to_str().unwrap(),
)
.await
.expect("Failed to create FlowsMapper");

flows_builder.connect(mqtt);
let flows_actor = flows_builder.build();
Expand Down
Loading
Loading