Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions savant_core/src/metrics/pipeline_metric_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::metrics::{get_or_create_counter_family, get_or_create_gauge_family};
use crate::pipeline::get_registered_pipelines;
use crate::rust::FrameProcessingStatRecordType;
use crate::webserver::get_registered_pipelines;
use log::debug;

#[derive(Debug)]
Expand Down Expand Up @@ -29,7 +29,7 @@ impl PipelineMetricBuilder {
let stage_latency_label_names =
["record_type", "destination_stage_name", "source_stage_name"].as_slice();

let registered_pipelines = get_registered_pipelines().await;
let registered_pipelines = get_registered_pipelines();
debug!(
"Found {} registered pipeline(s)",
registered_pipelines.len()
Expand Down
51 changes: 50 additions & 1 deletion savant_core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ use std::time::SystemTime;

use anyhow::Result;
use hashbrown::HashMap;
use log::debug;
use log::error;
use log::info;
use opentelemetry::Context;
use parking_lot::Mutex;

pub use implementation::PipelineConfiguration;
pub use implementation::PipelineConfigurationBuilder;
Expand All @@ -15,10 +19,55 @@ use crate::primitives::frame::VideoFrameProxy;
use crate::primitives::frame_batch::VideoFrameBatch;
use crate::primitives::frame_update::VideoFrameUpdate;
use crate::primitives::object::BorrowedVideoObject;
use crate::webserver::{register_pipeline, unregister_pipeline};
use lazy_static::lazy_static;

const MAX_TRACKED_STREAMS: usize = 8192; // defines how many streams are tracked for the frame ordering

//

// pipelines: Arc<Mutex<HashMap<String, Arc<implementation::Pipeline>>>>,

lazy_static! {
static ref PIPELINES: Arc<Mutex<HashMap<String, Arc<implementation::Pipeline>>>> =
Arc::new(Mutex::new(HashMap::new()));
}

pub(crate) fn register_pipeline(pipeline: Arc<implementation::Pipeline>) {
let pipelines = PIPELINES.clone();
let mut bind = pipelines.lock();
let name = pipeline.get_name();
let entry = bind.get(&name);
if entry.is_some() {
let message = format!("Pipeline with name {} already exists in registry.", &name);
error!("{}", message);
panic!("{}", message);
}
bind.insert(name.clone(), pipeline.clone());
info!("Pipeline {} registered.", name);
}

pub(crate) fn unregister_pipeline(pipeline: Arc<implementation::Pipeline>) {
let stats = PIPELINES.clone();
let pipeline_name = pipeline.get_name();
let mut bind = stats.lock();
let prev_len = bind.len();
debug!("Removing pipeline {} from stats.", &pipeline_name);
bind.remove(&pipeline_name);
if bind.len() == prev_len {
error!("Failed to remove pipeline from stats.");
}
}

pub(crate) fn get_registered_pipelines() -> HashMap<String, Arc<implementation::Pipeline>> {
let s = PIPELINES.lock();
s.clone()
}

pub fn get_pipeline(name: &str) -> Option<Arc<implementation::Pipeline>> {
let s = PIPELINES.lock();
s.get(name).cloned()
}

pub mod stage;
pub mod stage_function_loader;
pub mod stage_plugin_sample;
Expand Down
4 changes: 1 addition & 3 deletions savant_core/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::get_or_init_async_runtime;
use log::error;
use opentelemetry::global;
use opentelemetry_jaeger_propagator::Propagator;
Expand Down Expand Up @@ -251,8 +250,7 @@ pub fn init(config: &TelemetryConfiguration) {
match configurator.get() {
Some(_) => panic!("Open Telemetry has been configured"),
None => {
let runtime = get_or_init_async_runtime();
let c = runtime.block_on(async { Configurator::new("savant", config) });
let c = Configurator::new("savant", config);
let result = configurator.set(c);
if result.is_err() {
// should not happen
Expand Down
55 changes: 3 additions & 52 deletions savant_core/src/webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use tokio::sync::Mutex;
use crate::get_or_init_async_runtime;
use crate::metrics::metric_collector::SystemMetricCollector;
use crate::metrics::pipeline_metric_builder::PipelineMetricBuilder;
use crate::pipeline::implementation;
use crate::primitives::rust::AttributeSet;
use crate::primitives::Attribute;
use crate::protobuf::ToProtobuf;
Expand All @@ -25,7 +24,7 @@ use anyhow::bail;
use futures_util::StreamExt;
use hashbrown::HashMap;
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
use log::{error, info, warn};
use moka::future::Cache;
use moka::Expiry;
use prometheus_client::encoding::text::encode;
Expand Down Expand Up @@ -78,7 +77,6 @@ pub struct KvsOperation {

#[allow(clippy::type_complexity)]
struct WsData {
pipelines: Arc<Mutex<HashMap<String, Arc<implementation::Pipeline>>>>,
status: Arc<Mutex<PipelineStatus>>,
shutdown_token: Arc<OnceLock<String>>,
shutdown_status: Arc<OnceLock<bool>>,
Expand Down Expand Up @@ -131,7 +129,6 @@ impl WsData {
}
});
WsData {
pipelines: Arc::new(Mutex::new(HashMap::new())),
status: Arc::new(Mutex::new(PipelineStatus::Stopped)),
shutdown_token: Arc::new(OnceLock::new()),
shutdown_status: Arc::new(OnceLock::new()),
Expand Down Expand Up @@ -228,52 +225,6 @@ lazy_static! {
static ref PID: Mutex<i32> = Mutex::new(0);
}

pub(crate) fn register_pipeline(pipeline: Arc<implementation::Pipeline>) {
let runtime = get_or_init_async_runtime();
let pipelines = WS_DATA.pipelines.clone();
runtime.block_on(async move {
let mut bind = pipelines.lock().await;
let name = pipeline.get_name();
let entry = bind.get(&name);
if entry.is_some() {
let message = format!("Pipeline with name {} already exists in registry.", &name);
error!("{}", message);
panic!("{}", message);
}
bind.insert(name.clone(), pipeline.clone());
info!("Pipeline {} registered.", name);
});
}

pub(crate) fn unregister_pipeline(pipeline: Arc<implementation::Pipeline>) {
let runtime = get_or_init_async_runtime();
let stats = WS_DATA.pipelines.clone();
let pipeline_name = pipeline.get_name();
runtime.block_on(async move {
let mut bind = stats.lock().await;
let prev_len = bind.len();
debug!("Removing pipeline {} from stats.", &pipeline_name);
bind.remove(&pipeline_name);
if bind.len() == prev_len {
error!("Failed to remove pipeline from stats.");
}
});
}

pub(crate) async fn get_registered_pipelines() -> HashMap<String, Arc<implementation::Pipeline>> {
let s = WS_DATA.pipelines.lock().await;
s.clone()
}

pub fn get_pipeline(name: &str) -> Option<Arc<implementation::Pipeline>> {
let runtime = get_or_init_async_runtime();
let pipelines = WS_DATA.pipelines.clone();
runtime.block_on(async {
let bind = pipelines.lock().await;
bind.get(name).cloned()
})
}

pub fn set_status(s: PipelineStatus) -> anyhow::Result<()> {
WS_DATA.set_status(s)
}
Expand Down Expand Up @@ -552,10 +503,10 @@ mod tests {
set_extra_labels,
};
use crate::pipeline::implementation::create_test_pipeline;
use crate::pipeline::register_pipeline;
use crate::test::gen_frame;
use crate::webserver::{
init_webserver, register_pipeline, set_shutdown_token, set_status, stop_webserver,
PipelineStatus,
init_webserver, set_shutdown_token, set_status, stop_webserver, PipelineStatus,
};
use hashbrown::HashMap;
use prometheus_client::registry::Unit;
Expand Down
15 changes: 7 additions & 8 deletions savant_protobuf/src/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,11 @@ pub struct ResultingSize {
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct VideoFrameTransformation {
#[prost(oneof = "video_frame_transformation::Transformation", tags = "1, 2, 3, 4")]
pub transformation: ::core::option::Option<
video_frame_transformation::Transformation,
>,
#[prost(
oneof = "video_frame_transformation::Transformation",
tags = "1, 2, 3, 4"
)]
pub transformation: ::core::option::Option<video_frame_transformation::Transformation>,
}
/// Nested message and enum types in `VideoFrameTransformation`.
pub mod video_frame_transformation {
Expand Down Expand Up @@ -416,10 +417,8 @@ pub struct Message {
#[prost(string, repeated, tag = "2")]
pub routing_labels: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(map = "string, string", tag = "3")]
pub propagated_context: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
pub propagated_context:
::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
#[prost(uint64, tag = "4")]
pub seq_id: u64,
#[prost(oneof = "message::Content", tags = "5, 6, 7, 8, 9, 10, 11")]
Expand Down
8 changes: 7 additions & 1 deletion services/replay/replay/assets/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@
"send_hwm": 1000,
"receive_hwm": 100,
"inflight_ops": 100
}
},
"stats_frame_period": null,
"stats_timestamp_period": {
"secs": 10,
"nanos": 0
},
"telemetry_config_file": null
},
"in_stream": {
"url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}",
Expand Down
1 change: 1 addition & 0 deletions services/replay/replaydb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bincode = { workspace = true }
derive_builder = { workspace = true }
env_logger = { workspace = true }
hashbrown = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
md-5 = { workspace = true }
mini-moka = { workspace = true }
Expand Down
8 changes: 7 additions & 1 deletion services/replay/replaydb/assets/rocksdb.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
"job_eviction_ttl": {
"secs": 60,
"nanos": 0
}
},
"stats_frame_period": null,
"stats_timestamp_period": {
"secs": 10,
"nanos": 0
},
"telemetry_config_file": null
},
"in_stream": {
"url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}",
Expand Down
8 changes: 7 additions & 1 deletion services/replay/replaydb/assets/rocksdb_opt_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
"job_eviction_ttl": {
"secs": 60,
"nanos": 0
}
},
"stats_frame_period": null,
"stats_timestamp_period": {
"secs": 10,
"nanos": 0
},
"telemetry_config_file": null
},
"in_stream": {
"url": "router+bind:ipc:///tmp/${SOCKET_PATH_IN:-undefined}",
Expand Down
11 changes: 10 additions & 1 deletion services/replay/replaydb/src/service/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::job_writer::{SinkConfiguration, SinkOptions};
use anyhow::{bail, Result};
use savant_core::transport::zeromq::{NonBlockingReader, ReaderConfigBuilder};
use savant_core::{
telemetry::init_from_file,
transport::zeromq::{NonBlockingReader, ReaderConfigBuilder},
};
use serde::{Deserialize, Serialize};
use std::result;
use std::time::Duration;
Expand Down Expand Up @@ -50,6 +53,9 @@ pub struct CommonConfiguration {
pub job_writer_cache_ttl: Duration,
pub job_eviction_ttl: Duration,
pub default_job_sink_options: Option<SinkOptions>,
pub telemetry_config_file: Option<String>,
pub stats_frame_period: Option<i64>,
pub stats_timestamp_period: Option<Duration>,
}

#[config]
Expand All @@ -66,6 +72,9 @@ impl ServiceConfiguration {
if self.common.management_port <= 1024 {
bail!("Management port must be set to a value greater than 1024!");
}
if let Some(telemetry_config_file) = &self.common.telemetry_config_file {
init_from_file(telemetry_config_file.as_str());
}
Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions services/replay/replaydb/src/service/rocksdb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ impl TryFrom<&ServiceConfiguration> for RocksDbStreamProcessor {
output,
configuration.common.stats_period,
configuration.common.pass_metadata_only,
configuration.common.stats_frame_period,
configuration
.common
.stats_timestamp_period
.map(|d| d.as_millis() as i64),
))
}
}
Expand Down
Loading
Loading