Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions crates/core/tedge/src/cli/flows/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tedge_flows::MessageProcessor;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Stdin;
use tokio::time::Instant;

pub struct TestCommand {
pub flows_dir: Utf8PathBuf,
Expand Down Expand Up @@ -48,7 +49,10 @@ impl Command for TestCommand {
}
if self.final_on_interval {
let timestamp = DateTime::now();
self.tick(&mut processor, &timestamp).await;
let now = processor
.last_interval_deadline()
.unwrap_or_else(Instant::now);
self.tick(&mut processor, &timestamp, now).await;
}
Ok(())
}
Expand Down Expand Up @@ -77,9 +81,9 @@ impl TestCommand {
.for_each(|msg| self.print_messages(msg))
}

async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) {
async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime, now: Instant) {
processor
.on_interval(timestamp)
.on_interval(timestamp, now)
.await
.into_iter()
.for_each(|msg| self.print_messages(msg))
Expand Down
3 changes: 3 additions & 0 deletions crates/extensions/tedge_flows/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true
anyhow = { workspace = true }
async-trait = { workspace = true }
camino = { workspace = true, features = ["serde1"] }
futures = { workspace = true }
humantime = { workspace = true }
rquickjs = { git = "https://github.com/thin-edge/rquickjs", rev = "4ed04ac9af3de453dd41ff09fdd1837c7ceb1f1c", default-features = false, features = [
# disable bindgen and rely on pre-generated bindings due to problems
Expand All @@ -33,7 +34,9 @@ toml = { workspace = true, features = ["parse"] }
tracing = { workspace = true }

[dev-dependencies]
tedge_mqtt_ext = { workspace = true, features = ["test-helpers"] }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }

[lints]
workspace = true
18 changes: 12 additions & 6 deletions crates/extensions/tedge_flows/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::InputMessage;
use crate::OutputMessage;
use async_trait::async_trait;
use camino::Utf8PathBuf;
use futures::future::Either;
use std::future::pending;
use tedge_actors::Actor;
use tedge_actors::MessageReceiver;
use tedge_actors::RuntimeError;
Expand All @@ -14,8 +16,8 @@ use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::SubscriptionDiff;
use tedge_mqtt_ext::TopicFilter;
use tokio::time::interval;
use tokio::time::Duration;
use tokio::time::sleep_until;
use tokio::time::Instant;
use tracing::error;

pub struct FlowsMapper {
Expand All @@ -31,11 +33,14 @@ impl Actor for FlowsMapper {
}

async fn run(mut self) -> Result<(), RuntimeError> {
let mut interval = interval(Duration::from_secs(1));

loop {
let deadline_future = match self.processor.next_interval_deadline() {
Some(deadline) => Either::Left(sleep_until(deadline)),
None => Either::Right(pending()),
};

tokio::select! {
_ = interval.tick() => {
_ = deadline_future => {
self.on_interval().await?;
}
message = self.messages.recv() => {
Expand Down Expand Up @@ -130,12 +135,13 @@ impl FlowsMapper {
}

async fn on_interval(&mut self) -> Result<(), RuntimeError> {
let now = Instant::now();
let timestamp = DateTime::now();
if timestamp.seconds % 300 == 0 {
self.processor.dump_memory_stats().await;
self.processor.dump_processing_stats().await;
}
for (flow_id, flow_messages) in self.processor.on_interval(&timestamp).await {
for (flow_id, flow_messages) in self.processor.on_interval(&timestamp, now).await {
match flow_messages {
Ok(messages) => {
for message in messages {
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/tedge_flows/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl FlowConfig {
js_runtime.load_script(&mut step.script).await?;
step.check(&source);
step.fix();
step.script.init_next_execution();
steps.push(step);
}
Ok(Flow {
Expand Down
22 changes: 13 additions & 9 deletions crates/extensions/tedge_flows/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde_json::Value;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
use time::OffsetDateTime;
use tokio::time::Instant;
use tracing::warn;

/// A chain of transformation of MQTT messages
Expand Down Expand Up @@ -119,10 +120,11 @@ impl Flow {
js_runtime: &JsRuntime,
stats: &mut Counter,
timestamp: &DateTime,
now: Instant,
) -> Result<Vec<Message>, FlowError> {
let stated_at = stats.flow_on_interval_start(self.source.as_str());
let mut messages = vec![];
for step in self.steps.iter() {
for step in self.steps.iter_mut() {
let js = step.script.source();
// Process first the messages triggered upstream by the tick
let mut transformed_messages = vec![];
Expand All @@ -138,16 +140,18 @@ impl Flow {
transformed_messages.extend(step_output?);
}

// Only then process the tick
let step_started_at = stats.flow_step_start(&js, "onInterval");
let tick_output = step.script.on_interval(js_runtime, timestamp).await;
match &tick_output {
Ok(messages) => {
stats.flow_step_done(&js, "onInterval", step_started_at, messages.len())
// Only then process the tick if it's time to execute
if step.script.should_execute_interval(now) {
let step_started_at = stats.flow_step_start(&js, "onInterval");
let tick_output = step.script.on_interval(js_runtime, timestamp).await;
match &tick_output {
Ok(messages) => {
stats.flow_step_done(&js, "onInterval", step_started_at, messages.len())
}
Err(_) => stats.flow_step_failed(&js, "onInterval"),
}
Err(_) => stats.flow_step_failed(&js, "onInterval"),
transformed_messages.extend(tick_output?);
}
transformed_messages.extend(tick_output?);

// Iterate with all the messages collected at this step
messages = transformed_messages;
Expand Down
43 changes: 36 additions & 7 deletions crates/extensions/tedge_flows/src/js_script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ use crate::js_runtime::JsRuntime;
use crate::js_value::JsonValue;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use tokio::time::Instant;
use tracing::debug;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct JsScript {
pub module_name: String,
pub path: Utf8PathBuf,
pub config: JsonValue,
pub interval: std::time::Duration,
pub next_execution: Option<Instant>,
pub no_js_on_message_fun: bool,
pub no_js_on_config_update_fun: bool,
pub no_js_on_interval_fun: bool,
Expand All @@ -27,6 +29,7 @@ impl JsScript {
path,
config: JsonValue::default(),
interval: std::time::Duration::ZERO,
next_execution: None,
no_js_on_message_fun: true,
no_js_on_config_update_fun: true,
no_js_on_interval_fun: true,
Expand Down Expand Up @@ -116,24 +119,50 @@ impl JsScript {
Ok(())
}

/// Initialize the next execution time for this script's interval
/// Should be called after the script is loaded and interval is set
pub fn init_next_execution(&mut self) {
if !self.no_js_on_interval_fun && !self.interval.is_zero() {
self.next_execution = Some(Instant::now() + self.interval);
}
}

/// Check if this script should execute its interval function now
/// Returns true and updates next_execution if it's time to execute
pub fn should_execute_interval(&mut self, now: Instant) -> bool {
if self.no_js_on_interval_fun || self.interval.is_zero() {
return false;
}

match self.next_execution {
Some(deadline) if now >= deadline => {
// Time to execute - schedule next execution
self.next_execution = Some(now + self.interval);
true
}
None => {
// First execution - initialize and execute
self.next_execution = Some(now + self.interval);
true
}
_ => false,
}
}

/// Trigger the onInterval function of the JS module
///
/// The "onInterval" function is passed 2 arguments
/// - the current timestamp
/// - the current flow step config
///
/// Return zero, one or more messages
///
/// Note: Caller should check should_execute_interval() before calling this
pub async fn on_interval(
Comment on lines +160 to 161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see pros and cons to externalize or not the check if this call is timely. What are your motivation to move this check out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the functions the javascript module exports for tedge-flows to call, my expectation is that the methods on JsScript serve as a lightweight wrapper, doing minimal work beyond calling the function with the provided input. It also feels like a bit of a hack to pretend the function produced no output when it wasn't called at all (or if it entirely doesn't exist).

&self,
js: &JsRuntime,
timestamp: &DateTime,
) -> Result<Vec<Message>, FlowError> {
if self.no_js_on_interval_fun {
return Ok(vec![]);
}
if !timestamp.tick_now(self.interval) {
return Ok(vec![]);
}
debug!(target: "flows", "{}: onInterval({timestamp:?})", self.module_name());
let input = vec![timestamp.clone().into(), self.config.clone()];
js.call_function(&self.module_name(), "onInterval", input)
Expand Down
28 changes: 27 additions & 1 deletion crates/extensions/tedge_flows/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::path::Path;
use tedge_mqtt_ext::TopicFilter;
use tokio::fs::read_dir;
use tokio::fs::read_to_string;
use tokio::time::Instant;
use tracing::error;
use tracing::info;
use tracing::warn;
Expand Down Expand Up @@ -92,6 +93,29 @@ impl MessageProcessor {
topics
}

/// Get the next deadline for interval execution across all scripts
/// Returns None if no scripts have intervals configured
pub fn next_interval_deadline(&self) -> Option<tokio::time::Instant> {
self.flows
.values()
.flat_map(|flow| &flow.steps)
.filter_map(|step| step.script.next_execution)
.min()
}

/// Get the last deadline for interval execution across all scripts Returns
/// None if no scripts have intervals configured
///
/// This is intended for `tedge flows test` to ensure it processes all
/// intervals
pub fn last_interval_deadline(&self) -> Option<tokio::time::Instant> {
self.flows
.values()
.flat_map(|flow| &flow.steps)
.filter_map(|step| step.script.next_execution)
.max()
}

pub async fn on_message(
&mut self,
timestamp: &DateTime,
Expand All @@ -117,11 +141,12 @@ impl MessageProcessor {
pub async fn on_interval(
&mut self,
timestamp: &DateTime,
now: Instant,
) -> Vec<(String, Result<Vec<Message>, FlowError>)> {
let mut out_messages = vec![];
for (flow_id, flow) in self.flows.iter_mut() {
let flow_output = flow
.on_interval(&self.js_runtime, &mut self.stats, timestamp)
.on_interval(&self.js_runtime, &mut self.stats, timestamp, now)
.await;
if flow_output.is_err() {
self.stats.flow_on_interval_failed(flow_id);
Expand All @@ -145,6 +170,7 @@ impl MessageProcessor {
if step.script.path() == path {
match self.js_runtime.load_script(&mut step.script).await {
Ok(()) => {
step.script.init_next_execution();
info!(target: "flows", "Reloaded flow script {path}");
}
Err(e) => {
Expand Down
Loading