From 13a2c59c50729bd74c860d86117523a1570cfec5 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 31 Oct 2025 15:55:08 +0100 Subject: [PATCH] Tedge flows publish enabled/updated/removed status Signed-off-by: Didier Wenzek --- crates/extensions/tedge_flows/src/actor.rs | 46 ++++++++++++++-- .../extensions/tedge_flows/tests/interval.rs | 53 ++++++++++++++----- .../append-to-file.toml | 0 .../publish-js-errors.toml | 0 .../read-file-periodically.toml | 0 .../tail-named-pipe.toml | 0 .../tests/tedge_flows/tedge_flows.robot | 28 +++++++--- 7 files changed, 103 insertions(+), 24 deletions(-) rename tests/RobotFramework/tests/tedge_flows/{flows/input-ext => input-flows}/append-to-file.toml (100%) rename tests/RobotFramework/tests/tedge_flows/{flows/input-ext => input-flows}/publish-js-errors.toml (100%) rename tests/RobotFramework/tests/tedge_flows/{flows/input-ext => input-flows}/read-file-periodically.toml (100%) rename tests/RobotFramework/tests/tedge_flows/{flows/input-ext => input-flows}/tail-named-pipe.toml (100%) diff --git a/crates/extensions/tedge_flows/src/actor.rs b/crates/extensions/tedge_flows/src/actor.rs index 9f2a9d3d7ea..ef9dbec7082 100644 --- a/crates/extensions/tedge_flows/src/actor.rs +++ b/crates/extensions/tedge_flows/src/actor.rs @@ -10,6 +10,7 @@ use crate::Tick; use async_trait::async_trait; use camino::Utf8PathBuf; use futures::FutureExt; +use serde_json::json; use std::cmp::min; use std::collections::HashSet; use std::time::Duration; @@ -21,10 +22,13 @@ use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::SubscriptionDiff; +use tedge_mqtt_ext::Topic; use tedge_mqtt_ext::TopicFilter; use tedge_watch_ext::WatchEvent; use tedge_watch_ext::WatchRequest; +use time::OffsetDateTime; use tokio::io::AsyncWriteExt; use tokio::time::sleep_until; use tokio::time::Instant; @@ -52,6 +56,7 @@ impl Actor for FlowsMapper { async fn run(mut self) -> Result<(), RuntimeError> { self.send_updated_subscriptions().await?; + self.notify_flows_status().await?; while let Some(message) = self.next_message().await { match message { @@ -73,8 +78,9 @@ impl Actor for FlowsMapper { if matches!(path.extension(), Some("js" | "ts" | "mjs")) { self.processor.reload_script(path).await; } else if path.extension() == Some("toml") { - self.processor.add_flow(path).await; + self.processor.add_flow(path.clone()).await; self.send_updated_subscriptions().await?; + self.update_flow_status(path.as_str()).await?; } } InputMessage::FsWatchEvent(FsWatchEvent::FileCreated(path)) => { @@ -82,8 +88,9 @@ impl Actor for FlowsMapper { continue; }; if matches!(path.extension(), Some("toml")) { - self.processor.add_flow(path).await; + self.processor.add_flow(path.clone()).await; self.send_updated_subscriptions().await?; + self.update_flow_status(path.as_str()).await?; } } InputMessage::FsWatchEvent(FsWatchEvent::FileDeleted(path)) => { @@ -93,8 +100,9 @@ impl Actor for FlowsMapper { if matches!(path.extension(), Some("js" | "ts" | "mjs")) { self.processor.remove_script(path).await; } else if path.extension() == Some("toml") { - self.processor.remove_flow(path).await; + self.processor.remove_flow(path.clone()).await; self.send_updated_subscriptions().await?; + self.update_flow_status(path.as_str()).await?; } } _ => continue, @@ -163,6 +171,38 @@ impl FlowsMapper { watch_requests } + async fn notify_flows_status(&mut self) -> Result<(), RuntimeError> { + let status = "enabled"; + let now = OffsetDateTime::now_utc(); + for flow in self.processor.flows.keys() { + let status = Self::flow_status(flow, status, &now); + self.mqtt_sender.send(status).await?; + } + Ok(()) + } + + async fn update_flow_status(&mut self, flow: &str) -> Result<(), RuntimeError> { + let now = OffsetDateTime::now_utc(); + let status = if self.processor.flows.contains_key(flow) { + "updated" + } else { + "removed" + }; + let status = Self::flow_status(flow, status, &now); + self.mqtt_sender.send(status).await?; + Ok(()) + } + + fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage { + let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows"); + let payload = json!({ + "flow": flow, + "status": status, + "time": time.unix_timestamp(), + }); + MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce) + } + async fn on_source_poll(&mut self) -> Result<(), RuntimeError> { let now = Instant::now(); let timestamp = DateTime::now(); diff --git a/crates/extensions/tedge_flows/tests/interval.rs b/crates/extensions/tedge_flows/tests/interval.rs index c572b1b6505..b6f426747cc 100644 --- a/crates/extensions/tedge_flows/tests/interval.rs +++ b/crates/extensions/tedge_flows/tests/interval.rs @@ -49,22 +49,23 @@ async fn interval_executes_at_configured_frequency() { let captured_messages = CapturedMessages::default(); let mut mqtt = MockMqtt::new(captured_messages.clone()); let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; + let count = || { + captured_messages + .retain(|msg| !msg.topic.as_ref().contains("status")) + .count() + }; tick(Duration::from_millis(500)).await; - assert_eq!(captured_messages.count(), 0, "Should not execute before 1s"); + assert_eq!(count(), 0, "Should not execute before 1s"); tick(Duration::from_millis(600)).await; - assert_eq!(captured_messages.count(), 1, "Should execute once at 1s"); + assert_eq!(count(), 1, "Should execute once at 1s"); tick(Duration::from_secs(1)).await; - assert_eq!(captured_messages.count(), 2, "Should execute twice at 2s"); + assert_eq!(count(), 2, "Should execute twice at 2s"); tick(Duration::from_secs(1)).await; - assert_eq!( - captured_messages.count(), - 3, - "Should execute three times at 3s" - ); + assert_eq!(count(), 3, "Should execute three times at 3s"); actor_handle.abort(); let _ = actor_handle.await; @@ -123,7 +124,11 @@ async fn multiple_scripts_execute_at_independent_frequencies() { let captured_messages = CapturedMessages::default(); let mut mqtt = MockMqtt::new(captured_messages.clone()); let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; - let count = |topic: &str| captured_messages.count_topic(topic); + let count = |topic: &str| { + captured_messages + .retain(|msg| !msg.topic.as_ref().contains("status")) + .count_topic(topic) + }; tick(Duration::from_millis(500)).await; assert_eq!(count("test/fast"), 1, "Fast should execute once at 500ms"); @@ -177,7 +182,11 @@ async fn script_with_oninterval_but_no_config_gets_default_1s_interval() { let captured_messages = CapturedMessages::default(); let mut mqtt = MockMqtt::new(captured_messages.clone()); let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; - let count = || captured_messages.count(); + let count = || { + captured_messages + .retain(|msg| !msg.topic.as_ref().contains("status")) + .count() + }; tick(Duration::from_millis(500)).await; assert_eq!(count(), 0, "Shouldn't execute before default 1s interval"); @@ -228,7 +237,11 @@ async fn interval_executes_independently_from_message_processing() { let captured_messages = CapturedMessages::default(); let mut mqtt = MockMqtt::new(captured_messages.clone()); let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; - let count = |topic: &str| captured_messages.count_topic(topic); + let count = |topic: &str| { + captured_messages + .retain(|msg| !msg.topic.as_ref().contains("status")) + .count_topic(topic) + }; tick(Duration::from_millis(1000)).await; assert_eq!( @@ -303,7 +316,11 @@ async fn very_short_intervals_execute_correctly() { let captured_messages = CapturedMessages::default(); let mut mqtt = MockMqtt::new(captured_messages.clone()); let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; - let count = || captured_messages.count(); + let count = || { + captured_messages + .retain(|msg| !msg.topic.as_ref().contains("status")) + .count() + }; tick(Duration::from_millis(100)).await; assert_eq!(count(), 1, "Should execute once by 100ms"); @@ -350,7 +367,11 @@ async fn interval_executes_when_time_exceeds_interval() { let captured_messages = CapturedMessages::default(); let mut mqtt = MockMqtt::new(captured_messages.clone()); let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; - let count = || captured_messages.count(); + let count = || { + captured_messages + .retain(|msg| !msg.topic.as_ref().contains("status")) + .count() + }; tick(Duration::from_secs(60)).await; assert_eq!(count(), 0, "Should not execute before 2 minutes"); @@ -494,4 +515,10 @@ impl CapturedMessages { let msgs = self.messages.lock().unwrap(); msgs.iter().filter(|m| m.topic.name == topic).count() } + + pub fn retain(&self, predicate: impl Fn(&MqttMessage) -> bool) -> &Self { + let mut messages = self.messages.lock().unwrap(); + messages.retain(predicate); + self + } } diff --git a/tests/RobotFramework/tests/tedge_flows/flows/input-ext/append-to-file.toml b/tests/RobotFramework/tests/tedge_flows/input-flows/append-to-file.toml similarity index 100% rename from tests/RobotFramework/tests/tedge_flows/flows/input-ext/append-to-file.toml rename to tests/RobotFramework/tests/tedge_flows/input-flows/append-to-file.toml diff --git a/tests/RobotFramework/tests/tedge_flows/flows/input-ext/publish-js-errors.toml b/tests/RobotFramework/tests/tedge_flows/input-flows/publish-js-errors.toml similarity index 100% rename from tests/RobotFramework/tests/tedge_flows/flows/input-ext/publish-js-errors.toml rename to tests/RobotFramework/tests/tedge_flows/input-flows/publish-js-errors.toml diff --git a/tests/RobotFramework/tests/tedge_flows/flows/input-ext/read-file-periodically.toml b/tests/RobotFramework/tests/tedge_flows/input-flows/read-file-periodically.toml similarity index 100% rename from tests/RobotFramework/tests/tedge_flows/flows/input-ext/read-file-periodically.toml rename to tests/RobotFramework/tests/tedge_flows/input-flows/read-file-periodically.toml diff --git a/tests/RobotFramework/tests/tedge_flows/flows/input-ext/tail-named-pipe.toml b/tests/RobotFramework/tests/tedge_flows/input-flows/tail-named-pipe.toml similarity index 100% rename from tests/RobotFramework/tests/tedge_flows/flows/input-ext/tail-named-pipe.toml rename to tests/RobotFramework/tests/tedge_flows/input-flows/tail-named-pipe.toml diff --git a/tests/RobotFramework/tests/tedge_flows/tedge_flows.robot b/tests/RobotFramework/tests/tedge_flows/tedge_flows.robot index 270d830d6b4..1557f4304c9 100644 --- a/tests/RobotFramework/tests/tedge_flows/tedge_flows.robot +++ b/tests/RobotFramework/tests/tedge_flows/tedge_flows.robot @@ -61,10 +61,10 @@ Using base64 to encode tedge flows output Units are configured using topic metadata ${transformed_msg} Execute Command - ... cat /etc/tedge/flows/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test + ... cat /etc/tedge/data/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test ... strip=True ${expected_msg} Execute Command - ... cat /etc/tedge/flows/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' + ... cat /etc/tedge/data/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' ... strip=True Should Be Equal ... ${transformed_msg} @@ -72,10 +72,10 @@ Units are configured using topic metadata Computing average over a time window ${transformed_msg} Execute Command - ... cat /etc/tedge/flows/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js + ... cat /etc/tedge/data/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js ... strip=True ${expected_msg} Execute Command - ... cat /etc/tedge/flows/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' + ... cat /etc/tedge/data/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' ... strip=True Should Be Equal ... ${transformed_msg} @@ -83,10 +83,10 @@ Computing average over a time window Each instance of a script must have its own static state ${transformed_msg} Execute Command - ... cat /etc/tedge/flows/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort + ... cat /etc/tedge/data/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort ... strip=True ${expected_msg} Execute Command - ... cat /etc/tedge/flows/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort + ... cat /etc/tedge/data/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort ... strip=True Should Be Equal ... ${transformed_msg} @@ -180,11 +180,18 @@ Custom Setup Start Service tedge-flows Copy Configuration Files - ThinEdgeIO.Transfer To Device ${CURDIR}/flows/* /etc/tedge/flows/ + ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.js /etc/tedge/flows/ + ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.toml /etc/tedge/flows/ + ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.samples /etc/tedge/data/ Install Journalctl Flow [Arguments] ${definition_file} + ${start} Get Unix Timestamp ThinEdgeIO.Transfer To Device ${CURDIR}/journalctl-flows/${definition_file} /etc/tedge/flows/ + Should Have MQTT Messages + ... topic=te/device/main/service/tedge-flows/status/flows + ... date_from=${start} + ... message_contains=${definition_file} Uninstall Journalctl Flow [Arguments] ${definition_file} @@ -192,7 +199,12 @@ Uninstall Journalctl Flow Install Flow [Arguments] ${definition_file} - ThinEdgeIO.Transfer To Device ${CURDIR}/input-ext/${definition_file} /etc/tedge/flows/ + ${start} Get Unix Timestamp + ThinEdgeIO.Transfer To Device ${CURDIR}/input-flows/${definition_file} /etc/tedge/flows/ + Should Have MQTT Messages + ... topic=te/device/main/service/tedge-flows/status/flows + ... date_from=${start} + ... message_contains=${definition_file} Uninstall Flow [Arguments] ${definition_file}