diff --git a/Cargo.lock b/Cargo.lock index 105887866cf..a164473d66e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5142,6 +5142,7 @@ dependencies = [ "anyhow", "async-trait", "camino", + "futures", "humantime", "rquickjs", "serde", diff --git a/crates/core/tedge/src/cli/flows/test.rs b/crates/core/tedge/src/cli/flows/test.rs index 1ed0a9886ee..3b4911a248c 100644 --- a/crates/core/tedge/src/cli/flows/test.rs +++ b/crates/core/tedge/src/cli/flows/test.rs @@ -11,6 +11,7 @@ use tedge_flows::MessageProcessor; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; use tokio::io::Stdin; +use tokio::time::Instant; pub struct TestCommand { pub flows_dir: Utf8PathBuf, @@ -48,7 +49,10 @@ impl Command for TestCommand { } if self.final_on_interval { let timestamp = DateTime::now(); - self.tick(&mut processor, ×tamp).await; + let now = processor + .last_interval_deadline() + .unwrap_or_else(Instant::now); + self.tick(&mut processor, ×tamp, now).await; } Ok(()) } @@ -77,9 +81,9 @@ impl TestCommand { .for_each(|msg| self.print_messages(msg)) } - async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) { + async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime, now: Instant) { processor - .on_interval(timestamp) + .on_interval(timestamp, now) .await .into_iter() .for_each(|msg| self.print_messages(msg)) diff --git a/crates/extensions/tedge_flows/Cargo.toml b/crates/extensions/tedge_flows/Cargo.toml index d8876f0f188..3f25df3ea57 100644 --- a/crates/extensions/tedge_flows/Cargo.toml +++ b/crates/extensions/tedge_flows/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true anyhow = { workspace = true } async-trait = { workspace = true } camino = { workspace = true, features = ["serde1"] } +futures = { workspace = true } humantime = { workspace = true } rquickjs = { git = "https://github.com/thin-edge/rquickjs", rev = "4ed04ac9af3de453dd41ff09fdd1837c7ceb1f1c", default-features = false, features = [ # disable bindgen and rely on pre-generated bindings due to problems @@ -33,7 +34,9 @@ toml = { workspace = true, features = ["parse"] } tracing = { workspace = true } [dev-dependencies] +tedge_mqtt_ext = { workspace = true, features = ["test-helpers"] } tempfile = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } [lints] workspace = true diff --git a/crates/extensions/tedge_flows/src/actor.rs b/crates/extensions/tedge_flows/src/actor.rs index e2dd32778f1..9c96e47579c 100644 --- a/crates/extensions/tedge_flows/src/actor.rs +++ b/crates/extensions/tedge_flows/src/actor.rs @@ -5,6 +5,8 @@ use crate::InputMessage; use crate::OutputMessage; use async_trait::async_trait; use camino::Utf8PathBuf; +use futures::future::Either; +use std::future::pending; use tedge_actors::Actor; use tedge_actors::MessageReceiver; use tedge_actors::RuntimeError; @@ -14,8 +16,8 @@ use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::SubscriptionDiff; use tedge_mqtt_ext::TopicFilter; -use tokio::time::interval; -use tokio::time::Duration; +use tokio::time::sleep_until; +use tokio::time::Instant; use tracing::error; pub struct FlowsMapper { @@ -31,11 +33,14 @@ impl Actor for FlowsMapper { } async fn run(mut self) -> Result<(), RuntimeError> { - let mut interval = interval(Duration::from_secs(1)); - loop { + let deadline_future = match self.processor.next_interval_deadline() { + Some(deadline) => Either::Left(sleep_until(deadline)), + None => Either::Right(pending()), + }; + tokio::select! { - _ = interval.tick() => { + _ = deadline_future => { self.on_interval().await?; } message = self.messages.recv() => { @@ -130,12 +135,13 @@ impl FlowsMapper { } async fn on_interval(&mut self) -> Result<(), RuntimeError> { + let now = Instant::now(); let timestamp = DateTime::now(); if timestamp.seconds % 300 == 0 { self.processor.dump_memory_stats().await; self.processor.dump_processing_stats().await; } - for (flow_id, flow_messages) in self.processor.on_interval(×tamp).await { + for (flow_id, flow_messages) in self.processor.on_interval(×tamp, now).await { match flow_messages { Ok(messages) => { for message in messages { diff --git a/crates/extensions/tedge_flows/src/config.rs b/crates/extensions/tedge_flows/src/config.rs index cf7585012d6..83d61b6bfec 100644 --- a/crates/extensions/tedge_flows/src/config.rs +++ b/crates/extensions/tedge_flows/src/config.rs @@ -84,6 +84,7 @@ impl FlowConfig { js_runtime.load_script(&mut step.script).await?; step.check(&source); step.fix(); + step.script.init_next_execution(); steps.push(step); } Ok(Flow { diff --git a/crates/extensions/tedge_flows/src/flow.rs b/crates/extensions/tedge_flows/src/flow.rs index d3e72d3a788..9d3dd79917b 100644 --- a/crates/extensions/tedge_flows/src/flow.rs +++ b/crates/extensions/tedge_flows/src/flow.rs @@ -9,6 +9,7 @@ use serde_json::Value; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; use time::OffsetDateTime; +use tokio::time::Instant; use tracing::warn; /// A chain of transformation of MQTT messages @@ -119,10 +120,11 @@ impl Flow { js_runtime: &JsRuntime, stats: &mut Counter, timestamp: &DateTime, + now: Instant, ) -> Result, FlowError> { let stated_at = stats.flow_on_interval_start(self.source.as_str()); let mut messages = vec![]; - for step in self.steps.iter() { + for step in self.steps.iter_mut() { let js = step.script.source(); // Process first the messages triggered upstream by the tick let mut transformed_messages = vec![]; @@ -138,16 +140,18 @@ impl Flow { transformed_messages.extend(step_output?); } - // Only then process the tick - let step_started_at = stats.flow_step_start(&js, "onInterval"); - let tick_output = step.script.on_interval(js_runtime, timestamp).await; - match &tick_output { - Ok(messages) => { - stats.flow_step_done(&js, "onInterval", step_started_at, messages.len()) + // Only then process the tick if it's time to execute + if step.script.should_execute_interval(now) { + let step_started_at = stats.flow_step_start(&js, "onInterval"); + let tick_output = step.script.on_interval(js_runtime, timestamp).await; + match &tick_output { + Ok(messages) => { + stats.flow_step_done(&js, "onInterval", step_started_at, messages.len()) + } + Err(_) => stats.flow_step_failed(&js, "onInterval"), } - Err(_) => stats.flow_step_failed(&js, "onInterval"), + transformed_messages.extend(tick_output?); } - transformed_messages.extend(tick_output?); // Iterate with all the messages collected at this step messages = transformed_messages; diff --git a/crates/extensions/tedge_flows/src/js_script.rs b/crates/extensions/tedge_flows/src/js_script.rs index cc44aa032e1..e97dd361197 100644 --- a/crates/extensions/tedge_flows/src/js_script.rs +++ b/crates/extensions/tedge_flows/src/js_script.rs @@ -6,14 +6,16 @@ use crate::js_runtime::JsRuntime; use crate::js_value::JsonValue; use camino::Utf8Path; use camino::Utf8PathBuf; +use tokio::time::Instant; use tracing::debug; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct JsScript { pub module_name: String, pub path: Utf8PathBuf, pub config: JsonValue, pub interval: std::time::Duration, + pub next_execution: Option, pub no_js_on_message_fun: bool, pub no_js_on_config_update_fun: bool, pub no_js_on_interval_fun: bool, @@ -27,6 +29,7 @@ impl JsScript { path, config: JsonValue::default(), interval: std::time::Duration::ZERO, + next_execution: None, no_js_on_message_fun: true, no_js_on_config_update_fun: true, no_js_on_interval_fun: true, @@ -116,6 +119,36 @@ impl JsScript { Ok(()) } + /// Initialize the next execution time for this script's interval + /// Should be called after the script is loaded and interval is set + pub fn init_next_execution(&mut self) { + if !self.no_js_on_interval_fun && !self.interval.is_zero() { + self.next_execution = Some(Instant::now() + self.interval); + } + } + + /// Check if this script should execute its interval function now + /// Returns true and updates next_execution if it's time to execute + pub fn should_execute_interval(&mut self, now: Instant) -> bool { + if self.no_js_on_interval_fun || self.interval.is_zero() { + return false; + } + + match self.next_execution { + Some(deadline) if now >= deadline => { + // Time to execute - schedule next execution + self.next_execution = Some(now + self.interval); + true + } + None => { + // First execution - initialize and execute + self.next_execution = Some(now + self.interval); + true + } + _ => false, + } + } + /// Trigger the onInterval function of the JS module /// /// The "onInterval" function is passed 2 arguments @@ -123,17 +156,13 @@ impl JsScript { /// - the current flow step config /// /// Return zero, one or more messages + /// + /// Note: Caller should check should_execute_interval() before calling this pub async fn on_interval( &self, js: &JsRuntime, timestamp: &DateTime, ) -> Result, FlowError> { - if self.no_js_on_interval_fun { - return Ok(vec![]); - } - if !timestamp.tick_now(self.interval) { - return Ok(vec![]); - } debug!(target: "flows", "{}: onInterval({timestamp:?})", self.module_name()); let input = vec![timestamp.clone().into(), self.config.clone()]; js.call_function(&self.module_name(), "onInterval", input) diff --git a/crates/extensions/tedge_flows/src/runtime.rs b/crates/extensions/tedge_flows/src/runtime.rs index f5aaef29dde..c6ef005df4c 100644 --- a/crates/extensions/tedge_flows/src/runtime.rs +++ b/crates/extensions/tedge_flows/src/runtime.rs @@ -13,6 +13,7 @@ use std::path::Path; use tedge_mqtt_ext::TopicFilter; use tokio::fs::read_dir; use tokio::fs::read_to_string; +use tokio::time::Instant; use tracing::error; use tracing::info; use tracing::warn; @@ -92,6 +93,29 @@ impl MessageProcessor { topics } + /// Get the next deadline for interval execution across all scripts + /// Returns None if no scripts have intervals configured + pub fn next_interval_deadline(&self) -> Option { + self.flows + .values() + .flat_map(|flow| &flow.steps) + .filter_map(|step| step.script.next_execution) + .min() + } + + /// Get the last deadline for interval execution across all scripts Returns + /// None if no scripts have intervals configured + /// + /// This is intended for `tedge flows test` to ensure it processes all + /// intervals + pub fn last_interval_deadline(&self) -> Option { + self.flows + .values() + .flat_map(|flow| &flow.steps) + .filter_map(|step| step.script.next_execution) + .max() + } + pub async fn on_message( &mut self, timestamp: &DateTime, @@ -117,11 +141,12 @@ impl MessageProcessor { pub async fn on_interval( &mut self, timestamp: &DateTime, + now: Instant, ) -> Vec<(String, Result, FlowError>)> { let mut out_messages = vec![]; for (flow_id, flow) in self.flows.iter_mut() { let flow_output = flow - .on_interval(&self.js_runtime, &mut self.stats, timestamp) + .on_interval(&self.js_runtime, &mut self.stats, timestamp, now) .await; if flow_output.is_err() { self.stats.flow_on_interval_failed(flow_id); @@ -145,6 +170,7 @@ impl MessageProcessor { if step.script.path() == path { match self.js_runtime.load_script(&mut step.script).await { Ok(()) => { + step.script.init_next_execution(); info!(target: "flows", "Reloaded flow script {path}"); } Err(e) => { diff --git a/crates/extensions/tedge_flows/tests/interval.rs b/crates/extensions/tedge_flows/tests/interval.rs new file mode 100644 index 00000000000..c572b1b6505 --- /dev/null +++ b/crates/extensions/tedge_flows/tests/interval.rs @@ -0,0 +1,497 @@ +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; +use tedge_actors::Actor; +use tedge_actors::Builder; +use tedge_actors::CloneSender; +use tedge_actors::MappingSender; +use tedge_actors::MessageSink; +use tedge_actors::MessageSource; +use tedge_actors::NoConfig; +use tedge_actors::SimpleMessageBoxBuilder; +use tedge_flows::FlowsMapperBuilder; +use tedge_mqtt_ext::DynSubscriptions; +use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::MqttRequest; +use tempfile::TempDir; + +#[tokio::test(start_paused = true)] +async fn interval_executes_at_configured_frequency() { + let config_dir = create_test_flow_dir(); + + write_file( + &config_dir, + "counter.js", + r#" + let count = 0; + export function onInterval(timestamp, config) { + count++; + return [{ + topic: "test/interval/count", + payload: JSON.stringify({count: count}) + }]; + } + "#, + ); + + write_file( + &config_dir, + "counter_flow.toml", + r#" + input.mqtt.topics = ["test/input"] + + [[steps]] + script = "counter.js" + interval = "1s" + "#, + ); + + let captured_messages = CapturedMessages::default(); + let mut mqtt = MockMqtt::new(captured_messages.clone()); + let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; + + tick(Duration::from_millis(500)).await; + assert_eq!(captured_messages.count(), 0, "Should not execute before 1s"); + + tick(Duration::from_millis(600)).await; + assert_eq!(captured_messages.count(), 1, "Should execute once at 1s"); + + tick(Duration::from_secs(1)).await; + assert_eq!(captured_messages.count(), 2, "Should execute twice at 2s"); + + tick(Duration::from_secs(1)).await; + assert_eq!( + captured_messages.count(), + 3, + "Should execute three times at 3s" + ); + + actor_handle.abort(); + let _ = actor_handle.await; +} + +#[tokio::test(start_paused = true)] +async fn multiple_scripts_execute_at_independent_frequencies() { + let config_dir = create_test_flow_dir(); + + write_file( + &config_dir, + "fast.js", + r#" + let count = 0; + export function onInterval(timestamp, config) { + count++; + return [{ + topic: "test/fast", + payload: JSON.stringify({count: count}) + }]; + } + "#, + ); + + write_file( + &config_dir, + "slow.js", + r#" + let count = 0; + export function onInterval(timestamp, config) { + count++; + return [{ + topic: "test/slow", + payload: JSON.stringify({count: count}) + }]; + } + "#, + ); + + write_file( + &config_dir, + "multi_interval.toml", + r#" + input.mqtt.topics = ["test/input"] + + [[steps]] + script = "fast.js" + interval = "500ms" + + [[steps]] + script = "slow.js" + interval = "2s" + "#, + ); + + let captured_messages = CapturedMessages::default(); + let mut mqtt = MockMqtt::new(captured_messages.clone()); + let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; + let count = |topic: &str| captured_messages.count_topic(topic); + + tick(Duration::from_millis(500)).await; + assert_eq!(count("test/fast"), 1, "Fast should execute once at 500ms"); + assert_eq!(count("test/slow"), 0, "Slow should not execute yet"); + + tick(Duration::from_millis(500)).await; + assert_eq!(count("test/fast"), 2, "Fast should execute twice by 1s"); + assert_eq!(count("test/slow"), 0, "Slow still shouldn't fire"); + + // 1.5s -> fast should execute again, slow still not + tick(Duration::from_millis(500)).await; + + // 2s -> both should execute + tick(Duration::from_millis(500)).await; + + assert_eq!(count("test/fast"), 4, "Fast should execute 4 times by 2s"); + assert_eq!(count("test/slow"), 1, "Slow should execute once at 2s"); + + actor_handle.abort(); + let _ = actor_handle.await; +} + +#[tokio::test(start_paused = true)] +async fn script_with_oninterval_but_no_config_gets_default_1s_interval() { + let config_dir = create_test_flow_dir(); + + write_file( + &config_dir, + "no_interval.js", + r#" + export function onInterval(timestamp, config) { + return [{ + topic: "test/default/interval", + payload: "tick" + }]; + } + "#, + ); + + write_file( + &config_dir, + "no_interval.toml", + r#" + input.mqtt.topics = ["test/input"] + + [[steps]] + script = "no_interval.js" + "#, + ); + + let captured_messages = CapturedMessages::default(); + let mut mqtt = MockMqtt::new(captured_messages.clone()); + let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; + let count = || captured_messages.count(); + + tick(Duration::from_millis(500)).await; + assert_eq!(count(), 0, "Shouldn't execute before default 1s interval"); + + tick(Duration::from_millis(500)).await; + assert_eq!(count(), 1, "Should execute once with default 1s interval"); + + actor_handle.abort(); + let _ = actor_handle.await; +} + +#[tokio::test(start_paused = true)] +async fn interval_executes_independently_from_message_processing() { + let config_dir = create_test_flow_dir(); + + write_file( + &config_dir, + "dual.js", + r#" + export function onMessage(msg, config) { + return [{ + topic: "onMessage", + payload: msg.payload + }]; + } + + export function onInterval(timestamp, config) { + return [{ + topic: "onInterval", + payload: "tick" + }]; + } + "#, + ); + + write_file( + &config_dir, + "dual.toml", + r#" + input.mqtt.topics = ["test/input"] + + [[steps]] + script = "dual.js" + interval = "1s" + "#, + ); + + let captured_messages = CapturedMessages::default(); + let mut mqtt = MockMqtt::new(captured_messages.clone()); + let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; + let count = |topic: &str| captured_messages.count_topic(topic); + + tick(Duration::from_millis(1000)).await; + assert_eq!( + count("onInterval"), + 1, + "Should get 1 interval message after 1s" + ); + + tick(Duration::from_millis(1000)).await; + assert_eq!( + count("onInterval"), + 2, + "Should get 2 interval messages after 2s" + ); + + assert_eq!( + count("onMessage"), + 0, + "No input messages sent, should get 0 output messages" + ); + + // Now publish a message and verify onMessage is called but onInterval is not + mqtt.publish("test/input", "hello").await; + tick(Duration::from_millis(100)).await; + + assert_eq!( + count("onMessage"), + 1, + "Should get 1 message output after publishing input" + ); + assert_eq!( + count("onInterval"), + 2, + "Interval should not have fired again" + ); + + actor_handle.abort(); + let _ = actor_handle.await; +} + +#[tokio::test(start_paused = true)] +async fn very_short_intervals_execute_correctly() { + let config_dir = create_test_flow_dir(); + + write_file( + &config_dir, + "rapid.js", + r#" + let count = 0; + export function onInterval(timestamp, config) { + count++; + return [{ + topic: "test/rapid", + payload: String(count) + }]; + } + "#, + ); + + write_file( + &config_dir, + "rapid.toml", + r#" + input.mqtt.topics = ["test/input"] + + [[steps]] + script = "rapid.js" + interval = "100ms" + "#, + ); + + let captured_messages = CapturedMessages::default(); + let mut mqtt = MockMqtt::new(captured_messages.clone()); + let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; + let count = || captured_messages.count(); + + tick(Duration::from_millis(100)).await; + assert_eq!(count(), 1, "Should execute once by 100ms"); + + tick(Duration::from_millis(100)).await; + assert_eq!(count(), 2, "Should execute twice by 200ms"); + + tick(Duration::from_millis(100)).await; + assert_eq!(count(), 3, "Should execute 3 times by 300ms"); + + actor_handle.abort(); + let _ = actor_handle.await; +} + +#[tokio::test(start_paused = true)] +async fn interval_executes_when_time_exceeds_interval() { + let config_dir = create_test_flow_dir(); + + write_file( + &config_dir, + "skip.js", + r#" + export function onInterval(timestamp, config) { + return [{ + topic: "test/skip", + payload: "executed" + }]; + } + "#, + ); + + write_file( + &config_dir, + "skip.toml", + r#" + input.mqtt.topics = ["test/input"] + + [[steps]] + script = "skip.js" + interval = "120s" + "#, + ); + + let captured_messages = CapturedMessages::default(); + let mut mqtt = MockMqtt::new(captured_messages.clone()); + let actor_handle = spawn_flows_actor(&config_dir, &mut mqtt).await; + let count = || captured_messages.count(); + + tick(Duration::from_secs(60)).await; + assert_eq!(count(), 0, "Should not execute before 2 minutes"); + + // Jump over the 2 minutes mark by waiting 2 minutes more (total 3 minutes) + tick(Duration::from_secs(120)).await; + assert_eq!( + count(), + 1, + "Should execute once even though we skipped over the 2 minute mark" + ); + + // Allow another 2 minutes to pass (total 5 minutes) + tick(Duration::from_secs(120)).await; + assert_eq!(count(), 2, "Should execute again after a further 2 minutes"); + + actor_handle.abort(); + let _ = actor_handle.await; +} + +fn create_test_flow_dir() -> TempDir { + tempfile::tempdir().unwrap() +} + +fn write_file(dir: &TempDir, name: &str, content: &str) { + std::fs::write(dir.path().join(name), content).expect("Failed to write file"); +} + +async fn tick(duration: Duration) { + tokio::time::advance(duration).await; + + // Give actor time to process any actions triggered by the passage of time + tokio::time::sleep(Duration::from_millis(10)).await; +} + +type ActorHandle = tokio::task::JoinHandle>; + +async fn spawn_flows_actor(config_dir: &TempDir, mqtt: &mut MockMqtt) -> ActorHandle { + let mut flows_builder = FlowsMapperBuilder::try_new(config_dir.path().to_str().unwrap()) + .await + .expect("Failed to create FlowsMapper"); + + flows_builder.connect(mqtt); + let flows_actor = flows_builder.build(); + + mqtt.build(); + + let handle = tokio::spawn(flows_actor.run()); + + // Give actor time to initialize + tokio::time::sleep(Duration::from_millis(10)).await; + + handle +} + +struct MockMqtt { + inbox: Option>, + captured: CapturedMessages, + sender: Mutex>>, + message_box: Option>, +} + +impl MockMqtt { + fn new(captured: CapturedMessages) -> Self { + Self { + inbox: Some(SimpleMessageBoxBuilder::new("MockMqtt", 16)), + captured, + sender: Mutex::new(None), + message_box: None, + } + } + + fn build(&mut self) { + // Build the message box after it's been connected + let builder = self.inbox.take().unwrap(); + let message_box = builder.build(); + self.message_box = Some(message_box); + } + + async fn publish(&mut self, topic: &str, payload: &str) { + use tedge_actors::Sender; + + let msg = MqttMessage::new( + &tedge_mqtt_ext::Topic::new_unchecked(topic), + payload.as_bytes(), + ); + + if let Some(message_box) = self.message_box.as_mut() { + message_box.send(msg).await.expect("Failed to send message"); + } + } +} + +impl MessageSource for MockMqtt { + fn connect_sink( + &mut self, + config: &mut DynSubscriptions, + peer: &impl MessageSink, + ) { + config.set_client_id_usize(0); + let inbox = self + .inbox + .as_mut() + .expect("Must connect sinks before building"); + inbox.connect_sink(NoConfig, peer); + } +} + +impl MessageSink for MockMqtt { + fn get_sender(&self) -> tedge_actors::DynSender { + let mut cached_sender = self.sender.lock().unwrap(); + if let Some(sender) = &*cached_sender { + return sender.sender_clone(); + } + + let captured = self.captured.messages.clone(); + let inbox_sender = self.inbox.as_ref().unwrap().get_sender(); + let sender = Box::new(MappingSender::new(inbox_sender, move |req| { + if let MqttRequest::Publish(msg) = &req { + captured.lock().unwrap().push(msg.clone()); + } + Some(req) + })); + + *cached_sender = Some(sender.sender_clone()); + sender + } +} + +#[derive(Clone, Default)] +struct CapturedMessages { + messages: Arc>>, +} + +impl CapturedMessages { + pub fn count(&self) -> usize { + self.messages.lock().unwrap().len() + } + + pub fn count_topic(&self, topic: &str) -> usize { + let msgs = self.messages.lock().unwrap(); + msgs.iter().filter(|m| m.topic.name == topic).count() + } +}