Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions crates/extensions/tedge_flows/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::Tick;
use async_trait::async_trait;
use camino::Utf8PathBuf;
use futures::FutureExt;
use serde_json::json;
use std::cmp::min;
use std::collections::HashSet;
use std::time::Duration;
Expand All @@ -21,10 +22,13 @@ use tedge_actors::Sender;
use tedge_actors::SimpleMessageBox;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::SubscriptionDiff;
use tedge_mqtt_ext::Topic;
use tedge_mqtt_ext::TopicFilter;
use tedge_watch_ext::WatchEvent;
use tedge_watch_ext::WatchRequest;
use time::OffsetDateTime;
use tokio::io::AsyncWriteExt;
use tokio::time::sleep_until;
use tokio::time::Instant;
Expand Down Expand Up @@ -52,6 +56,7 @@ impl Actor for FlowsMapper {

async fn run(mut self) -> Result<(), RuntimeError> {
self.send_updated_subscriptions().await?;
self.notify_flows_status().await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a deal breaker or anything, but I see duplicate status/flow messages whenever flows are dynamically installed. I believe it's because of the fs-notify issue where a newly created file emits both Modified and Updated events. Wondering if we can react to Modified events only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have some generic logic which debounces inotify notifications?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have some generic logic which debounces inotify notifications?

Yes we have. But there are still some surprising event sequences.

I believe it's because of the fs-notify issue where a newly created file emits both Modified and Updated events. Wondering if we can react to Modified events only.

For now, we are on the safe side - with duplicated events. I will need some time to investigate if only reacting to Modified is correct.


while let Some(message) = self.next_message().await {
match message {
Expand All @@ -73,17 +78,19 @@ impl Actor for FlowsMapper {
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
self.processor.reload_script(path).await;
} else if path.extension() == Some("toml") {
self.processor.add_flow(path).await;
self.processor.add_flow(path.clone()).await;
self.send_updated_subscriptions().await?;
self.update_flow_status(path.as_str()).await?;
}
}
InputMessage::FsWatchEvent(FsWatchEvent::FileCreated(path)) => {
let Ok(path) = Utf8PathBuf::try_from(path) else {
continue;
};
if matches!(path.extension(), Some("toml")) {
self.processor.add_flow(path).await;
self.processor.add_flow(path.clone()).await;
self.send_updated_subscriptions().await?;
self.update_flow_status(path.as_str()).await?;
}
}
InputMessage::FsWatchEvent(FsWatchEvent::FileDeleted(path)) => {
Expand All @@ -93,8 +100,9 @@ impl Actor for FlowsMapper {
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
self.processor.remove_script(path).await;
} else if path.extension() == Some("toml") {
self.processor.remove_flow(path).await;
self.processor.remove_flow(path.clone()).await;
self.send_updated_subscriptions().await?;
self.update_flow_status(path.as_str()).await?;
}
}
_ => continue,
Expand Down Expand Up @@ -163,6 +171,38 @@ impl FlowsMapper {
watch_requests
}

async fn notify_flows_status(&mut self) -> Result<(), RuntimeError> {
let status = "enabled";
let now = OffsetDateTime::now_utc();
for flow in self.processor.flows.keys() {
let status = Self::flow_status(flow, status, &now);
self.mqtt_sender.send(status).await?;
}
Ok(())
}

async fn update_flow_status(&mut self, flow: &str) -> Result<(), RuntimeError> {
let now = OffsetDateTime::now_utc();
let status = if self.processor.flows.contains_key(flow) {
"updated"
} else {
"removed"
};
let status = Self::flow_status(flow, status, &now);
self.mqtt_sender.send(status).await?;
Ok(())
}

fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage {
let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard-coded for now. This will be done properly in #3843

Copy link
Contributor

@albinsuresh albinsuresh Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows");
let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flow");

Since the payload is the status of a single flow and not an aggregate stat across all flows.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but the feature is tedge flows

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. We can take a final call on this based on whether flow names would be included in the topic or not. In the current state where all flow stats are published to the same topic, this seems appropriate.

let payload = json!({
"flow": flow,
Copy link
Contributor

@albinsuresh albinsuresh Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it'd be better to include the flow name in the topic instead of the payload so that these status messages can be published as retained messages per flow. That'll help the user get an overview of the status of all the flows by subscribing to status/flow/# topic.

@reubenmiller Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're going to address the exact topic/s which the stats are published to in a follow up PR...so I assume a single topic was just chosen for this PR, and the follow-up PR will publish them on individual topics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include the flow name in the topic instead of the payload so that these status messages can be published as retained messages per flow.

I considered that but decided against it for now. Indeed this raises other questions, such as "what about a flow which definition is removed while the mapper was down?". The goal of this PR was more to quickly come with a fix.

"status": status,
"time": time.unix_timestamp(),
});
MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce)
}

async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
let now = Instant::now();
let timestamp = DateTime::now();
Expand Down
53 changes: 40 additions & 13 deletions crates/extensions/tedge_flows/tests/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,23 @@ async fn interval_executes_at_configured_frequency() {
let captured_messages = CapturedMessages::default();
let mut mqtt = MockMqtt::new(captured_messages.clone());
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
let count = || {
captured_messages
.retain(|msg| !msg.topic.as_ref().contains("status"))
.count()
};

tick(Duration::from_millis(500)).await;
assert_eq!(captured_messages.count(), 0, "Should not execute before 1s");
assert_eq!(count(), 0, "Should not execute before 1s");

tick(Duration::from_millis(600)).await;
assert_eq!(captured_messages.count(), 1, "Should execute once at 1s");
assert_eq!(count(), 1, "Should execute once at 1s");

tick(Duration::from_secs(1)).await;
assert_eq!(captured_messages.count(), 2, "Should execute twice at 2s");
assert_eq!(count(), 2, "Should execute twice at 2s");

tick(Duration::from_secs(1)).await;
assert_eq!(
captured_messages.count(),
3,
"Should execute three times at 3s"
);
assert_eq!(count(), 3, "Should execute three times at 3s");

actor_handle.abort();
let _ = actor_handle.await;
Expand Down Expand Up @@ -123,7 +124,11 @@ async fn multiple_scripts_execute_at_independent_frequencies() {
let captured_messages = CapturedMessages::default();
let mut mqtt = MockMqtt::new(captured_messages.clone());
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
let count = |topic: &str| captured_messages.count_topic(topic);
let count = |topic: &str| {
captured_messages
.retain(|msg| !msg.topic.as_ref().contains("status"))
.count_topic(topic)
};

tick(Duration::from_millis(500)).await;
assert_eq!(count("test/fast"), 1, "Fast should execute once at 500ms");
Expand Down Expand Up @@ -177,7 +182,11 @@ async fn script_with_oninterval_but_no_config_gets_default_1s_interval() {
let captured_messages = CapturedMessages::default();
let mut mqtt = MockMqtt::new(captured_messages.clone());
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
let count = || captured_messages.count();
let count = || {
captured_messages
.retain(|msg| !msg.topic.as_ref().contains("status"))
.count()
};

tick(Duration::from_millis(500)).await;
assert_eq!(count(), 0, "Shouldn't execute before default 1s interval");
Expand Down Expand Up @@ -228,7 +237,11 @@ async fn interval_executes_independently_from_message_processing() {
let captured_messages = CapturedMessages::default();
let mut mqtt = MockMqtt::new(captured_messages.clone());
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
let count = |topic: &str| captured_messages.count_topic(topic);
let count = |topic: &str| {
captured_messages
.retain(|msg| !msg.topic.as_ref().contains("status"))
.count_topic(topic)
};

tick(Duration::from_millis(1000)).await;
assert_eq!(
Expand Down Expand Up @@ -303,7 +316,11 @@ async fn very_short_intervals_execute_correctly() {
let captured_messages = CapturedMessages::default();
let mut mqtt = MockMqtt::new(captured_messages.clone());
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
let count = || captured_messages.count();
let count = || {
captured_messages
.retain(|msg| !msg.topic.as_ref().contains("status"))
.count()
};

tick(Duration::from_millis(100)).await;
assert_eq!(count(), 1, "Should execute once by 100ms");
Expand Down Expand Up @@ -350,7 +367,11 @@ async fn interval_executes_when_time_exceeds_interval() {
let captured_messages = CapturedMessages::default();
let mut mqtt = MockMqtt::new(captured_messages.clone());
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
let count = || captured_messages.count();
let count = || {
captured_messages
.retain(|msg| !msg.topic.as_ref().contains("status"))
.count()
};

tick(Duration::from_secs(60)).await;
assert_eq!(count(), 0, "Should not execute before 2 minutes");
Expand Down Expand Up @@ -494,4 +515,10 @@ impl CapturedMessages {
let msgs = self.messages.lock().unwrap();
msgs.iter().filter(|m| m.topic.name == topic).count()
}

pub fn retain(&self, predicate: impl Fn(&MqttMessage) -> bool) -> &Self {
let mut messages = self.messages.lock().unwrap();
messages.retain(predicate);
self
}
}
28 changes: 20 additions & 8 deletions tests/RobotFramework/tests/tedge_flows/tedge_flows.robot
Original file line number Diff line number Diff line change
Expand Up @@ -61,32 +61,32 @@ Using base64 to encode tedge flows output

Units are configured using topic metadata
${transformed_msg} Execute Command
... cat /etc/tedge/flows/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test
... cat /etc/tedge/data/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test
... strip=True
${expected_msg} Execute Command
... cat /etc/tedge/flows/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
... cat /etc/tedge/data/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
... strip=True
Should Be Equal
... ${transformed_msg}
... ${expected_msg}

Computing average over a time window
${transformed_msg} Execute Command
... cat /etc/tedge/flows/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js
... cat /etc/tedge/data/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js
... strip=True
${expected_msg} Execute Command
... cat /etc/tedge/flows/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
... cat /etc/tedge/data/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
... strip=True
Should Be Equal
... ${transformed_msg}
... ${expected_msg}

Each instance of a script must have its own static state
${transformed_msg} Execute Command
... cat /etc/tedge/flows/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort
... cat /etc/tedge/data/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort
... strip=True
${expected_msg} Execute Command
... cat /etc/tedge/flows/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort
... cat /etc/tedge/data/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort
... strip=True
Should Be Equal
... ${transformed_msg}
Expand Down Expand Up @@ -180,19 +180,31 @@ Custom Setup
Start Service tedge-flows

Copy Configuration Files
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/* /etc/tedge/flows/
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.js /etc/tedge/flows/
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.toml /etc/tedge/flows/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side comment unrelated to this PR: We could consider installing the relevant flow at the test level for older tests as well in an independent PR.

ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.samples /etc/tedge/data/

Install Journalctl Flow
[Arguments] ${definition_file}
${start} Get Unix Timestamp
ThinEdgeIO.Transfer To Device ${CURDIR}/journalctl-flows/${definition_file} /etc/tedge/flows/
Should Have MQTT Messages
... topic=te/device/main/service/tedge-flows/status/flows
... date_from=${start}
... message_contains=${definition_file}

Uninstall Journalctl Flow
[Arguments] ${definition_file}
Execute Command cmd=rm -f /etc/tedge/flows/${definition_file}

Install Flow
[Arguments] ${definition_file}
ThinEdgeIO.Transfer To Device ${CURDIR}/input-ext/${definition_file} /etc/tedge/flows/
${start} Get Unix Timestamp
ThinEdgeIO.Transfer To Device ${CURDIR}/input-flows/${definition_file} /etc/tedge/flows/
Should Have MQTT Messages
... topic=te/device/main/service/tedge-flows/status/flows
... date_from=${start}
... message_contains=${definition_file}

Uninstall Flow
[Arguments] ${definition_file}
Expand Down