Skip to content

Commit 9df9eee

Browse files
committed
Tedge flows publish enabled/updated/removed status
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent a38d074 commit 9df9eee

File tree

2 files changed

+61
-4
lines changed

2 files changed

+61
-4
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::Tick;
1010
use async_trait::async_trait;
1111
use camino::Utf8PathBuf;
1212
use futures::FutureExt;
13+
use serde_json::json;
1314
use std::cmp::min;
1415
use std::collections::HashSet;
1516
use std::time::Duration;
@@ -21,10 +22,13 @@ use tedge_actors::Sender;
2122
use tedge_actors::SimpleMessageBox;
2223
use tedge_file_system_ext::FsWatchEvent;
2324
use tedge_mqtt_ext::MqttMessage;
25+
use tedge_mqtt_ext::QoS;
2426
use tedge_mqtt_ext::SubscriptionDiff;
27+
use tedge_mqtt_ext::Topic;
2528
use tedge_mqtt_ext::TopicFilter;
2629
use tedge_watch_ext::WatchEvent;
2730
use tedge_watch_ext::WatchRequest;
31+
use time::OffsetDateTime;
2832
use tokio::io::AsyncWriteExt;
2933
use tokio::time::sleep_until;
3034
use tokio::time::Instant;
@@ -52,6 +56,7 @@ impl Actor for FlowsMapper {
5256

5357
async fn run(mut self) -> Result<(), RuntimeError> {
5458
self.send_updated_subscriptions().await?;
59+
self.notify_flows_status().await?;
5560

5661
while let Some(message) = self.next_message().await {
5762
match message {
@@ -73,17 +78,19 @@ impl Actor for FlowsMapper {
7378
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
7479
self.processor.reload_script(path).await;
7580
} else if path.extension() == Some("toml") {
76-
self.processor.add_flow(path).await;
81+
self.processor.add_flow(path.clone()).await;
7782
self.send_updated_subscriptions().await?;
83+
self.update_flow_status(path.as_str()).await?;
7884
}
7985
}
8086
InputMessage::FsWatchEvent(FsWatchEvent::FileCreated(path)) => {
8187
let Ok(path) = Utf8PathBuf::try_from(path) else {
8288
continue;
8389
};
8490
if matches!(path.extension(), Some("toml")) {
85-
self.processor.add_flow(path).await;
91+
self.processor.add_flow(path.clone()).await;
8692
self.send_updated_subscriptions().await?;
93+
self.update_flow_status(path.as_str()).await?;
8794
}
8895
}
8996
InputMessage::FsWatchEvent(FsWatchEvent::FileDeleted(path)) => {
@@ -93,8 +100,9 @@ impl Actor for FlowsMapper {
93100
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
94101
self.processor.remove_script(path).await;
95102
} else if path.extension() == Some("toml") {
96-
self.processor.remove_flow(path).await;
103+
self.processor.remove_flow(path.clone()).await;
97104
self.send_updated_subscriptions().await?;
105+
self.update_flow_status(path.as_str()).await?;
98106
}
99107
}
100108
_ => continue,
@@ -163,6 +171,38 @@ impl FlowsMapper {
163171
watch_requests
164172
}
165173

174+
async fn notify_flows_status(&mut self) -> Result<(), RuntimeError> {
175+
let status = "enabled";
176+
let now = OffsetDateTime::now_utc();
177+
for flow in self.processor.flows.keys() {
178+
let status = Self::flow_status(flow, status, &now);
179+
self.mqtt_sender.send(status).await?;
180+
}
181+
Ok(())
182+
}
183+
184+
async fn update_flow_status(&mut self, flow: &str) -> Result<(), RuntimeError> {
185+
let now = OffsetDateTime::now_utc();
186+
let status = if self.processor.flows.contains_key(flow) {
187+
"updated"
188+
} else {
189+
"removed"
190+
};
191+
let status = Self::flow_status(flow, status, &now);
192+
self.mqtt_sender.send(status).await?;
193+
Ok(())
194+
}
195+
196+
fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage {
197+
let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows");
198+
let payload = json!({
199+
"flow": flow,
200+
"status": status,
201+
"time": time.unix_timestamp(),
202+
});
203+
MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce)
204+
}
205+
166206
async fn on_source_poll(&mut self) -> Result<(), RuntimeError> {
167207
let now = Instant::now();
168208
let timestamp = DateTime::now();

tests/RobotFramework/tests/tedge_flows/tedge_flows.robot

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,13 @@ Consuming messages from the tail of file
120120
[Teardown] Uninstall Flow tail-named-pipe.toml
121121

122122
Consuming messages from a file, periodically
123+
${start} Get Unix Timestamp
123124
Install Flow read-file-periodically.toml
125+
Should Have MQTT Messages
126+
... topic=te/device/main/service/tedge-flows/status/flows
127+
... minimum=1
128+
... message_contains=read-file-periodically
129+
... date_from=${start}
124130
Execute Command echo hello >/tmp/file.input
125131
Execute Command tedge mqtt sub test/file/input --duration 1s | grep hello
126132
Execute Command echo world >/tmp/file.input
@@ -133,16 +139,27 @@ Consuming messages from a file, periodically
133139
[Teardown] Uninstall Flow read-file-periodically.toml
134140

135141
Appending messages to a file
142+
${start} Get Unix Timestamp
136143
Install Flow append-to-file.toml
144+
Should Have MQTT Messages
145+
... topic=te/device/main/service/tedge-flows/status/flows
146+
... minimum=1
147+
... message_contains=append-to-file
148+
... date_from=${start}
137149
Execute Command for i in $(seq 3); do tedge mqtt pub seq/events "$i"; done
138150
Execute Command grep '\\[seq/events\\] 1' /tmp/events.log
139151
Execute Command grep '\\[seq/events\\] 2' /tmp/events.log
140152
Execute Command grep '\\[seq/events\\] 3' /tmp/events.log
141153
[Teardown] Uninstall Flow append-to-file.toml
142154

143155
Publishing transformation errors
144-
Install Flow publish-js-errors.toml
145156
${start} Get Unix Timestamp
157+
Install Flow publish-js-errors.toml
158+
Should Have MQTT Messages
159+
... topic=te/device/main/service/tedge-flows/status/flows
160+
... minimum=1
161+
... message_contains=publish-js-errors
162+
... date_from=${start}
146163
Execute Command tedge mqtt pub collectd/foo 12345:6789
147164
Should Have MQTT Messages
148165
... topic=test/errors

0 commit comments

Comments
 (0)