Skip to content

Commit 2570950

Browse files
committed
fixup! Tedge flows publish enabled/updated/removed status
1 parent d53c456 commit 2570950

File tree

1 file changed

+40
-13
lines changed

1 file changed

+40
-13
lines changed

crates/extensions/tedge_flows/tests/interval.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,23 @@ async fn interval_executes_at_configured_frequency() {
4949
let captured_messages = CapturedMessages::default();
5050
let mut mqtt = MockMqtt::new(captured_messages.clone());
5151
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
52+
let count = || {
53+
captured_messages
54+
.retain(|msg| !msg.topic.as_ref().contains("status"))
55+
.count()
56+
};
5257

5358
tick(Duration::from_millis(500)).await;
54-
assert_eq!(captured_messages.count(), 0, "Should not execute before 1s");
59+
assert_eq!(count(), 0, "Should not execute before 1s");
5560

5661
tick(Duration::from_millis(600)).await;
57-
assert_eq!(captured_messages.count(), 1, "Should execute once at 1s");
62+
assert_eq!(count(), 1, "Should execute once at 1s");
5863

5964
tick(Duration::from_secs(1)).await;
60-
assert_eq!(captured_messages.count(), 2, "Should execute twice at 2s");
65+
assert_eq!(count(), 2, "Should execute twice at 2s");
6166

6267
tick(Duration::from_secs(1)).await;
63-
assert_eq!(
64-
captured_messages.count(),
65-
3,
66-
"Should execute three times at 3s"
67-
);
68+
assert_eq!(count(), 3, "Should execute three times at 3s");
6869

6970
actor_handle.abort();
7071
let _ = actor_handle.await;
@@ -123,7 +124,11 @@ async fn multiple_scripts_execute_at_independent_frequencies() {
123124
let captured_messages = CapturedMessages::default();
124125
let mut mqtt = MockMqtt::new(captured_messages.clone());
125126
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
126-
let count = |topic: &str| captured_messages.count_topic(topic);
127+
let count = |topic: &str| {
128+
captured_messages
129+
.retain(|msg| !msg.topic.as_ref().contains("status"))
130+
.count_topic(topic)
131+
};
127132

128133
tick(Duration::from_millis(500)).await;
129134
assert_eq!(count("test/fast"), 1, "Fast should execute once at 500ms");
@@ -177,7 +182,11 @@ async fn script_with_oninterval_but_no_config_gets_default_1s_interval() {
177182
let captured_messages = CapturedMessages::default();
178183
let mut mqtt = MockMqtt::new(captured_messages.clone());
179184
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
180-
let count = || captured_messages.count();
185+
let count = || {
186+
captured_messages
187+
.retain(|msg| !msg.topic.as_ref().contains("status"))
188+
.count()
189+
};
181190

182191
tick(Duration::from_millis(500)).await;
183192
assert_eq!(count(), 0, "Shouldn't execute before default 1s interval");
@@ -228,7 +237,11 @@ async fn interval_executes_independently_from_message_processing() {
228237
let captured_messages = CapturedMessages::default();
229238
let mut mqtt = MockMqtt::new(captured_messages.clone());
230239
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
231-
let count = |topic: &str| captured_messages.count_topic(topic);
240+
let count = |topic: &str| {
241+
captured_messages
242+
.retain(|msg| !msg.topic.as_ref().contains("status"))
243+
.count_topic(topic)
244+
};
232245

233246
tick(Duration::from_millis(1000)).await;
234247
assert_eq!(
@@ -303,7 +316,11 @@ async fn very_short_intervals_execute_correctly() {
303316
let captured_messages = CapturedMessages::default();
304317
let mut mqtt = MockMqtt::new(captured_messages.clone());
305318
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
306-
let count = || captured_messages.count();
319+
let count = || {
320+
captured_messages
321+
.retain(|msg| !msg.topic.as_ref().contains("status"))
322+
.count()
323+
};
307324

308325
tick(Duration::from_millis(100)).await;
309326
assert_eq!(count(), 1, "Should execute once by 100ms");
@@ -350,7 +367,11 @@ async fn interval_executes_when_time_exceeds_interval() {
350367
let captured_messages = CapturedMessages::default();
351368
let mut mqtt = MockMqtt::new(captured_messages.clone());
352369
let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await;
353-
let count = || captured_messages.count();
370+
let count = || {
371+
captured_messages
372+
.retain(|msg| !msg.topic.as_ref().contains("status"))
373+
.count()
374+
};
354375

355376
tick(Duration::from_secs(60)).await;
356377
assert_eq!(count(), 0, "Should not execute before 2 minutes");
@@ -494,4 +515,10 @@ impl CapturedMessages {
494515
let msgs = self.messages.lock().unwrap();
495516
msgs.iter().filter(|m| m.topic.name == topic).count()
496517
}
518+
519+
pub fn retain(&self, predicate: impl Fn(&MqttMessage) -> bool) -> &Self {
520+
let mut messages = self.messages.lock().unwrap();
521+
messages.retain(predicate);
522+
self
523+
}
497524
}

0 commit comments

Comments
 (0)