-
Notifications
You must be signed in to change notification settings - Fork 69
fix: flaky tedge flows tests #3846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -10,6 +10,7 @@ use crate::Tick; | |||||
| use async_trait::async_trait; | ||||||
| use camino::Utf8PathBuf; | ||||||
| use futures::FutureExt; | ||||||
| use serde_json::json; | ||||||
| use std::cmp::min; | ||||||
| use std::collections::HashSet; | ||||||
| use std::time::Duration; | ||||||
|
|
@@ -21,10 +22,13 @@ use tedge_actors::Sender; | |||||
| use tedge_actors::SimpleMessageBox; | ||||||
| use tedge_file_system_ext::FsWatchEvent; | ||||||
| use tedge_mqtt_ext::MqttMessage; | ||||||
| use tedge_mqtt_ext::QoS; | ||||||
| use tedge_mqtt_ext::SubscriptionDiff; | ||||||
| use tedge_mqtt_ext::Topic; | ||||||
| use tedge_mqtt_ext::TopicFilter; | ||||||
| use tedge_watch_ext::WatchEvent; | ||||||
| use tedge_watch_ext::WatchRequest; | ||||||
| use time::OffsetDateTime; | ||||||
| use tokio::io::AsyncWriteExt; | ||||||
| use tokio::time::sleep_until; | ||||||
| use tokio::time::Instant; | ||||||
|
|
@@ -52,6 +56,7 @@ impl Actor for FlowsMapper { | |||||
|
|
||||||
| async fn run(mut self) -> Result<(), RuntimeError> { | ||||||
| self.send_updated_subscriptions().await?; | ||||||
| self.notify_flows_status().await?; | ||||||
|
|
||||||
| while let Some(message) = self.next_message().await { | ||||||
| match message { | ||||||
|
|
@@ -73,17 +78,19 @@ impl Actor for FlowsMapper { | |||||
| if matches!(path.extension(), Some("js" | "ts" | "mjs")) { | ||||||
| self.processor.reload_script(path).await; | ||||||
| } else if path.extension() == Some("toml") { | ||||||
| self.processor.add_flow(path).await; | ||||||
| self.processor.add_flow(path.clone()).await; | ||||||
| self.send_updated_subscriptions().await?; | ||||||
| self.update_flow_status(path.as_str()).await?; | ||||||
| } | ||||||
| } | ||||||
| InputMessage::FsWatchEvent(FsWatchEvent::FileCreated(path)) => { | ||||||
| let Ok(path) = Utf8PathBuf::try_from(path) else { | ||||||
| continue; | ||||||
| }; | ||||||
| if matches!(path.extension(), Some("toml")) { | ||||||
| self.processor.add_flow(path).await; | ||||||
| self.processor.add_flow(path.clone()).await; | ||||||
| self.send_updated_subscriptions().await?; | ||||||
| self.update_flow_status(path.as_str()).await?; | ||||||
| } | ||||||
| } | ||||||
| InputMessage::FsWatchEvent(FsWatchEvent::FileDeleted(path)) => { | ||||||
|
|
@@ -93,8 +100,9 @@ impl Actor for FlowsMapper { | |||||
| if matches!(path.extension(), Some("js" | "ts" | "mjs")) { | ||||||
| self.processor.remove_script(path).await; | ||||||
| } else if path.extension() == Some("toml") { | ||||||
| self.processor.remove_flow(path).await; | ||||||
| self.processor.remove_flow(path.clone()).await; | ||||||
| self.send_updated_subscriptions().await?; | ||||||
| self.update_flow_status(path.as_str()).await?; | ||||||
| } | ||||||
| } | ||||||
| _ => continue, | ||||||
|
|
@@ -163,6 +171,38 @@ impl FlowsMapper { | |||||
| watch_requests | ||||||
| } | ||||||
|
|
||||||
| async fn notify_flows_status(&mut self) -> Result<(), RuntimeError> { | ||||||
| let status = "enabled"; | ||||||
| let now = OffsetDateTime::now_utc(); | ||||||
| for flow in self.processor.flows.keys() { | ||||||
| let status = Self::flow_status(flow, status, &now); | ||||||
| self.mqtt_sender.send(status).await?; | ||||||
| } | ||||||
| Ok(()) | ||||||
| } | ||||||
|
|
||||||
| async fn update_flow_status(&mut self, flow: &str) -> Result<(), RuntimeError> { | ||||||
| let now = OffsetDateTime::now_utc(); | ||||||
| let status = if self.processor.flows.contains_key(flow) { | ||||||
| "updated" | ||||||
| } else { | ||||||
| "removed" | ||||||
| }; | ||||||
| let status = Self::flow_status(flow, status, &now); | ||||||
| self.mqtt_sender.send(status).await?; | ||||||
| Ok(()) | ||||||
| } | ||||||
|
|
||||||
| fn flow_status(flow: &str, status: &str, time: &OffsetDateTime) -> MqttMessage { | ||||||
| let topic = Topic::new_unchecked("te/device/main/service/tedge-flows/status/flows"); | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hard-coded for now. This will be done properly in #3843
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Since the payload is the status of a single flow and not an aggregate stat across all flows.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, but the feature is
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay. We can take a final call on this based on whether flow names would be included in the topic or not. In the current state where all flow stats are published to the same topic, this seems appropriate. |
||||||
| let payload = json!({ | ||||||
| "flow": flow, | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering if it'd be better to include the @reubenmiller Thoughts?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we're going to address the exact topic/s which the stats are published to in a follow up PR...so I assume a single topic was just chosen for this PR, and the follow-up PR will publish them on individual topics.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I considered that but decided against it for now. Indeed this raises other questions, such as "what about a flow which definition is removed while the mapper was down?". The goal of this PR was more to quickly come with a fix. |
||||||
| "status": status, | ||||||
| "time": time.unix_timestamp(), | ||||||
| }); | ||||||
| MqttMessage::new(&topic, payload.to_string()).with_qos(QoS::AtLeastOnce) | ||||||
| } | ||||||
|
|
||||||
| async fn on_source_poll(&mut self) -> Result<(), RuntimeError> { | ||||||
| let now = Instant::now(); | ||||||
| let timestamp = DateTime::now(); | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,32 +61,32 @@ Using base64 to encode tedge flows output | |
|
|
||
| Units are configured using topic metadata | ||
| ${transformed_msg} Execute Command | ||
| ... cat /etc/tedge/flows/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test | ||
| ... cat /etc/tedge/data/measurements.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test | ||
| ... strip=True | ||
| ${expected_msg} Execute Command | ||
| ... cat /etc/tedge/flows/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | ||
| ... cat /etc/tedge/data/measurements.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | ||
| ... strip=True | ||
| Should Be Equal | ||
| ... ${transformed_msg} | ||
| ... ${expected_msg} | ||
|
|
||
| Computing average over a time window | ||
| ${transformed_msg} Execute Command | ||
| ... cat /etc/tedge/flows/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js | ||
| ... cat /etc/tedge/data/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval --flow /etc/tedge/flows/average.js | ||
| ... strip=True | ||
| ${expected_msg} Execute Command | ||
| ... cat /etc/tedge/flows/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | ||
| ... cat /etc/tedge/data/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | ||
| ... strip=True | ||
| Should Be Equal | ||
| ... ${transformed_msg} | ||
| ... ${expected_msg} | ||
|
|
||
| Each instance of a script must have its own static state | ||
| ${transformed_msg} Execute Command | ||
| ... cat /etc/tedge/flows/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort | ||
| ... cat /etc/tedge/data/count-messages.samples | awk '{ print $2 }' FS\='INPUT:' | tedge flows test --final-on-interval | sort | ||
| ... strip=True | ||
| ${expected_msg} Execute Command | ||
| ... cat /etc/tedge/flows/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort | ||
| ... cat /etc/tedge/data/count-messages.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: ' | sort | ||
| ... strip=True | ||
| Should Be Equal | ||
| ... ${transformed_msg} | ||
|
|
@@ -180,19 +180,31 @@ Custom Setup | |
| Start Service tedge-flows | ||
|
|
||
| Copy Configuration Files | ||
| ThinEdgeIO.Transfer To Device ${CURDIR}/flows/* /etc/tedge/flows/ | ||
| ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.js /etc/tedge/flows/ | ||
| ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.toml /etc/tedge/flows/ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Side comment unrelated to this PR: We could consider installing the relevant flow at the test level for older tests as well in an independent PR. |
||
| ThinEdgeIO.Transfer To Device ${CURDIR}/flows/*.samples /etc/tedge/data/ | ||
|
|
||
| Install Journalctl Flow | ||
| [Arguments] ${definition_file} | ||
| ${start} Get Unix Timestamp | ||
| ThinEdgeIO.Transfer To Device ${CURDIR}/journalctl-flows/${definition_file} /etc/tedge/flows/ | ||
| Should Have MQTT Messages | ||
| ... topic=te/device/main/service/tedge-flows/status/flows | ||
| ... date_from=${start} | ||
| ... message_contains=${definition_file} | ||
|
|
||
| Uninstall Journalctl Flow | ||
| [Arguments] ${definition_file} | ||
| Execute Command cmd=rm -f /etc/tedge/flows/${definition_file} | ||
|
|
||
| Install Flow | ||
| [Arguments] ${definition_file} | ||
| ThinEdgeIO.Transfer To Device ${CURDIR}/input-ext/${definition_file} /etc/tedge/flows/ | ||
| ${start} Get Unix Timestamp | ||
| ThinEdgeIO.Transfer To Device ${CURDIR}/input-flows/${definition_file} /etc/tedge/flows/ | ||
| Should Have MQTT Messages | ||
| ... topic=te/device/main/service/tedge-flows/status/flows | ||
| ... date_from=${start} | ||
| ... message_contains=${definition_file} | ||
|
|
||
| Uninstall Flow | ||
| [Arguments] ${definition_file} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a deal breaker or anything, but I see duplicate
status/flowmessages whenever flows are dynamically installed. I believe it's because of the fs-notify issue where a newly created file emits bothModifiedandUpdatedevents. Wondering if we can react toModifiedevents only.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we have some generic logic which debounces inotify notifications?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we have. But there are still some surprising event sequences.
For now, we are on the safe side - with duplicated events. I will need some time to investigate if only reacting to
Modifiedis correct.