Skip to content

Commit 2952453

Browse files
Merge pull request #3831 from didier-wenzek/fix/missing-reload-events
fix: missing reload events
2 parents 7689e03 + bcbb61e commit 2952453

File tree

7 files changed

+31
-20
lines changed

7 files changed

+31
-20
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl Actor for FlowsMapper {
7373
if matches!(path.extension(), Some("js" | "ts" | "mjs")) {
7474
self.processor.reload_script(path).await;
7575
} else if path.extension() == Some("toml") {
76-
self.processor.reload_flow(path).await;
76+
self.processor.add_flow(path).await;
7777
self.send_updated_subscriptions().await?;
7878
}
7979
}

crates/extensions/tedge_flows/src/runtime.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,18 @@ impl MessageProcessor {
9595
}
9696

9797
fn deadlines(&self) -> impl Iterator<Item = tokio::time::Instant> + '_ {
98-
self.flows
98+
let script_deadlines = self
99+
.flows
99100
.values()
100101
.flat_map(|flow| &flow.steps)
101-
.filter_map(|step| step.script.next_execution)
102+
.filter_map(|step| step.script.next_execution);
103+
104+
let source_deadlines = self
105+
.flows
106+
.values()
107+
.filter_map(|flow| flow.input.next_deadline());
108+
109+
script_deadlines.chain(source_deadlines)
102110
}
103111

104112
/// Get the next deadline for interval execution across all scripts
@@ -241,22 +249,15 @@ impl MessageProcessor {
241249

242250
pub async fn add_flow(&mut self, path: Utf8PathBuf) {
243251
let flow_id = Self::flow_id(&path);
244-
if !self.flows.contains_key(&flow_id) && self.load_flow(flow_id, path.clone()).await {
245-
info!(target: "flows", "Loaded new flow {path}");
246-
}
247-
}
248-
249-
pub async fn reload_flow(&mut self, path: Utf8PathBuf) {
250-
let flow_id = Self::flow_id(&path);
251-
if self.flows.contains_key(&flow_id) && self.load_flow(flow_id, path.clone()).await {
252-
info!(target: "flows", "Reloaded updated flow {path}");
252+
if self.load_flow(flow_id, path.clone()).await {
253+
info!(target: "flows", "Loading flow {path}");
253254
}
254255
}
255256

256257
pub async fn remove_flow(&mut self, path: Utf8PathBuf) {
257258
let flow_id = Self::flow_id(&path);
258259
self.flows.remove(&flow_id);
259-
info!(target: "flows", "Removed deleted flow {path}");
260+
info!(target: "flows", "Removing flow {path}");
260261
}
261262
}
262263

tests/RobotFramework/tests/tedge_flows/tedge_flows.robot

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ Running tedge-flows
9797
Execute Command tedge mqtt sub test/count/e --duration 2s | grep '{}'
9898

9999
Consuming messages from a process stdout
100-
# Assuming the flow journalctl-follow.toml has been properly installed
101100
Install Journalctl Flow journalctl-follow.toml
102101
${test_start} Get Unix Timestamp
103102
Restart Service tedge-agent
@@ -106,7 +105,6 @@ Consuming messages from a process stdout
106105
[Teardown] Uninstall Journalctl Flow journalctl-follow.toml
107106

108107
Consuming messages from a process stdout, periodically
109-
# Assuming the flow journalctl-cursor.toml has been properly installed
110108
Install Journalctl Flow journalctl-cursor.toml
111109
${test_start} Get Unix Timestamp
112110
Restart Service tedge-agent
@@ -115,32 +113,35 @@ Consuming messages from a process stdout, periodically
115113
[Teardown] Uninstall Journalctl Flow journalctl-cursor.toml
116114

117115
Consuming messages from the tail of file
118-
# Assuming the flow tail-named-pipe.toml has been properly installed
116+
Install Flow tail-named-pipe.toml
119117
${start} Get Unix Timestamp
120118
Execute Command echo hello>/tmp/events
121119
Should Have MQTT Messages topic=log/events message_contains=hello minimum=1 date_from=${start}
120+
[Teardown] Uninstall Flow tail-named-pipe.toml
122121

123122
Consuming messages from a file, periodically
124-
# Assuming the flow read-file-periodically.toml has been properly installed
123+
Install Flow read-file-periodically.toml
125124
Execute Command echo hello >/tmp/file.input
126125
Execute Command tedge mqtt sub test/file/input --duration 1s | grep hello
127126
Execute Command echo world >/tmp/file.input
128127
Execute Command tedge mqtt sub test/file/input --duration 1s | grep world
129128
Execute Command rm /tmp/file.input
130129
Execute Command
131-
... tedge mqtt sub test/file/input --duration 1s | grep 'Error in /etc/tedge/flows/read-file-periodically.toml'
130+
... tedge mqtt sub test/file/input --duration 1s | grep 'Fail to poll /tmp/file.input'
132131
Execute Command echo 'hello world' >/tmp/file.input
133132
Execute Command tedge mqtt sub test/file/input --duration 1s | grep 'hello world'
133+
[Teardown] Uninstall Flow read-file-periodically.toml
134134

135135
Appending messages to a file
136-
# Assuming the flow append-to-file.toml has been properly installed
136+
Install Flow append-to-file.toml
137137
Execute Command for i in $(seq 3); do tedge mqtt pub seq/events "$i"; done
138138
Execute Command grep '\\[seq/events\\] 1' /tmp/events.log
139139
Execute Command grep '\\[seq/events\\] 2' /tmp/events.log
140140
Execute Command grep '\\[seq/events\\] 3' /tmp/events.log
141+
[Teardown] Uninstall Flow append-to-file.toml
141142

142143
Publishing transformation errors
143-
# Assuming the flow publish-js-errors.toml has been properly installed
144+
Install Flow publish-js-errors.toml
144145
${start} Get Unix Timestamp
145146
Execute Command tedge mqtt pub collectd/foo 12345:6789
146147
Should Have MQTT Messages
@@ -167,6 +168,7 @@ Publishing transformation errors
167168
${message} JSONLibrary.Convert String To Json ${messages[0]}
168169
Should Be Equal As Integers ${message["time"]} 12345
169170
Should Be Equal As Integers ${message["b"]["c"]} 6789
171+
[Teardown] Uninstall Flow publish-js-errors.toml
170172

171173

172174
*** Keywords ***
@@ -188,6 +190,14 @@ Uninstall Journalctl Flow
188190
[Arguments] ${definition_file}
189191
Execute Command cmd=rm -f /etc/tedge/flows/${definition_file}
190192

193+
Install Flow
194+
[Arguments] ${definition_file}
195+
ThinEdgeIO.Transfer To Device ${CURDIR}/input-ext/${definition_file} /etc/tedge/flows/
196+
197+
Uninstall Flow
198+
[Arguments] ${definition_file}
199+
Execute Command cmd=rm -f /etc/tedge/flows/${definition_file}
200+
191201
Configure flows
192202
# Required by tail-named-pipe.toml
193203
Execute Command mkfifo /tmp/events

0 commit comments

Comments
 (0)