Skip to content

Commit d53c456

Browse files
committed
Tedge flows publish enabled/updated/removed status
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 506ea39 commit d53c456

File tree

6 files changed

+63
-11
lines changed

6 files changed

+63
-11
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::Tick;
1010
use async_trait::async_trait;
1111
use camino::Utf8PathBuf;
1212
use futures::FutureExt;
13+
use serde_json::json;
1314
use std::cmp::min;
1415
use std::collections::HashSet;
1516
use std::time::Duration;
@@ -21,10 +22,13 @@ use tedge_actors::Sender;
2122
use tedge_actors::SimpleMessageBox;
2223
use tedge_file_system_ext::FsWatchEvent;
2324
use tedge_mqtt_ext::MqttMessage;
25+
use tedge_mqtt_ext::QoS;
2426
use tedge_mqtt_ext::SubscriptionDiff;
27+
use tedge_mqtt_ext::Topic;
2528
use tedge_mqtt_ext::TopicFilter;
2629
use tedge_watch_ext::WatchEvent;
2730
use tedge_watch_ext::WatchRequest;
31+
use time::OffsetDateTime;
2832
use tokio::io::AsyncWriteExt;
2933
use tokio::time::sleep_until;
3034
use tokio::time::Instant;
@@ -52,6 +56,7 @@ impl Actor for FlowsMapper {
5256

5357
async fn run(mut self) -> Result<(), RuntimeError> {
5458
self.send_updated_subscriptions().await?;
59+
self.notify_flows_status().await?;
5560

5661
while let Some(message) = self.next_message().await {
5762
match message {
@@ -73,17 +78,19 @@ impl Actor for FlowsMapper {
7378
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
7479
self.processor.reload_script(path).await;
7580
} else if path.extension() == Some("toml") {
76-
self.processor.add_flow(path).await;
81+
self.processor.add_flow(path.clone()).await;
7782
self.send_updated_subscriptions().await?;
83+
self.update_flow_status(path.as_str()).await?;
7884
}
7985
}
8086
InputMessage::FsWatchEvent(FsWatchEvent::FileCreated(path)) => {
8187
let Ok(path) = Utf8PathBuf::try_from(path) else {
8288
continue;
8389
};
8490
if matches!(path.extension(), Some("toml")) {
85-
self.processor.add_flow(path).await;
91+
self.processor.add_flow(path.clone()).await;
8692
self.send_updated_subscriptions().await?;
93+
self.update_flow_status(path.as_str()).await?;
8794
}
8895
}
8996
InputMessage::FsWatchEvent(FsWatchEvent::FileDeleted(path)) => {
@@ -93,8 +100,9 @@ impl Actor for FlowsMapper {
93100
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
94101
self.processor.remove_script(path).await;
95102
} else if path.extension() == Some("toml") {
96-
self.processor.remove_flow(path).await;
103+
self.processor.remove_flow(path.clone()).await;
97104
self.send_updated_subscriptions().await?;
105+
self.update_flow_status(path.as_str()).await?;
98106
}
99107
}
100108
_ => continue,
@@ -163,6 +171,38 @@ impl FlowsMapper {
163171
watch_requests
164172
}
165173

174+
async fn notify_flows_status(&mut self) -> Result<(), RuntimeError> {
175+
let status = "enabled";
176+
let now = OffsetDateTime::now_utc();
177+
for flow in self.processor.flows.keys() {
178+
let status = Self::flow_status(flow, status, &now);
179+
self.mqtt_sender.send(status).await?;
180+
}
181+
Ok(())
182+
}
183+
184+
async fn update_flow_status(&mut self, flow: &str) -> Result<(), RuntimeError> {
185+
let now = OffsetDateTime::now_utc();
186+
let status = if self.processor.flows.contains_key(flow) {
187+
"updated"
188+
} else {
189+
"removed"
190+
};
191+
let status = Self::flow_status(flow, status, &now);
192+
self.mqtt_sender.send(status).await?;
193+
Ok(())
194+
}
195+
196+
fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage {
197+
let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows");
198+
let payload = json!({
199+
"flow": flow,
200+
"status": status,
201+
"time": time.unix_timestamp(),
202+
});
203+
MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce)
204+
}
205+
166206
async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
167207
let now = Instant::now();
168208
let timestamp = DateTime::now();

tests/RobotFramework/tests/tedge_flows/tedge_flows.robot

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,32 +61,32 @@ Using base64 to encode tedge flows output
6161

6262
Units are configured using topic metadata
6363
${transformed_msg} Execute Command
64-
... cat /etc/tedge/flows/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test
64+
... cat /etc/tedge/data/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test
6565
... strip=True
6666
${expected_msg} Execute Command
67-
... cat /etc/tedge/flows/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
67+
... cat /etc/tedge/data/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
6868
... strip=True
6969
Should Be Equal
7070
... ${transformed_msg}
7171
... ${expected_msg}
7272

7373
Computing average over a time window
7474
${transformed_msg} Execute Command
75-
... cat /etc/tedge/flows/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js
75+
... cat /etc/tedge/data/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js
7676
... strip=True
7777
${expected_msg} Execute Command
78-
... cat /etc/tedge/flows/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
78+
... cat /etc/tedge/data/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '
7979
... strip=True
8080
Should Be Equal
8181
... ${transformed_msg}
8282
... ${expected_msg}
8383

8484
Each instance of a script must have its own static state
8585
${transformed_msg} Execute Command
86-
... cat /etc/tedge/flows/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort
86+
... cat /etc/tedge/data/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort
8787
... strip=True
8888
${expected_msg} Execute Command
89-
... cat /etc/tedge/flows/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort
89+
... cat /etc/tedge/data/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort
9090
... strip=True
9191
Should Be Equal
9292
... ${transformed_msg}
@@ -180,19 +180,31 @@ Custom Setup
180180
Start Service tedge-flows
181181

182182
Copy Configuration Files
183-
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/* /etc/tedge/flows/
183+
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.js /etc/tedge/flows/
184+
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.toml /etc/tedge/flows/
185+
ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.samples /etc/tedge/data/
184186

185187
Install Journalctl Flow
186188
[Arguments] ${definition_file}
189+
${start} Get Unix Timestamp
187190
ThinEdgeIO.Transfer To Device ${CURDIR}/journalctl-flows/${definition_file} /etc/tedge/flows/
191+
Should Have MQTT Messages
192+
... topic=te/device/main/service/tedge-flows/status/flows
193+
... date_from=${start}
194+
... message_contains=${definition_file}
188195

189196
Uninstall Journalctl Flow
190197
[Arguments] ${definition_file}
191198
Execute Command cmd=rm -f /etc/tedge/flows/${definition_file}
192199

193200
Install Flow
194201
[Arguments] ${definition_file}
195-
ThinEdgeIO.Transfer To Device ${CURDIR}/input-ext/${definition_file} /etc/tedge/flows/
202+
${start} Get Unix Timestamp
203+
ThinEdgeIO.Transfer To Device ${CURDIR}/input-flows/${definition_file} /etc/tedge/flows/
204+
Should Have MQTT Messages
205+
... topic=te/device/main/service/tedge-flows/status/flows
206+
... date_from=${start}
207+
... message_contains=${definition_file}
196208

197209
Uninstall Flow
198210
[Arguments] ${definition_file}

0 commit comments

Comments
 (0)