From caf58c515723a9f65b79e795ff0c518086eed2e7 Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Tue, 15 Apr 2025 20:35:57 +0200 Subject: [PATCH 1/9] Refactored pipeline registry storage: moved from tokio async mutexes to sync, moved the functions to the proper namespace. Implemented pipeline support in the Replay service. --- .../src/metrics/pipeline_metric_builder.rs | 4 +- savant_core/src/pipeline.rs | 51 +++++++++- savant_core/src/telemetry.rs | 4 +- savant_core/src/webserver.rs | 55 +---------- services/replay/replay/assets/test.json | 8 +- services/replay/replaydb/Cargo.toml | 1 + services/replay/replaydb/assets/rocksdb.json | 8 +- .../replaydb/assets/rocksdb_opt_out.json | 8 +- .../replaydb/src/service/configuration.rs | 11 ++- .../replaydb/src/service/rocksdb_service.rs | 5 + .../replay/replaydb/src/stream_processor.rs | 92 ++++++++++++++++--- .../replay/samples/file_restreaming/README.md | 2 +- .../file_restreaming/replay_config.json | 7 +- 13 files changed, 181 insertions(+), 75 deletions(-) diff --git a/savant_core/src/metrics/pipeline_metric_builder.rs b/savant_core/src/metrics/pipeline_metric_builder.rs index a72d9177..8c430fd7 100644 --- a/savant_core/src/metrics/pipeline_metric_builder.rs +++ b/savant_core/src/metrics/pipeline_metric_builder.rs @@ -1,6 +1,6 @@ use crate::metrics::{get_or_create_counter_family, get_or_create_gauge_family}; +use crate::pipeline::get_registered_pipelines; use crate::rust::FrameProcessingStatRecordType; -use crate::webserver::get_registered_pipelines; use log::debug; #[derive(Debug)] @@ -29,7 +29,7 @@ impl PipelineMetricBuilder { let stage_latency_label_names = ["record_type", "destination_stage_name", "source_stage_name"].as_slice(); - let registered_pipelines = get_registered_pipelines().await; + let registered_pipelines = get_registered_pipelines(); debug!( "Found {} registered pipeline(s)", registered_pipelines.len() diff --git a/savant_core/src/pipeline.rs b/savant_core/src/pipeline.rs index 0b2f3d65..25afe048 100644 --- a/savant_core/src/pipeline.rs +++ b/savant_core/src/pipeline.rs @@ -3,7 +3,11 @@ use std::time::SystemTime; use anyhow::Result; use hashbrown::HashMap; +use log::debug; +use log::error; +use log::info; use opentelemetry::Context; +use parking_lot::Mutex; pub use implementation::PipelineConfiguration; pub use implementation::PipelineConfigurationBuilder; @@ -15,10 +19,55 @@ use crate::primitives::frame::VideoFrameProxy; use crate::primitives::frame_batch::VideoFrameBatch; use crate::primitives::frame_update::VideoFrameUpdate; use crate::primitives::object::BorrowedVideoObject; -use crate::webserver::{register_pipeline, unregister_pipeline}; +use lazy_static::lazy_static; const MAX_TRACKED_STREAMS: usize = 8192; // defines how many streams are tracked for the frame ordering +// + +// pipelines: Arc>>>, + +lazy_static! { + static ref PIPELINES: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); +} + +pub(crate) fn register_pipeline(pipeline: Arc) { + let pipelines = PIPELINES.clone(); + let mut bind = pipelines.lock(); + let name = pipeline.get_name(); + let entry = bind.get(&name); + if entry.is_some() { + let message = format!("Pipeline with name {} already exists in registry.", &name); + error!("{}", message); + panic!("{}", message); + } + bind.insert(name.clone(), pipeline.clone()); + info!("Pipeline {} registered.", name); +} + +pub(crate) fn unregister_pipeline(pipeline: Arc) { + let stats = PIPELINES.clone(); + let pipeline_name = pipeline.get_name(); + let mut bind = stats.lock(); + let prev_len = bind.len(); + debug!("Removing pipeline {} from stats.", &pipeline_name); + bind.remove(&pipeline_name); + if bind.len() == prev_len { + error!("Failed to remove pipeline from stats."); + } +} + +pub(crate) fn get_registered_pipelines() -> HashMap> { + let s = PIPELINES.lock(); + s.clone() +} + +pub fn get_pipeline(name: &str) -> Option> { + let s = PIPELINES.lock(); + s.get(name).cloned() +} + pub mod stage; pub mod stage_function_loader; pub mod stage_plugin_sample; diff --git a/savant_core/src/telemetry.rs b/savant_core/src/telemetry.rs index 447127d0..29f7e4a9 100644 --- a/savant_core/src/telemetry.rs +++ b/savant_core/src/telemetry.rs @@ -1,4 +1,3 @@ -use crate::get_or_init_async_runtime; use log::error; use opentelemetry::global; use opentelemetry_jaeger_propagator::Propagator; @@ -251,8 +250,7 @@ pub fn init(config: &TelemetryConfiguration) { match configurator.get() { Some(_) => panic!("Open Telemetry has been configured"), None => { - let runtime = get_or_init_async_runtime(); - let c = runtime.block_on(async { Configurator::new("savant", config) }); + let c = Configurator::new("savant", config); let result = configurator.set(c); if result.is_err() { // should not happen diff --git a/savant_core/src/webserver.rs b/savant_core/src/webserver.rs index c574f54d..e2447ac0 100644 --- a/savant_core/src/webserver.rs +++ b/savant_core/src/webserver.rs @@ -10,7 +10,6 @@ use tokio::sync::Mutex; use crate::get_or_init_async_runtime; use crate::metrics::metric_collector::SystemMetricCollector; use crate::metrics::pipeline_metric_builder::PipelineMetricBuilder; -use crate::pipeline::implementation; use crate::primitives::rust::AttributeSet; use crate::primitives::Attribute; use crate::protobuf::ToProtobuf; @@ -25,7 +24,7 @@ use anyhow::bail; use futures_util::StreamExt; use hashbrown::HashMap; use lazy_static::lazy_static; -use log::{debug, error, info, warn}; +use log::{error, info, warn}; use moka::future::Cache; use moka::Expiry; use prometheus_client::encoding::text::encode; @@ -78,7 +77,6 @@ pub struct KvsOperation { #[allow(clippy::type_complexity)] struct WsData { - pipelines: Arc>>>, status: Arc>, shutdown_token: Arc>, shutdown_status: Arc>, @@ -131,7 +129,6 @@ impl WsData { } }); WsData { - pipelines: Arc::new(Mutex::new(HashMap::new())), status: Arc::new(Mutex::new(PipelineStatus::Stopped)), shutdown_token: Arc::new(OnceLock::new()), shutdown_status: Arc::new(OnceLock::new()), @@ -228,52 +225,6 @@ lazy_static! { static ref PID: Mutex = Mutex::new(0); } -pub(crate) fn register_pipeline(pipeline: Arc) { - let runtime = get_or_init_async_runtime(); - let pipelines = WS_DATA.pipelines.clone(); - runtime.block_on(async move { - let mut bind = pipelines.lock().await; - let name = pipeline.get_name(); - let entry = bind.get(&name); - if entry.is_some() { - let message = format!("Pipeline with name {} already exists in registry.", &name); - error!("{}", message); - panic!("{}", message); - } - bind.insert(name.clone(), pipeline.clone()); - info!("Pipeline {} registered.", name); - }); -} - -pub(crate) fn unregister_pipeline(pipeline: Arc) { - let runtime = get_or_init_async_runtime(); - let stats = WS_DATA.pipelines.clone(); - let pipeline_name = pipeline.get_name(); - runtime.block_on(async move { - let mut bind = stats.lock().await; - let prev_len = bind.len(); - debug!("Removing pipeline {} from stats.", &pipeline_name); - bind.remove(&pipeline_name); - if bind.len() == prev_len { - error!("Failed to remove pipeline from stats."); - } - }); -} - -pub(crate) async fn get_registered_pipelines() -> HashMap> { - let s = WS_DATA.pipelines.lock().await; - s.clone() -} - -pub fn get_pipeline(name: &str) -> Option> { - let runtime = get_or_init_async_runtime(); - let pipelines = WS_DATA.pipelines.clone(); - runtime.block_on(async { - let bind = pipelines.lock().await; - bind.get(name).cloned() - }) -} - pub fn set_status(s: PipelineStatus) -> anyhow::Result<()> { WS_DATA.set_status(s) } @@ -552,10 +503,10 @@ mod tests { set_extra_labels, }; use crate::pipeline::implementation::create_test_pipeline; + use crate::pipeline::register_pipeline; use crate::test::gen_frame; use crate::webserver::{ - init_webserver, register_pipeline, set_shutdown_token, set_status, stop_webserver, - PipelineStatus, + init_webserver, set_shutdown_token, set_status, stop_webserver, PipelineStatus, }; use hashbrown::HashMap; use prometheus_client::registry::Unit; diff --git a/services/replay/replay/assets/test.json b/services/replay/replay/assets/test.json index e6fbd9e6..3b5abf5d 100644 --- a/services/replay/replay/assets/test.json +++ b/services/replay/replay/assets/test.json @@ -29,7 +29,13 @@ "send_hwm": 1000, "receive_hwm": 100, "inflight_ops": 100 - } + }, + "stats_frame_period": null, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}", diff --git a/services/replay/replaydb/Cargo.toml b/services/replay/replaydb/Cargo.toml index cf2226c8..d2a8b4eb 100644 --- a/services/replay/replaydb/Cargo.toml +++ b/services/replay/replaydb/Cargo.toml @@ -20,6 +20,7 @@ bincode = { workspace = true } derive_builder = { workspace = true } env_logger = { workspace = true } hashbrown = { workspace = true } +lazy_static = { workspace = true } log = { workspace = true } md-5 = { workspace = true } mini-moka = { workspace = true } diff --git a/services/replay/replaydb/assets/rocksdb.json b/services/replay/replaydb/assets/rocksdb.json index 4890c6da..0139800d 100644 --- a/services/replay/replaydb/assets/rocksdb.json +++ b/services/replay/replaydb/assets/rocksdb.json @@ -14,7 +14,13 @@ "job_eviction_ttl": { "secs": 60, "nanos": 0 - } + }, + "stats_frame_period": null, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}", diff --git a/services/replay/replaydb/assets/rocksdb_opt_out.json b/services/replay/replaydb/assets/rocksdb_opt_out.json index f675987b..e2d644b3 100644 --- a/services/replay/replaydb/assets/rocksdb_opt_out.json +++ b/services/replay/replaydb/assets/rocksdb_opt_out.json @@ -14,7 +14,13 @@ "job_eviction_ttl": { "secs": 60, "nanos": 0 - } + }, + "stats_frame_period": null, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}", diff --git a/services/replay/replaydb/src/service/configuration.rs b/services/replay/replaydb/src/service/configuration.rs index 6e4fb527..aa652bd8 100644 --- a/services/replay/replaydb/src/service/configuration.rs +++ b/services/replay/replaydb/src/service/configuration.rs @@ -1,6 +1,9 @@ use crate::job_writer::{SinkConfiguration, SinkOptions}; use anyhow::{bail, Result}; -use savant_core::transport::zeromq::{NonBlockingReader, ReaderConfigBuilder}; +use savant_core::{ + telemetry::init_from_file, + transport::zeromq::{NonBlockingReader, ReaderConfigBuilder}, +}; use serde::{Deserialize, Serialize}; use std::result; use std::time::Duration; @@ -50,6 +53,9 @@ pub struct CommonConfiguration { pub job_writer_cache_ttl: Duration, pub job_eviction_ttl: Duration, pub default_job_sink_options: Option, + pub telemetry_config_file: Option, + pub stats_frame_period: Option, + pub stats_timestamp_period: Option, } #[config] @@ -66,6 +72,9 @@ impl ServiceConfiguration { if self.common.management_port <= 1024 { bail!("Management port must be set to a value greater than 1024!"); } + if let Some(telemetry_config_file) = &self.common.telemetry_config_file { + init_from_file(telemetry_config_file.as_str()); + } Ok(()) } diff --git a/services/replay/replaydb/src/service/rocksdb_service.rs b/services/replay/replaydb/src/service/rocksdb_service.rs index 0d887a8b..d892485d 100644 --- a/services/replay/replaydb/src/service/rocksdb_service.rs +++ b/services/replay/replaydb/src/service/rocksdb_service.rs @@ -72,6 +72,11 @@ impl TryFrom<&ServiceConfiguration> for RocksDbStreamProcessor { output, configuration.common.stats_period, configuration.common.pass_metadata_only, + configuration.common.stats_frame_period, + configuration + .common + .stats_timestamp_period + .map(|d| d.as_millis() as i64), )) } } diff --git a/services/replay/replaydb/src/stream_processor.rs b/services/replay/replaydb/src/stream_processor.rs index d9fe4150..4914a1da 100644 --- a/services/replay/replaydb/src/stream_processor.rs +++ b/services/replay/replaydb/src/stream_processor.rs @@ -3,6 +3,7 @@ use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use savant_core::message::Message; +use savant_core::pipeline::{Pipeline, PipelineConfigurationBuilder, PipelineStagePayloadType}; use savant_core::transport::zeromq::{ NonBlockingReader, NonBlockingWriter, ReaderResult, WriterResult, }; @@ -27,6 +28,7 @@ struct StreamProcessor { stats_period: Duration, send_metadata_only: bool, stop_flag: bool, + pipeline: Pipeline, } impl StreamProcessor @@ -39,8 +41,42 @@ where output: Option, stats_period: Duration, send_metadata_only: bool, - ) -> Self { - Self { + frame_period: Option, + timestamp_period: Option, + ) -> Result { + let conf = PipelineConfigurationBuilder::default() + .frame_period(frame_period) + .timestamp_period(timestamp_period) + .build() + .expect("Failed to build pipeline configuration"); + + let pipeline = Pipeline::new( + "replay", + vec![ + ( + "store".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ( + "egress".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ( + "cleanup".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ], + conf, + ) + .expect("Failed to create pipeline"); + + Ok(Self { db, input, output, @@ -52,7 +88,8 @@ where last_stats: Instant::now(), send_metadata_only, stop_flag: false, - } + pipeline, + }) } async fn receive_message(&mut self) -> Result { @@ -147,6 +184,16 @@ where routing_id, data, } => { + let frame_id = if message.is_video_frame() { + let context = message.get_span_context().extract(); + Some(self.pipeline.add_frame_with_telemetry( + "store", + message.as_video_frame().unwrap(), + context, + )?) + } else { + None + }; self.stats.packet_counter += 1; self.stats.byte_counter += data.iter().map(|v| v.len() as u64).sum::(); log::debug!( @@ -169,6 +216,9 @@ where .await .add_message(&message, &topic, &data) .await?; + if let Some(frame_id) = frame_id { + self.pipeline.move_as_is("egress", vec![frame_id])?; + } } let data_slice = if self.send_metadata_only { vec![] @@ -178,6 +228,10 @@ where self.send_message(std::str::from_utf8(&topic)?, &message, &data_slice) .await?; + if let Some(frame_id) = frame_id { + self.pipeline.move_as_is("cleanup", vec![frame_id])?; + self.pipeline.delete(frame_id)?; + } } ReaderResult::Timeout => { log::debug!( @@ -251,14 +305,21 @@ impl RocksDbStreamProcessor { output: Option, stats_period: Duration, send_metadata_only: bool, + stats_frame_period: Option, + stats_timestamp_period: Option, ) -> Self { - Self(StreamProcessor::new( - db, - input, - output, - stats_period, - send_metadata_only, - )) + Self( + StreamProcessor::new( + db, + input, + output, + stats_period, + send_metadata_only, + stats_frame_period, + stats_timestamp_period, + ) + .unwrap(), + ) } pub async fn run_once(&mut self) -> Result<()> { @@ -360,7 +421,10 @@ mod tests { Some(out_writer), Duration::from_secs(30), false, - ); + None, + None, + ) + .unwrap(); let f = gen_properly_filled_frame(true); let uuid = f.get_uuid_u128(); @@ -396,6 +460,12 @@ mod tests { ReaderResult::TooShort(_) => { panic!("Too short"); } + ReaderResult::MessageVersionMismatch { + topic, + routing_id, + sender_version, + expected_version, + } => panic!("Message version mismatch: topic: {:?}, routing_id: {:?}, sender_version: {:?}, expected_version: {:?}", topic, routing_id, sender_version, expected_version), } Ok(()) } diff --git a/services/replay/samples/file_restreaming/README.md b/services/replay/samples/file_restreaming/README.md index 61f4feff..5ac22410 100644 --- a/services/replay/samples/file_restreaming/README.md +++ b/services/replay/samples/file_restreaming/README.md @@ -58,7 +58,7 @@ We are going to use the file source adapter to ingest the video file to Replay. docker run --rm -it --name source-video-files-test \ --network host \ -e FILE_TYPE=video \ - -e SYNC_OUTPUT=False \ + -e SYNC_OUTPUT=True \ -e ZMQ_ENDPOINT=dealer+connect:tcp://127.0.0.1:5555 \ -e SOURCE_ID=in-video \ -e LOCATION=/data/shuffle_dance.mp4 \ diff --git a/services/replay/samples/file_restreaming/replay_config.json b/services/replay/samples/file_restreaming/replay_config.json index d67b17a6..d82ac5bd 100644 --- a/services/replay/samples/file_restreaming/replay_config.json +++ b/services/replay/samples/file_restreaming/replay_config.json @@ -29,7 +29,12 @@ "send_hwm": 1000, "receive_hwm": 100, "inflight_ops": 100 - } + }, + "stats_timestamp_period": { + "secs": 10, + "nanos": 0 + }, + "telemetry_config_file": null }, "in_stream": { "url": "router+bind:tcp://127.0.0.1:5555", From ea4c21576b765e6305b3a7327d8fb7fb9397035f Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Tue, 15 Apr 2025 20:40:38 +0200 Subject: [PATCH 2/9] cargo fmt --- savant_protobuf/src/generated.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/savant_protobuf/src/generated.rs b/savant_protobuf/src/generated.rs index 841c78b3..076e4efc 100644 --- a/savant_protobuf/src/generated.rs +++ b/savant_protobuf/src/generated.rs @@ -326,10 +326,11 @@ pub struct ResultingSize { } #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct VideoFrameTransformation { - #[prost(oneof = "video_frame_transformation::Transformation", tags = "1, 2, 3, 4")] - pub transformation: ::core::option::Option< - video_frame_transformation::Transformation, - >, + #[prost( + oneof = "video_frame_transformation::Transformation", + tags = "1, 2, 3, 4" + )] + pub transformation: ::core::option::Option, } /// Nested message and enum types in `VideoFrameTransformation`. pub mod video_frame_transformation { @@ -416,10 +417,8 @@ pub struct Message { #[prost(string, repeated, tag = "2")] pub routing_labels: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(map = "string, string", tag = "3")] - pub propagated_context: ::std::collections::HashMap< - ::prost::alloc::string::String, - ::prost::alloc::string::String, - >, + pub propagated_context: + ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, #[prost(uint64, tag = "4")] pub seq_id: u64, #[prost(oneof = "message::Content", tags = "5, 6, 7, 8, 9, 10, 11")] From 08534021a6a69c2515d7bba46b14b473242707db Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Tue, 15 Apr 2025 20:45:59 +0200 Subject: [PATCH 3/9] fixed error handling --- savant_core/src/pipeline.rs | 4 +++- savant_protobuf/src/generated.rs | 15 ++++++++------- services/replay/replaydb/src/stream_processor.rs | 6 ++---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/savant_core/src/pipeline.rs b/savant_core/src/pipeline.rs index 25afe048..139c1ccb 100644 --- a/savant_core/src/pipeline.rs +++ b/savant_core/src/pipeline.rs @@ -54,7 +54,9 @@ pub(crate) fn unregister_pipeline(pipeline: Arc) { debug!("Removing pipeline {} from stats.", &pipeline_name); bind.remove(&pipeline_name); if bind.len() == prev_len { - error!("Failed to remove pipeline from stats."); + let message = format!("Failed to remove pipeline {} from stats.", &pipeline_name); + error!("{}", message); + panic!("{}", message); } } diff --git a/savant_protobuf/src/generated.rs b/savant_protobuf/src/generated.rs index 076e4efc..841c78b3 100644 --- a/savant_protobuf/src/generated.rs +++ b/savant_protobuf/src/generated.rs @@ -326,11 +326,10 @@ pub struct ResultingSize { } #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct VideoFrameTransformation { - #[prost( - oneof = "video_frame_transformation::Transformation", - tags = "1, 2, 3, 4" - )] - pub transformation: ::core::option::Option, + #[prost(oneof = "video_frame_transformation::Transformation", tags = "1, 2, 3, 4")] + pub transformation: ::core::option::Option< + video_frame_transformation::Transformation, + >, } /// Nested message and enum types in `VideoFrameTransformation`. pub mod video_frame_transformation { @@ -417,8 +416,10 @@ pub struct Message { #[prost(string, repeated, tag = "2")] pub routing_labels: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(map = "string, string", tag = "3")] - pub propagated_context: - ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + pub propagated_context: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, #[prost(uint64, tag = "4")] pub seq_id: u64, #[prost(oneof = "message::Content", tags = "5, 6, 7, 8, 9, 10, 11")] diff --git a/services/replay/replaydb/src/stream_processor.rs b/services/replay/replaydb/src/stream_processor.rs index 4914a1da..db566970 100644 --- a/services/replay/replaydb/src/stream_processor.rs +++ b/services/replay/replaydb/src/stream_processor.rs @@ -47,8 +47,7 @@ where let conf = PipelineConfigurationBuilder::default() .frame_period(frame_period) .timestamp_period(timestamp_period) - .build() - .expect("Failed to build pipeline configuration"); + .build()?; let pipeline = Pipeline::new( "replay", @@ -73,8 +72,7 @@ where ), ], conf, - ) - .expect("Failed to create pipeline"); + )?; Ok(Self { db, From dd0445809e7c6a6c6edbe7b33a0736bb5aabd40d Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Wed, 16 Apr 2025 09:39:01 +0200 Subject: [PATCH 4/9] fixed github action to address: https://gh.io/gha-cache-sunset --- .github/workflows/service-replay.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/service-replay.yml b/.github/workflows/service-replay.yml index 6bc4cfa4..98375420 100644 --- a/.github/workflows/service-replay.yml +++ b/.github/workflows/service-replay.yml @@ -24,6 +24,7 @@ jobs: contents: read packages: write id-token: write + deployments: write strategy: matrix: include: From 8cfbb63710da1b44ca2b3a1662d8a751c0518eb6 Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Wed, 16 Apr 2025 10:00:39 +0200 Subject: [PATCH 5/9] pipeline stages optimization --- .../replay/replaydb/src/stream_processor.rs | 52 ++++++++++++------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/services/replay/replaydb/src/stream_processor.rs b/services/replay/replaydb/src/stream_processor.rs index db566970..9ca70a38 100644 --- a/services/replay/replaydb/src/stream_processor.rs +++ b/services/replay/replaydb/src/stream_processor.rs @@ -49,30 +49,34 @@ where .timestamp_period(timestamp_period) .build()?; - let pipeline = Pipeline::new( - "replay", - vec![ - ( - "store".to_string(), - PipelineStagePayloadType::Frame, - None, - None, - ), + let mut stages = vec![ + ( + "store".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ( + "cleanup".to_string(), + PipelineStagePayloadType::Frame, + None, + None, + ), + ]; + + if output.is_some() { + stages.insert( + 1, ( "egress".to_string(), PipelineStagePayloadType::Frame, None, None, ), - ( - "cleanup".to_string(), - PipelineStagePayloadType::Frame, - None, - None, - ), - ], - conf, - )?; + ); + } + + let pipeline = Pipeline::new("replay", stages, conf)?; Ok(Self { db, @@ -215,7 +219,12 @@ where .add_message(&message, &topic, &data) .await?; if let Some(frame_id) = frame_id { - self.pipeline.move_as_is("egress", vec![frame_id])?; + let stage_name = if self.output.is_some() { + "egress" + } else { + "cleanup" + }; + self.pipeline.move_as_is(stage_name, vec![frame_id])?; } } let data_slice = if self.send_metadata_only { @@ -226,8 +235,11 @@ where self.send_message(std::str::from_utf8(&topic)?, &message, &data_slice) .await?; + if let Some(frame_id) = frame_id { - self.pipeline.move_as_is("cleanup", vec![frame_id])?; + if self.output.is_some() { + self.pipeline.move_as_is("cleanup", vec![frame_id])?; + } self.pipeline.delete(frame_id)?; } } From 3c9b7853b260aa5bfdfedd6cc153d3df55253be1 Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Wed, 16 Apr 2025 10:38:00 +0200 Subject: [PATCH 6/9] fixed github action to address: https://gh.io/gha-cache-sunset --- .github/workflows/service-replay.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/service-replay.yml b/.github/workflows/service-replay.yml index 98375420..e3025c28 100644 --- a/.github/workflows/service-replay.yml +++ b/.github/workflows/service-replay.yml @@ -70,8 +70,8 @@ jobs: tags: "${{matrix.tag}}:${{ env.GITHUB_REF_SLUG }}" load: true context: . - cache-from: type=gha - cache-to: type=gha,mode=max + # cache-from: type=gha + # cache-to: type=gha,mode=max # Login against a Docker registry except on PR # https://github.com/docker/login-action @@ -101,8 +101,8 @@ jobs: tags: "${{matrix.tag}}:${{ env.TAG }}" push: true context: . - cache-from: type=gha - cache-to: type=gha,mode=max + # cache-from: type=gha + # cache-to: type=gha,mode=max - name: Configure Docker rolling release image tag ${{matrix.tag}}:${{ env.GITHUB_REF_SLUG }} if: github.ref == 'refs/heads/main' || github.ref == 'refs/tags/*' @@ -122,5 +122,5 @@ jobs: tags: "${{matrix.tag}}:${{ env.SAVANT_RS_VERSION }}-rolling" push: true context: . - cache-from: type=gha - cache-to: type=gha,mode=max + # cache-from: type=gha + # cache-to: type=gha,mode=max From 76be508a777c769495b482b2fa2a13fdf5f21a65 Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Wed, 16 Apr 2025 14:08:47 +0200 Subject: [PATCH 7/9] doc update --- docs/source/services/replay/2_installation.rst | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/source/services/replay/2_installation.rst b/docs/source/services/replay/2_installation.rst index a3280aa4..a3cadb85 100644 --- a/docs/source/services/replay/2_installation.rst +++ b/docs/source/services/replay/2_installation.rst @@ -197,6 +197,18 @@ Configuration Parameters - Default sink options to be applied to jobs if they don't specify their own options. If not set, jobs must provide their own sink options. - ``null`` - See ``out_stream.options`` format. + * - ``common.telemetry_config_file`` + - The path to a file containing telemetry configuration. When set, the service loads telemetry settings from this file. + - ``null`` + - ``"/opt/telemetry_config.json"`` + * - ``common.stats_frame_period`` + - Defines how frequently the service should report statistics based on the number of frames processed. When set, statistics are logged after processing the specified number of frames. + - ``null`` + - ``1000`` + * - ``common.stats_timestamp_period`` + - Defines how frequently the service should report statistics based on elapsed time. Controls the time interval between statistics reports. + - ``null`` + - ``{"secs": 10, "nanos": 0}`` * - ``in_stream.url`` - The URL for the data ingress in Savant ZMQ format. - ``router+bind:tcp://0.0.0.0:5555`` From 4a7e3221851008da75195170f14dd21dccc341ff Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Wed, 16 Apr 2025 14:16:56 +0200 Subject: [PATCH 8/9] documentation update --- .../source/services/replay/2_installation.rst | 80 +------------------ .../src/zeromq_src/message_handlers.rs | 2 +- 2 files changed, 3 insertions(+), 79 deletions(-) diff --git a/docs/source/services/replay/2_installation.rst b/docs/source/services/replay/2_installation.rst index a3cadb85..50a7e83c 100644 --- a/docs/source/services/replay/2_installation.rst +++ b/docs/source/services/replay/2_installation.rst @@ -70,84 +70,8 @@ Configuration File The configuration file is a JSON file that contains the following parameters: -.. code-block:: json - - { - "common": { - "pass_metadata_only": false, - "management_port": 8080, - "stats_period": { - "secs": 60, - "nanos": 0 - }, - "job_writer_cache_max_capacity": 1000, - "job_writer_cache_ttl": { - "secs": 60, - "nanos": 0 - }, - "job_eviction_ttl": { - "secs": 60, - "nanos": 0 - }, - "default_job_sink_options": { - "send_timeout": { - "secs": 1, - "nanos": 0 - }, - "send_retries": 3, - "receive_timeout": { - "secs": 1, - "nanos": 0 - }, - "receive_retries": 3, - "send_hwm": 1000, - "receive_hwm": 100, - "inflight_ops": 100 - } - }, - "in_stream": { - "url": "router+bind:tcp://0.0.0.0:5555", - "options": { - "receive_timeout": { - "secs": 1, - "nanos": 0 - }, - "receive_hwm": 1000, - "topic_prefix_spec": { - "none": null - }, - "source_cache_size": 1000, - "inflight_ops": 100 - } - }, - "out_stream": { - "url": "pub+bind:tcp://0.0.0.0:5556", - "options": { - "send_timeout": { - "secs": 1, - "nanos": 0 - }, - "send_retries": 3, - "receive_timeout": { - "secs": 1, - "nanos": 0 - }, - "receive_retries": 3, - "send_hwm": 1000, - "receive_hwm": 100, - "inflight_ops": 100 - } - }, - "storage": { - "rocksdb": { - "path": "${DB_PATH:-/tmp/rocksdb}", - "data_expiration_ttl": { - "secs": 60, - "nanos": 0 - } - } - } - } +.. literalinclude:: ../../../../services/replay/replay/assets/test.json + :language: json The above-mentioned configuration file is used by default, when you launch Replay without specifying the configuration file. You can override the default configuration by providing your own configuration file and specifying it in the launch command: diff --git a/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs b/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs index 396d2ba6..3712e794 100644 --- a/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs +++ b/savant_gstreamer_elements/src/zeromq_src/message_handlers.rs @@ -3,13 +3,13 @@ use gstreamer::subclass::prelude::*; use gstreamer::{prelude::*, FlowError}; use gstreamer_base::subclass::base_src::CreateSuccess; use savant_core::message::{save_message, Message}; +use savant_core::pipeline::get_pipeline; use savant_core::primitives::eos::EndOfStream; use savant_core::primitives::rust::{VideoFrameContent, VideoFrameProxy}; use savant_core::primitives::shutdown::Shutdown; use savant_core::rust::{Pipeline, PropagatedContext}; use savant_core::transport::zeromq::ReaderResult; use savant_core::utils::bytes_to_hex_string; -use savant_core::webserver::get_pipeline; use savant_gstreamer::id_meta::SavantIdMetaKind; use crate::utils::convert_ts; From d2ea148408849acef7bc7694fd8c2d1c235d3942 Mon Sep 17 00:00:00 2001 From: "Ivan A. Kudriavtsev" Date: Wed, 16 Apr 2025 16:08:55 +0200 Subject: [PATCH 9/9] implemented otlp in replay --- Cargo.toml | 2 +- savant_core/src/lib.rs | 2 +- savant_core/src/pipeline.rs | 19 +++++++++++++------ savant_core/src/telemetry.rs | 9 ++++++++- .../replay/replaydb/src/stream_processor.rs | 1 + 5 files changed, 24 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cbf67fe9..c4a8e524 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ default-members = [ ] [workspace.package] -version = "1.0.5" +version = "1.1.0" edition = "2021" authors = ["Ivan Kudriavtsev "] description = "Savant Rust core functions library" diff --git a/savant_core/src/lib.rs b/savant_core/src/lib.rs index 9dddc116..6149d639 100644 --- a/savant_core/src/lib.rs +++ b/savant_core/src/lib.rs @@ -59,7 +59,7 @@ pub fn fast_hash(bytes: &[u8]) -> u32 { #[inline] pub fn get_tracer() -> BoxedTracer { - global::tracer("video_pipeline") + global::tracer("savant-tracer") } pub mod rust { diff --git a/savant_core/src/pipeline.rs b/savant_core/src/pipeline.rs index 139c1ccb..8a616903 100644 --- a/savant_core/src/pipeline.rs +++ b/savant_core/src/pipeline.rs @@ -344,6 +344,7 @@ pub(super) mod implementation { id_counter: AtomicI64, frame_counter: AtomicI64, root_spans: SavantRwLock>, + outer_spans: SavantRwLock>, stages: Vec, frame_locations: SavantRwLock>, frame_ordering: SavantRwLock>, @@ -362,6 +363,7 @@ pub(super) mod implementation { id_counter: AtomicI64::new(0), frame_counter: AtomicI64::new(0), root_spans: SavantRwLock::new(HashMap::new()), + outer_spans: SavantRwLock::new(HashMap::new()), stages: Vec::new(), frame_locations: SavantRwLock::new(HashMap::new()), frame_ordering: SavantRwLock::new(LruCache::new( @@ -639,6 +641,9 @@ pub(super) mod implementation { .write() .insert(id_counter, Context::current_with_span(span)); } + + self.outer_spans.write().insert(id_counter, parent_ctx); + let source_id_compatibility_hash = frame.stream_compatibility_hash(); let mut ordering = self.frame_ordering.write(); let prev_ordering_seq = ordering.get(&source_id); @@ -727,17 +732,18 @@ pub(super) mod implementation { bail!("Object {} is not found in the stage {}", id, stage.name) } - let mut bind = self.root_spans.write(); + // let mut bind = self.root_spans.write(); match removed.unwrap() { PipelinePayload::Frame(frame, _, ctx, _, _) => { self.stats.register_frame(frame.get_object_count()); self.add_frame_json(&frame, &ctx); ctx.span().end(); - let root_ctx = bind.remove(&id).unwrap(); - Ok(HashMap::from([(id, root_ctx)])) + self.root_spans.write().remove(&id).unwrap(); + let outer_ctx = self.outer_spans.write().remove(&id).unwrap(); + Ok(HashMap::from([(id, outer_ctx)])) } PipelinePayload::Batch(batch, _, contexts, _, _) => Ok({ - let mut bind = self.root_spans.write(); + //let mut bind = self.root_spans.write(); contexts .into_iter() .map(|(frame_id, ctx)| { @@ -754,8 +760,9 @@ pub(super) mod implementation { ) } ctx.span().end(); - let root_ctx = bind.remove(&id).unwrap(); - Ok((id, root_ctx)) + self.root_spans.write().remove(&id).unwrap(); + let outer_ctx = self.outer_spans.write().remove(&id).unwrap(); + Ok((id, outer_ctx)) }) .collect::, _>>()? }), diff --git a/savant_core/src/telemetry.rs b/savant_core/src/telemetry.rs index 29f7e4a9..e2cc9899 100644 --- a/savant_core/src/telemetry.rs +++ b/savant_core/src/telemetry.rs @@ -13,6 +13,8 @@ use std::fs; use std::time::Duration; use twelf::{config, Layer}; +use crate::get_or_init_async_runtime; + #[derive(Debug, Serialize, Deserialize, Clone)] pub enum ContextPropagationFormat { #[serde(rename = "jaeger")] @@ -250,7 +252,12 @@ pub fn init(config: &TelemetryConfiguration) { match configurator.get() { Some(_) => panic!("Open Telemetry has been configured"), None => { - let c = Configurator::new("savant", config); + let c = if tokio::runtime::Handle::try_current().is_ok() { + Configurator::new("savant", config) + } else { + let rt = get_or_init_async_runtime(); + rt.block_on(async { Configurator::new("savant", config) }) + }; let result = configurator.set(c); if result.is_err() { // should not happen diff --git a/services/replay/replaydb/src/stream_processor.rs b/services/replay/replaydb/src/stream_processor.rs index 9ca70a38..2f48aa84 100644 --- a/services/replay/replaydb/src/stream_processor.rs +++ b/services/replay/replaydb/src/stream_processor.rs @@ -77,6 +77,7 @@ where } let pipeline = Pipeline::new("replay", stages, conf)?; + pipeline.set_root_span_name("replay")?; Ok(Self { db,