Skip to content

Commit 13a2c59

Browse files
committed
Tedge flows publish enabled/updated/removed status
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 506ea39 commit 13a2c59

File tree

7 files changed

+103
-24
lines changed

7 files changed

+103
-24
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::Tick;
1010
use async_trait::async_trait;
1111
use camino::Utf8PathBuf;
1212
use futures::FutureExt;
13+
use serde_json::json;
1314
use std::cmp::min;
1415
use std::collections::HashSet;
1516
use std::time::Duration;
@@ -21,10 +22,13 @@ use tedge_actors::Sender;
2122
use tedge_actors::SimpleMessageBox;
2223
use tedge_file_system_ext::FsWatchEvent;
2324
use tedge_mqtt_ext::MqttMessage;
25+
use tedge_mqtt_ext::QoS;
2426
use tedge_mqtt_ext::SubscriptionDiff;
27+
use tedge_mqtt_ext::Topic;
2528
use tedge_mqtt_ext::TopicFilter;
2629
use tedge_watch_ext::WatchEvent;
2730
use tedge_watch_ext::WatchRequest;
31+
use time::OffsetDateTime;
2832
use tokio::io::AsyncWriteExt;
2933
use tokio::time::sleep_until;
3034
use tokio::time::Instant;
@@ -52,6 +56,7 @@ impl Actor for FlowsMapper {
5256

5357
async fn run(mut self) -> Result<(), RuntimeError> {
5458
self.send_updated_subscriptions().await?;
59+
self.notify_flows_status().await?;
5560

5661
while let Some(message) = self.next_message().await {
5762
match message {
@@ -73,17 +78,19 @@ impl Actor for FlowsMapper {
7378
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
7479
self.processor.reload_script(path).await;
7580
} else if path.extension() == Some("toml") {
76-
self.processor.add_flow(path).await;
81+
self.processor.add_flow(path.clone()).await;
7782
self.send_updated_subscriptions().await?;
83+
self.update_flow_status(path.as_str()).await?;
7884
}
7985
}
8086
InputMessage::FsWatchEvent(FsWatchEvent::FileCreated(path)) => {
8187
let Ok(path) = Utf8PathBuf::try_from(path) else {
8288
continue;
8389
};
8490
if matches!(path.extension(), Some("toml")) {
85-
self.processor.add_flow(path).await;
91+
self.processor.add_flow(path.clone()).await;
8692
self.send_updated_subscriptions().await?;
93+
self.update_flow_status(path.as_str()).await?;
8794
}
8895
}
8996
InputMessage::FsWatchEvent(FsWatchEvent::FileDeleted(path)) => {
@@ -93,8 +100,9 @@ impl Actor for FlowsMapper {
93100
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
94101
self.processor.remove_script(path).await;
95102
} else if path.extension() == Some("toml") {
96-
self.processor.remove_flow(path).await;
103+
self.processor.remove_flow(path.clone()).await;
97104
self.send_updated_subscriptions().await?;
105+
self.update_flow_status(path.as_str()).await?;
98106
}
99107
}
100108
_ => continue,
@@ -163,6 +171,38 @@ impl FlowsMapper {
163171
watch_requests
164172
}
165173

174+
async fn notify_flows_status(&mut self) -> Result<(), RuntimeError> {
175+
let status = "enabled";
176+
let now = OffsetDateTime::now_utc();
177+
for flow in self.processor.flows.keys() {
178+
let status = Self::flow_status(flow, status, &now);
179+
self.mqtt_sender.send(status).await?;
180+
}
181+
Ok(())
182+
}
183+
184+
async fn update_flow_status(&mut self, flow: &str) -> Result<(), RuntimeError> {
185+
let now = OffsetDateTime::now_utc();
186+
let status = if self.processor.flows.contains_key(flow) {
187+
"updated"
188+
} else {
189+
"removed"
190+
};
191+
let status = Self::flow_status(flow, status, &now);
192+
self.mqtt_sender.send(status).await?;
193+
Ok(())
194+
}
195+
196+
fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage {
197+
let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows");
198+
let payload = json!({
199+
"flow": flow,
200+
"status": status,
201+
"time": time.unix_timestamp(),
202+
});
203+
MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce)
204+
}
205+
166206
async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
167207
let now = Instant::now();
168208
let timestamp = DateTime::now();

crates/extensions/tedge_flows/tests/interval.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,23 @@ async fn interval_executes_at_configured_frequency() {
4949
let captured_messages = CapturedMessages::default();
5050
let mut mqtt = MockMqtt::new(captured_messages.clone());
5151
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
52+
let count = || {
53+
captured_messages
54+
.retain(|msg| !msg.topic.as_ref().contains("status"))
55+
.count()
56+
};
5257

5358
tick(Duration::from_millis(500)).await;
54-
assert_eq!(captured_messages.count(), 0, "Should not execute before 1s");
59+
assert_eq!(count(), 0, "Should not execute before 1s");
5560

5661
tick(Duration::from_millis(600)).await;
57-
assert_eq!(captured_messages.count(), 1, "Should execute once at 1s");
62+
assert_eq!(count(), 1, "Should execute once at 1s");
5863

5964
tick(Duration::from_secs(1)).await;
60-
assert_eq!(captured_messages.count(), 2, "Should execute twice at 2s");
65+
assert_eq!(count(), 2, "Should execute twice at 2s");
6166

6267
tick(Duration::from_secs(1)).await;
63-
assert_eq!(
64-
captured_messages.count(),
65-
3,
66-
"Should execute three times at 3s"
67-
);
68+
assert_eq!(count(), 3, "Should execute three times at 3s");
6869

6970
actor_handle.abort();
7071
let _ = actor_handle.await;
@@ -123,7 +124,11 @@ async fn multiple_scripts_execute_at_independent_frequencies() {
123124
let captured_messages = CapturedMessages::default();
124125
let mut mqtt = MockMqtt::new(captured_messages.clone());
125126
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
126-
let count = |topic: &str| captured_messages.count_topic(topic);
127+
let count = |topic: &str| {
128+
captured_messages
129+
.retain(|msg| !msg.topic.as_ref().contains("status"))
130+
.count_topic(topic)
131+
};
127132

128133
tick(Duration::from_millis(500)).await;
129134
assert_eq!(count("test/fast"), 1, "Fast should execute once at 500ms");
@@ -177,7 +182,11 @@ async fn script_with_oninterval_but_no_config_gets_default_1s_interval() {
177182
let captured_messages = CapturedMessages::default();
178183
let mut mqtt = MockMqtt::new(captured_messages.clone());
179184
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
180-
let count = || captured_messages.count();
185+
let count = || {
186+
captured_messages
187+
.retain(|msg| !msg.topic.as_ref().contains("status"))
188+
.count()
189+
};
181190

182191
tick(Duration::from_millis(500)).await;
183192
assert_eq!(count(), 0, "Shouldn't execute before default 1s interval");
@@ -228,7 +237,11 @@ async fn interval_executes_independently_from_message_processing() {
228237
let captured_messages = CapturedMessages::default();
229238
let mut mqtt = MockMqtt::new(captured_messages.clone());
230239
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
231-
let count = |topic: &str| captured_messages.count_topic(topic);
240+
let count = |topic: &str| {
241+
captured_messages
242+
.retain(|msg| !msg.topic.as_ref().contains("status"))
243+
.count_topic(topic)
244+
};
232245

233246
tick(Duration::from_millis(1000)).await;
234247
assert_eq!(
@@ -303,7 +316,11 @@ async fn very_short_intervals_execute_correctly() {
303316
let captured_messages = CapturedMessages::default();
304317
let mut mqtt = MockMqtt::new(captured_messages.clone());
305318
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
306-
let count = || captured_messages.count();
319+
let count = || {
320+
captured_messages
321+
.retain(|msg| !msg.topic.as_ref().contains("status"))
322+
.count()
323+
};
307324

308325
tick(Duration::from_millis(100)).await;
309326
assert_eq!(count(), 1, "Should execute once by 100ms");
@@ -350,7 +367,11 @@ async fn interval_executes_when_time_exceeds_interval() {
350367
let captured_messages = CapturedMessages::default();
351368
let mut mqtt = MockMqtt::new(captured_messages.clone());
352369
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
353-
let count = || captured_messages.count();
370+
let count = || {
371+
captured_messages
372+
.retain(|msg| !msg.topic.as_ref().contains("status"))
373+
.count()
374+
};
354375

355376
tick(Duration::from_secs(60)).await;
356377
assert_eq!(count(), 0, "Should not execute before 2 minutes");
@@ -494,4 +515,10 @@ impl CapturedMessages {
494515
let msgs = self.messages.lock().unwrap();
495516
msgs.iter().filter(|m| m.topic.name == topic).count()
496517
}
518+
519+
pub fn retain(&self, predicate: impl Fn(&MqttMessage) -> bool) -> &Self {
520+
let mut messages = self.messages.lock().unwrap();
521+
messages.retain(predicate);
522+
self
523+
}
497524
}

tests/RobotFramework/tests/tedge_flows/tedge_flows.robot

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,32 +61,32 @@ Using base64 to encode tedge flows output
6161

6262
Units are configured using topic metadata
6363
${transformed_msg} Execute Command
64-
... cat /etc/tedge/flows/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test
64+
... cat /etc/tedge/data/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test
6565
... strip=True
6666
${expected_msg} Execute Command
67-
... cat /etc/tedge/flows/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
67+
... cat /etc/tedge/data/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
6868
... strip=True
6969
Should Be Equal
7070
... ${transformed_msg}
7171
... ${expected_msg}
7272

7373
Computing average over a time window
7474
${transformed_msg} Execute Command
75-
... cat /etc/tedge/flows/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js
75+
... cat /etc/tedge/data/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js
7676
... strip=True
7777
${expected_msg} Execute Command
78-
... cat /etc/tedge/flows/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
78+
... cat /etc/tedge/data/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
7979
... strip=True
8080
Should Be Equal
8181
... ${transformed_msg}
8282
... ${expected_msg}
8383

8484
Each instance of a script must have its own static state
8585
${transformed_msg} Execute Command
86-
... cat /etc/tedge/flows/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort
86+
... cat /etc/tedge/data/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort
8787
... strip=True
8888
${expected_msg} Execute Command
89-
... cat /etc/tedge/flows/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort
89+
... cat /etc/tedge/data/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort
9090
... strip=True
9191
Should Be Equal
9292
... ${transformed_msg}
@@ -180,19 +180,31 @@ Custom Setup
180180
Start Service tedge-flows
181181

182182
Copy Configuration Files
183-
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/* /etc/tedge/flows/
183+
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.js /etc/tedge/flows/
184+
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.toml /etc/tedge/flows/
185+
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.samples /etc/tedge/data/
184186

185187
Install Journalctl Flow
186188
[Arguments] ${definition_file}
189+
${start} Get Unix Timestamp
187190
ThinEdgeIO.Transfer To Device ${CURDIR}/journalctl-flows/${definition_file} /etc/tedge/flows/
191+
Should Have MQTT Messages
192+
... topic=te/device/main/service/tedge-flows/status/flows
193+
... date_from=${start}
194+
... message_contains=${definition_file}
188195

189196
Uninstall Journalctl Flow
190197
[Arguments] ${definition_file}
191198
Execute Command cmd=rm -f /etc/tedge/flows/${definition_file}
192199

193200
Install Flow
194201
[Arguments] ${definition_file}
195-
ThinEdgeIO.Transfer To Device ${CURDIR}/input-ext/${definition_file} /etc/tedge/flows/
202+
${start} Get Unix Timestamp
203+
ThinEdgeIO.Transfer To Device ${CURDIR}/input-flows/${definition_file} /etc/tedge/flows/
204+
Should Have MQTT Messages
205+
... topic=te/device/main/service/tedge-flows/status/flows
206+
... date_from=${start}
207+
... message_contains=${definition_file}
196208

197209
Uninstall Flow
198210
[Arguments] ${definition_file}

0 commit comments

Comments
 (0)