Skip to content

Commit fc802e4

Browse files
Merge pull request #3805 from didier-wenzek/feat/flow-source-types
feat: tedge flows consuming and producing file content
2 parents 0dde8cd + 21c84fe commit fc802e4

File tree

25 files changed

+1628
-140
lines changed

25 files changed

+1628
-140
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ tedge_test_utils = { path = "crates/tests/tedge_test_utils" }
6969
tedge_timer_ext = { path = "crates/extensions/tedge_timer_ext" }
7070
tedge_uploader_ext = { path = "crates/extensions/tedge_uploader_ext" }
7171
tedge_utils = { path = "crates/common/tedge_utils" }
72+
tedge_watch_ext = { path = "crates/extensions/tedge_watch_ext" }
7273
upload = { path = "crates/common/upload" }
7374

7475
# external dependencies

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ impl TestCommand {
7474
}
7575
}
7676
};
77+
let source = SourceTag::Mqtt;
7778
processor
78-
.on_message(timestamp, &message)
79+
.on_message(timestamp, &source, &message)
7980
.await
8081
.into_iter()
8182
.for_each(|msg| self.print_messages(msg))
@@ -89,9 +90,9 @@ impl TestCommand {
8990
.for_each(|msg| self.print_messages(msg))
9091
}
9192

92-
fn print_messages(&self, (flow, messages): (String, Result<Vec<Message>, FlowError>)) {
93-
match messages {
94-
Ok(mut messages) => {
93+
fn print_messages(&self, result: FlowResult) {
94+
match result {
95+
FlowResult::Ok { mut messages, .. } => {
9596
if self.base64_output {
9697
for message in messages.iter_mut() {
9798
message.payload = BASE64_STANDARD.encode(&message.payload).into_bytes();
@@ -101,8 +102,8 @@ impl TestCommand {
101102
println!("{}", message);
102103
}
103104
}
104-
Err(err) => {
105-
tracing::error!("Error in {flow}: {}", err)
105+
FlowResult::Err { flow, error, .. } => {
106+
tracing::error!("Error in {flow}: {}", error)
106107
}
107108
}
108109
}

crates/core/tedge_mapper/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ tedge_mqtt_ext = { workspace = true }
3636
tedge_signal_ext = { workspace = true }
3737
tedge_timer_ext = { workspace = true }
3838
tedge_uploader_ext = { workspace = true }
39+
tedge_watch_ext = { workspace = true }
3940
tracing = { workspace = true }
4041
yansi = { workspace = true }
4142

crates/core/tedge_mapper/src/flows/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::TEdgeComponent;
33
use tedge_config::TEdgeConfig;
44
use tedge_file_system_ext::FsWatchActorBuilder;
55
use tedge_flows::FlowsMapperBuilder;
6+
use tedge_watch_ext::WatchActorBuilder;
67

78
pub struct GenMapper;
89

@@ -21,9 +22,13 @@ impl TEdgeComponent for GenMapper {
2122
flows_mapper.connect(&mut mqtt_actor);
2223
flows_mapper.connect_fs(&mut fs_actor);
2324

25+
let mut cmd_watcher_actor = WatchActorBuilder::new();
26+
cmd_watcher_actor.connect(&mut flows_mapper);
27+
2428
runtime.spawn(flows_mapper).await?;
2529
runtime.spawn(mqtt_actor).await?;
2630
runtime.spawn(fs_actor).await?;
31+
runtime.spawn(cmd_watcher_actor).await?;
2732
runtime.run_to_completion().await?;
2833
Ok(())
2934
}

crates/extensions/tedge_flows/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ serde_json = { workspace = true }
2727
tedge_actors = { workspace = true }
2828
tedge_file_system_ext = { workspace = true }
2929
tedge_mqtt_ext = { workspace = true }
30+
tedge_watch_ext = { workspace = true }
3031
thiserror = { workspace = true }
3132
time = { workspace = true }
3233
tokio = { workspace = true, features = ["fs", "macros", "time", "sync"] }

0 commit comments

Comments
 (0)