Skip to content

Commit e6cdbf0

Browse files
committed
feat: publish supported log types on software_update completion
1 parent 695fd9a commit e6cdbf0

File tree

11 files changed

+246
-17
lines changed

11 files changed

+246
-17
lines changed

crates/core/tedge_agent/src/agent.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ impl Agent {
384384
)
385385
.await?;
386386
converter_actor_builder.register_builtin_operation(&mut log_actor);
387+
converter_actor_builder.register_sync_signal_sink(&log_actor);
387388
Some(log_actor)
388389
} else {
389390
None

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::operation_workflows::message_box::CommandDispatcher;
2+
use crate::operation_workflows::message_box::SyncSignalDispatcher;
23
use crate::operation_workflows::persist::WorkflowRepository;
34
use crate::state_repository::state::AgentStateRepository;
45
use async_trait::async_trait;
@@ -53,6 +54,7 @@ pub struct WorkflowActor {
5354
pub(crate) log_dir: Utf8PathBuf,
5455
pub(crate) input_receiver: UnboundedLoggingReceiver<AgentInput>,
5556
pub(crate) builtin_command_dispatcher: CommandDispatcher,
57+
pub(crate) sync_signal_dispatcher: SyncSignalDispatcher,
5658
pub(crate) command_sender: DynSender<InternalCommandState>,
5759
pub(crate) mqtt_publisher: LoggingSender<MqttMessage>,
5860
pub(crate) script_runner: ClientMessageBox<Execute, std::io::Result<Output>>,
@@ -428,7 +430,10 @@ impl WorkflowActor {
428430
new_state: GenericCommandState,
429431
) -> Result<(), RuntimeError> {
430432
if new_state.is_finished() {
431-
self.finalize_builtin_command_update(new_state).await
433+
self.sync_dependent_actors(&new_state).await?;
434+
self.finalize_builtin_command_update(new_state).await?;
435+
436+
Ok(())
432437
} else {
433438
// As not finalized, the builtin state is sent back
434439
// to the builtin operation actor for further processing.
@@ -458,6 +463,18 @@ impl WorkflowActor {
458463
self.process_command_update(adapted_state).await
459464
}
460465

466+
async fn sync_dependent_actors(
467+
&mut self,
468+
command: &GenericCommandState,
469+
) -> Result<(), RuntimeError> {
470+
if let Some(command) = command.operation() {
471+
self.sync_signal_dispatcher
472+
.send(command.as_str().into())
473+
.await?;
474+
}
475+
Ok(())
476+
}
477+
461478
fn open_command_log(
462479
&mut self,
463480
state: &GenericCommandState,

crates/core/tedge_agent/src/operation_workflows/builder.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::operation_workflows::actor::InternalCommandState;
33
use crate::operation_workflows::actor::WorkflowActor;
44
use crate::operation_workflows::config::OperationConfig;
55
use crate::operation_workflows::message_box::CommandDispatcher;
6+
use crate::operation_workflows::message_box::SyncSignalDispatcher;
67
use crate::operation_workflows::persist::WorkflowRepository;
78
use crate::state_repository::state::agent_state_dir;
89
use crate::state_repository::state::AgentStateRepository;
@@ -22,13 +23,15 @@ use tedge_actors::RuntimeRequest;
2223
use tedge_actors::RuntimeRequestSink;
2324
use tedge_actors::Service;
2425
use tedge_actors::UnboundedLoggingReceiver;
26+
use tedge_api::commands::CmdMetaSyncSignal;
2527
use tedge_api::mqtt_topics::ChannelFilter::AnyCommand;
2628
use tedge_api::mqtt_topics::EntityFilter;
2729
use tedge_api::mqtt_topics::EntityTopicId;
2830
use tedge_api::mqtt_topics::MqttSchema;
2931
use tedge_api::workflow::GenericCommandData;
3032
use tedge_api::workflow::GenericCommandState;
3133
use tedge_api::workflow::OperationName;
34+
use tedge_api::workflow::SyncOnCommand;
3235
use tedge_file_system_ext::FsWatchEvent;
3336
use tedge_mqtt_ext::MqttMessage;
3437
use tedge_mqtt_ext::TopicFilter;
@@ -39,6 +42,7 @@ pub struct WorkflowActorBuilder {
3942
input_sender: DynSender<AgentInput>,
4043
input_receiver: UnboundedLoggingReceiver<AgentInput>,
4144
command_dispatcher: CommandDispatcher,
45+
sync_signal_dispatcher: SyncSignalDispatcher,
4246
command_sender: DynSender<InternalCommandState>,
4347
mqtt_publisher: LoggingSender<MqttMessage>,
4448
script_runner: ClientMessageBox<Execute, std::io::Result<Output>>,
@@ -65,6 +69,8 @@ impl WorkflowActorBuilder {
6569
let command_dispatcher = CommandDispatcher::default();
6670
let command_sender = input_sender.sender_clone();
6771

72+
let sync_signal_dispatcher = SyncSignalDispatcher::default();
73+
6874
let mqtt_publisher = mqtt_actor.get_sender();
6975
mqtt_actor.connect_sink(
7076
Self::subscriptions(&config.mqtt_schema, &config.device_topic_id),
@@ -81,6 +87,7 @@ impl WorkflowActorBuilder {
8187
input_sender,
8288
input_receiver,
8389
command_dispatcher,
90+
sync_signal_dispatcher,
8491
command_sender,
8592
mqtt_publisher,
8693
signal_sender,
@@ -98,7 +105,19 @@ impl WorkflowActorBuilder {
98105
actor.connect_sink(NoConfig, &self.input_sender);
99106
for (operation, sender) in actor.into_iter() {
100107
self.command_dispatcher
101-
.register_operation_handler(operation, sender)
108+
.register_operation_handler(operation, sender);
109+
}
110+
}
111+
112+
/// Register an actor to receive sync signals on completion of other commands
113+
pub fn register_sync_signal_sink<OperationActor>(&mut self, actor: &OperationActor)
114+
where
115+
OperationActor: MessageSink<CmdMetaSyncSignal> + SyncOnCommand,
116+
{
117+
let sender = actor.get_sender();
118+
for operation in actor.sync_on_commands() {
119+
self.sync_signal_dispatcher
120+
.register_sync_signal_sender(operation, sender.sender_clone());
102121
}
103122
}
104123

@@ -136,6 +155,7 @@ impl Builder<WorkflowActor> for WorkflowActorBuilder {
136155
log_dir: self.config.log_dir,
137156
input_receiver: self.input_receiver,
138157
builtin_command_dispatcher: self.command_dispatcher,
158+
sync_signal_dispatcher: self.sync_signal_dispatcher,
139159
mqtt_publisher: self.mqtt_publisher,
140160
command_sender: self.command_sender,
141161
script_runner: self.script_runner,

crates/core/tedge_agent/src/operation_workflows/message_box.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::collections::HashMap;
44
use tedge_actors::ChannelError;
55
use tedge_actors::DynSender;
66
use tedge_actors::Sender;
7+
use tedge_api::commands::CmdMetaSyncSignal;
8+
use tedge_api::mqtt_topics::OperationType;
79
use tedge_api::workflow::GenericCommandState;
810
use tedge_api::workflow::OperationName;
911

@@ -44,3 +46,29 @@ impl CommandDispatcher {
4446
self.senders.keys().cloned().collect()
4547
}
4648
}
49+
50+
#[derive(Default)]
51+
pub(crate) struct SyncSignalDispatcher {
52+
senders: HashMap<OperationType, Vec<DynSender<CmdMetaSyncSignal>>>,
53+
}
54+
55+
impl SyncSignalDispatcher {
56+
/// Register where to send sync signals for the given command type
57+
pub(crate) fn register_sync_signal_sender(
58+
&mut self,
59+
operation: OperationType,
60+
sender: DynSender<CmdMetaSyncSignal>,
61+
) {
62+
self.senders.entry(operation).or_default().push(sender);
63+
}
64+
65+
pub(crate) async fn send(&mut self, operation: OperationType) -> Result<(), ChannelError> {
66+
let Some(senders) = self.senders.get_mut(&operation) else {
67+
return Ok(());
68+
};
69+
for sender in senders {
70+
sender.send(()).await?;
71+
}
72+
Ok(())
73+
}
74+
}

crates/core/tedge_api/src/commands.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ pub trait CommandPayload {
298298
}
299299
}
300300

301+
pub type CmdMetaSyncSignal = ();
302+
301303
/// All the messages are serialized using json.
302304
pub trait Jsonify {
303305
fn from_json(json_str: &str) -> Result<Self, serde_json::Error>

crates/core/tedge_api/src/workflow/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,12 @@ pub enum IterationError {
506506
IndexOutOfBounds(usize),
507507
}
508508

509+
/// An actor builder must implement this if it to receive sync signals on completion of other commands
510+
pub trait SyncOnCommand {
511+
/// Return the list of operations for which this actor wants to receive sync signals
512+
fn sync_on_commands(&self) -> Vec<OperationType>;
513+
}
514+
509515
#[cfg(test)]
510516
mod tests {
511517
use super::GenericCommandState;

crates/extensions/tedge_log_manager/src/actor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tedge_actors::MessageReceiver;
1212
use tedge_actors::RuntimeError;
1313
use tedge_actors::Sender;
1414
use tedge_actors::SimpleMessageBox;
15+
use tedge_api::commands::CmdMetaSyncSignal;
1516
use tedge_api::commands::CommandStatus;
1617
use tedge_api::commands::LogUploadCmd;
1718
use tedge_api::commands::LogUploadCmdMetadata;
@@ -34,7 +35,7 @@ type MqttTopic = String;
3435
pub type LogUploadRequest = (MqttTopic, UploadRequest);
3536
pub type LogUploadResult = (MqttTopic, UploadResult);
3637

37-
fan_in_message_type!(LogInput[LogUploadCmd, FsWatchEvent, LogUploadResult] : Debug);
38+
fan_in_message_type!(LogInput[LogUploadCmd, CmdMetaSyncSignal, FsWatchEvent, LogUploadResult] : Debug);
3839
fan_in_message_type!(LogOutput[LogUploadCmd, LogUploadCmdMetadata] : Debug);
3940

4041
impl LogOutput {
@@ -80,6 +81,9 @@ impl Actor for LogManagerActor {
8081
LogInput::LogUploadResult((topic, result)) => {
8182
self.process_uploaded_log(&topic, result).await?;
8283
}
84+
LogInput::CmdMetaSyncSignal(_) => {
85+
self.reload_supported_log_types().await?;
86+
}
8387
}
8488
}
8589
Ok(())

crates/extensions/tedge_log_manager/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub struct LogManagerConfig {
2727
pub plugin_config_path: PathBuf,
2828
pub logtype_reload_topic: Topic,
2929
pub logfile_request_topic: TopicFilter,
30+
pub log_metadata_sync_topics: TopicFilter,
3031
pub sudo_enabled: bool,
3132
}
3233

@@ -62,6 +63,11 @@ impl LogManagerConfig {
6263
ChannelFilter::Command(OperationType::LogUpload),
6364
);
6465

66+
let log_metadata_sync_topics = mqtt_schema.topics(
67+
EntityFilter::Entity(&mqtt_device_topic_id),
68+
ChannelFilter::Command(OperationType::SoftwareUpdate),
69+
);
70+
6571
Ok(Self {
6672
mqtt_schema,
6773
config_dir,
@@ -72,6 +78,7 @@ impl LogManagerConfig {
7278
plugin_config_path,
7379
logtype_reload_topic,
7480
logfile_request_topic,
81+
log_metadata_sync_topics,
7582
sudo_enabled: true,
7683
})
7784
}

crates/extensions/tedge_log_manager/src/lib.rs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ use tedge_actors::RuntimeRequest;
2424
use tedge_actors::RuntimeRequestSink;
2525
use tedge_actors::Service;
2626
use tedge_actors::SimpleMessageBoxBuilder;
27+
use tedge_api::commands::CmdMetaSyncSignal;
2728
use tedge_api::commands::LogUploadCmd;
2829
use tedge_api::mqtt_topics::OperationType;
2930
use tedge_api::workflow::GenericCommandData;
3031
use tedge_api::workflow::GenericCommandState;
3132
use tedge_api::workflow::OperationName;
33+
use tedge_api::workflow::SyncOnCommand;
3234
use tedge_file_system_ext::FsWatchEvent;
3335
use tedge_utils::file::create_directory_with_defaults;
3436
use tedge_utils::file::move_file;
@@ -144,31 +146,41 @@ impl LogManagerBuilder {
144146

145147
/// List of MQTT topic filters the log actor has to subscribe to
146148
fn subscriptions(config: &LogManagerConfig) -> TopicFilter {
147-
config.logfile_request_topic.clone()
149+
let mut topics = config.logfile_request_topic.clone();
150+
topics.add_all(config.log_metadata_sync_topics.clone());
151+
topics
148152
}
149153

150154
/// Extract a log actor request from an MQTT message
151155
fn mqtt_message_parser(config: &LogManagerConfig) -> impl Fn(MqttMessage) -> Option<LogInput> {
152156
let logfile_request_topic = config.logfile_request_topic.clone();
157+
let log_metadata_sync_topics = config.log_metadata_sync_topics.clone();
153158
let mqtt_schema = config.mqtt_schema.clone();
154159
move |message| {
155-
if !logfile_request_topic.accept(&message) {
160+
if logfile_request_topic.accept(&message) {
161+
LogUploadCmd::parse(&mqtt_schema, message)
162+
.map_err(|err| {
163+
error!(
164+
target: "log plugins",
165+
"Incorrect log request payload: {}", err
166+
)
167+
})
168+
.unwrap_or(None)
169+
.map(|cmd| cmd.into())
170+
} else if log_metadata_sync_topics.accept(&message) {
171+
if let Ok(cmd) = GenericCommandState::from_command_message(&message) {
172+
if cmd.is_finished() {
173+
return Some(LogInput::CmdMetaSyncSignal(()));
174+
}
175+
}
176+
None
177+
} else {
156178
error!(
157179
target: "log plugins",
158180
"Received unexpected message on topic: {}", message.topic.name
159181
);
160-
return None;
182+
None
161183
}
162-
163-
LogUploadCmd::parse(&mqtt_schema, message)
164-
.map_err(|err| {
165-
error!(
166-
target: "log plugins",
167-
"Incorrect log request payload: {}", err
168-
)
169-
})
170-
.unwrap_or(None)
171-
.map(|cmd| cmd.into())
172184
}
173185
}
174186

@@ -240,3 +252,16 @@ impl IntoIterator for &LogManagerBuilder {
240252
vec![(OperationType::LogUpload.to_string(), sender.into())].into_iter()
241253
}
242254
}
255+
256+
impl MessageSink<CmdMetaSyncSignal> for LogManagerBuilder {
257+
fn get_sender(&self) -> DynSender<CmdMetaSyncSignal> {
258+
self.box_builder.get_sender().sender_clone()
259+
}
260+
}
261+
262+
impl SyncOnCommand for LogManagerBuilder {
263+
/// Return the list of operations for which this actor wants to receive sync signals
264+
fn sync_on_commands(&self) -> Vec<OperationType> {
265+
vec![OperationType::SoftwareUpdate]
266+
}
267+
}

0 commit comments

Comments
 (0)